# Source code for spyne.protocol.soap.soap11

#
# spyne - Copyright (C) Spyne contributors.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
#

"""The ``spyne.protocol.soap.soap11`` module contains the implementation of a
subset of the Soap 1.1 standard.

Except the binary optimizations (MtoM, attachments, etc) that mostly
**do not work**, this protocol is production quality.

One must specifically enable the debug output for the Xml protocol to see the
actual document exchange. That's because the xml formatting code is run only
when explicitly enabled due to performance reasons. ::

    logging.getLogger('spyne.protocol.xml').setLevel(logging.DEBUG)

Initially released in soaplib-0.8.0.

Logs valid documents to %r and invalid documents to %r.
""" % (__name__, __name__ + ".invalid")

import logging
logger = logging.getLogger(__name__)
logger_invalid = logging.getLogger(__name__ + ".invalid")

import cgi

import spyne.const.xml_ns as ns

from lxml import etree
from lxml.etree import XMLSyntaxError
from lxml.etree import XMLParser

from spyne import BODY_STYLE_WRAPPED

from spyne.const.xml_ns import DEFAULT_NS
from spyne.const.http import HTTP_405
from spyne.const.http import HTTP_500
from spyne.error import RequestNotAllowed
from spyne.model.fault import Fault
from spyne.model.primitive import Date
from spyne.model.primitive import Time
from spyne.model.primitive import DateTime
from spyne.protocol.xml import XmlDocument
from spyne.protocol.soap.mime import collapse_swa


def _from_soap(in_envelope_xml, xmlids=None):
    """Splits a parsed SOAP envelope into its header entries and its payload.

    :param in_envelope_xml: The root element of the parsed document. Its tag
        must be ``{<soap_env namespace>}Envelope``.
    :param xmlids: Optional mapping used to resolve ``href`` references. When
        it is non-empty, :func:`resolve_hrefs` is run on the envelope (in
        place) before anything else is inspected.
    :returns: A ``(header, body)`` tuple. ``header`` is the list of children
        of the first ``Header`` element, or ``None`` when the envelope has no
        ``Header``. ``body`` is the first child of the first ``Body`` element,
        or ``None`` when ``Body`` is missing or has no children.
    :raises Fault: ``Client.SoapError`` when the root element is not a SOAP
        envelope, or when the envelope contains neither a ``Header`` nor a
        ``Body`` element.
    """

    if xmlids:
        resolve_hrefs(in_envelope_xml, xmlids)

    if in_envelope_xml.tag != '{%s}Envelope' % ns.soap_env:
        raise Fault('Client.SoapError', 'No {%s}Envelope element was found!' %
                                                            ns.soap_env)

    soap_nsmap = {'e': ns.soap_env}
    header_envelope = in_envelope_xml.xpath('e:Header', namespaces=soap_nsmap)
    body_envelope = in_envelope_xml.xpath('e:Body', namespaces=soap_nsmap)

    if not header_envelope and not body_envelope:
        raise Fault('Client.SoapError', 'Soap envelope is empty!')

    header = None
    if header_envelope:
        # list(element) is the documented replacement for the deprecated
        # element.getchildren().
        header = list(header_envelope[0])

    body = None
    # len() is used on the element on purpose: it counts its children.
    if body_envelope and len(body_envelope[0]) > 0:
        body = body_envelope[0][0]

    return header, body


def _parse_xml_string(xml_string, parser, charset=None):
    """Joins the incoming chunks and parses them with ``etree.XMLID``.

    :param xml_string: An iterable of byte strings; the chunks are
        concatenated to form the document.
    :param parser: The parser instance handed to ``etree.XMLID``.
    :param charset: Optional encoding name. When given, the joined bytes are
        decoded with it before parsing.
    :returns: The ``(root, xmlids)`` tuple produced by ``etree.XMLID``.
    :raises Fault: ``Client.XMLSyntaxError`` when parsing fails with an
        ``XMLSyntaxError``. The offending document is logged to the
        ``.invalid`` logger first.
    """

    string = b''.join(xml_string)
    if charset:
        string = string.decode(charset)

    try:
        try:
            root, xmlids = etree.XMLID(string, parser)

        except ValueError:
            if not charset:
                # Nothing was decoded above, so there is nothing to re-encode.
                # Let the original error surface instead of masking it with
                # a failure of ``string.encode(None)``.
                raise

            logger.debug('ValueError: Deserializing from unicode strings with '
                         'encoding declaration is not supported by lxml.')
            root, xmlids = etree.XMLID(string.encode(charset), parser)

    except XMLSyntaxError as e:
        logger_invalid.error(string)
        raise Fault('Client.XMLSyntaxError', str(e))

    return root, xmlids


# see http://www.w3.org/TR/2000/NOTE-SOAP-20000508/
# section 5.2.1 for an example of how the id and href attributes are used.
def resolve_hrefs(element, xmlids):
    """Inlines the targets of ``href`` references, recursively and in place.

    For every child of ``element``:

    * a child with a non-empty ``id`` attribute is skipped (its subtree is not
      visited),
    * a child with a non-empty ``href`` attribute receives the attributes,
      the children and the text of the element registered in ``xmlids``
      under that reference (``#`` characters removed),
    * any other child is processed recursively.

    :param element: The element whose subtree is to be processed.
    :param xmlids: A dict mapping id values to elements (as returned by
        ``etree.XMLID``).
    :returns: ``element`` itself.
    """

    for e in element:
        if e.get('id'):
            # don't need to resolve this element
            continue

        href = e.get('href')
        if not href:
            resolve_hrefs(e, xmlids)
            continue

        # .get() so that a dangling reference is skipped by the check below
        # instead of raising KeyError.
        resolved_element = xmlids.get(href.replace('#', ''))
        if resolved_element is None:
            continue

        resolve_hrefs(resolved_element, xmlids)

        # copies the attributes
        for k, v in resolved_element.items():
            e.set(k, v)

        # copies the children. Iterate over a snapshot: with lxml, appending
        # a child to ``e`` removes it from ``resolved_element``.
        for child in list(resolved_element):
            e.append(child)

        # copies the text
        e.text = resolved_element.text

    return element


class Soap11(XmlDocument):
    """The base implementation of a subset of the Soap 1.1 standard. The
    document is available here: http://www.w3.org/TR/soap11/

    :param app: The owner application instance.
    :param validator: One of (None, 'soft', 'lxml', 'schema',
        ProtocolBase.SOFT_VALIDATION, XmlDocument.SCHEMA_VALIDATION).
        Both ``'lxml'`` and ``'schema'`` values are equivalent to
        ``XmlDocument.SCHEMA_VALIDATION``.
    :param xml_declaration: Whether to add xml_declaration to the responses.
        Default is 'True'.
    :param cleanup_namespaces: Whether to clean up namespace declarations in
        the response document. Default is 'True'.
    :param encoding: The suggested string encoding for the returned xml
        documents. The transport can override this.
    :param pretty_print: When ``True``, returns the document in a
        pretty-printed format.
    """

    mime_type = 'text/xml; charset=utf-8'

    type = set(XmlDocument.type)
    type.update(('soap', 'soap11'))

    def __init__(self, app=None, validator=None, xml_declaration=True,
                 cleanup_namespaces=True, encoding='UTF-8', pretty_print=False):
        super(Soap11, self).__init__(app, validator, xml_declaration,
                                     cleanup_namespaces, encoding, pretty_print)

        # SOAP requires DateTime strings to be in iso format. The following
        # lines make sure custom datetime formatting via DateTime(format="...")
        # string is bypassed.
        self._to_string_handlers[Time] = lambda cls, value: value.isoformat()
        self._to_string_handlers[DateTime] = lambda cls, value: value.isoformat()

        self._from_string_handlers[Date] = self.date_from_string_iso
        self._from_string_handlers[DateTime] = self.datetime_from_string_iso

    def create_in_document(self, ctx, charset=None):
        """Parses ``ctx.in_string`` and stores the result in ``ctx.in_document``.

        For ``'wsgi'`` transports the request is first checked: it must be a
        POST carrying a Content-Type header.

        :param ctx: The method context. ``ctx.in_document`` is set to the
            ``(root, xmlids)`` tuple returned by ``_parse_xml_string``.
        :param charset: Optional encoding name forwarded to
            ``_parse_xml_string``.
        :raises RequestNotAllowed: For wsgi requests that are not POST or that
            lack a Content-Type header; the response code is set to
            ``HTTP_405`` beforehand.
        """

        if ctx.transport.type == 'wsgi':
            # according to the soap via http standard, soap requests must only
            # work with proper POST requests.
            content_type = ctx.transport.req_env.get("CONTENT_TYPE")
            http_verb = ctx.transport.req_env['REQUEST_METHOD'].upper()
            if content_type is None or http_verb != "POST":
                ctx.transport.resp_code = HTTP_405
                raise RequestNotAllowed(
                        "You must issue a POST request with the Content-Type "
                        "header properly set.")

            content_type = cgi.parse_header(content_type)
            # NOTE(review): the return value is ignored, so collapse_swa
            # presumably rewrites ctx.in_string in place -- confirm.
            collapse_swa(content_type, ctx.in_string)

        ctx.in_document = _parse_xml_string(ctx.in_string,
                                    XMLParser(**self.parser_kwargs), charset)

    def decompose_incoming_envelope(self, ctx, message=XmlDocument.REQUEST):
        """Splits the parsed envelope into header and body documents.

        Expects ``ctx.in_document`` to be the ``(root, xmlids)`` tuple set by
        :meth:`create_in_document` and replaces it with the root element.
        When the payload is a ``Fault`` element only ``ctx.in_body_doc`` is
        set. Otherwise ``ctx.in_header_doc``, ``ctx.in_body_doc`` and
        ``ctx.method_request_string`` (the payload's tag) are set and
        ``self.validate_body`` is called.

        :param ctx: The method context.
        :param message: Forwarded to ``self.validate_body``.
        :raises Fault: ``Client.SoapError`` when the envelope carries no
            payload, i.e. its ``Body`` is missing or has no children.
        """

        envelope_xml, xmlids = ctx.in_document
        header_document, body_document = _from_soap(envelope_xml, xmlids)

        ctx.in_document = envelope_xml

        if body_document is None:
            # _from_soap() returns None for a missing or childless Body.
            # There is no tag to dispatch on, so report it as a client error
            # rather than failing with AttributeError below.
            raise Fault('Client.SoapError', 'Soap body is empty!')

        if body_document.tag == '{%s}Fault' % ns.soap_env:
            ctx.in_body_doc = body_document

        else:
            ctx.in_header_doc = header_document
            ctx.in_body_doc = body_document
            ctx.method_request_string = ctx.in_body_doc.tag
            self.validate_body(ctx, message)

    def deserialize(self, ctx, message):
        """Converts ``ctx.in_header_doc`` and ``ctx.in_body_doc`` to native
        python objects.

        When the body is a ``Fault`` element, ``ctx.in_error`` is set and
        ``ctx.in_object`` is ``None``. Otherwise ``ctx.in_header`` (when both
        a header document and a header class are present) and
        ``ctx.in_object`` are set. A ``None`` body yields one ``None`` per
        member of the body class.

        Not meant to be overridden.

        :param ctx: The method context.
        :param message: One of ``self.REQUEST`` or ``self.RESPONSE``; selects
            whether the ``in_*`` or ``out_*`` classes of the descriptor are
            used.
        """

        assert message in (self.REQUEST, self.RESPONSE)

        self.event_manager.fire_event('before_deserialize', ctx)

        body_doc = ctx.in_body_doc
        # The None test must come first: the "missing body" case is handled
        # further below and must not fail on the ``.tag`` access.
        if body_doc is not None and body_doc.tag == "{%s}Fault" % ns.soap_env:
            ctx.in_object = None
            ctx.in_error = self.from_element(ctx, Fault, body_doc)

        else:
            if message is self.REQUEST:
                header_class = ctx.descriptor.in_header
                body_class = ctx.descriptor.in_message

            elif message is self.RESPONSE:
                header_class = ctx.descriptor.out_header
                body_class = ctx.descriptor.out_message

            # decode header objects
            if ctx.in_header_doc is not None and header_class is not None:
                # Slots without a matching header element stay None. zip()
                # already stops at the shorter sequence, so no bounds check
                # is needed.
                headers = [None] * len(header_class)
                for i, (header_doc, head_class) in enumerate(
                                        zip(ctx.in_header_doc, header_class)):
                    headers[i] = self.from_element(ctx, head_class, header_doc)

                if len(headers) == 1:
                    ctx.in_header = headers[0]
                else:
                    ctx.in_header = headers

            # decode method arguments
            if body_doc is None:
                ctx.in_object = [None] * len(body_class._type_info)
            else:
                ctx.in_object = self.from_element(ctx, body_class, body_doc)

        self.event_manager.fire_event('after_deserialize', ctx)

    def serialize(self, ctx, message):
        """Uses ``ctx.out_object``, ``ctx.out_header`` or ``ctx.out_error`` to
        set ``ctx.out_body_doc``, ``ctx.out_header_doc`` and
        ``ctx.out_document`` as ``lxml.etree._Element`` instances.

        Not meant to be overridden.

        :param ctx: The method context.
        :param message: One of ``self.REQUEST`` or ``self.RESPONSE``; selects
            whether the ``in_*`` or ``out_*`` classes of the descriptor are
            used.
        """

        assert message in (self.REQUEST, self.RESPONSE)

        self.event_manager.fire_event('before_serialize', ctx)

        # construct the soap response, and serialize it
        nsmap = self.app.interface.nsmap
        ctx.out_document = etree.Element('{%s}Envelope' % ns.soap_env,
                                                                  nsmap=nsmap)
        if ctx.out_error is not None:
            # FIXME: There's no way to alter soap response headers for the user.
            ctx.out_body_doc = out_body_doc = etree.SubElement(
                      ctx.out_document, '{%s}Body' % ns.soap_env, nsmap=nsmap)
            self.to_parent(ctx, ctx.out_error.__class__, ctx.out_error,
                                  out_body_doc, self.app.interface.get_tns())

        else:
            if message is self.REQUEST:
                header_message_class = ctx.descriptor.in_header
                body_message_class = ctx.descriptor.in_message

            elif message is self.RESPONSE:
                header_message_class = ctx.descriptor.out_header
                body_message_class = ctx.descriptor.out_message

            # body. Created detached so that the Header element, if any, can
            # be added to the envelope before it.
            ctx.out_body_doc = out_body_doc = etree.Element(
                                                     '{%s}Body' % ns.soap_env)

            # assign raw result to its wrapper, result_message
            if ctx.descriptor.body_style is BODY_STYLE_WRAPPED:
                out_type_info = body_message_class._type_info
                out_object = body_message_class()

                # Pair each declared member with the next returned value.
                # Members left without a value get None; surplus values are
                # ignored.
                values = iter(ctx.out_object)
                for k in out_type_info:
                    setattr(out_object, k, next(values, None))

                self.to_parent(ctx, body_message_class, out_object,
                             out_body_doc, body_message_class.get_namespace())

            else:
                out_object = ctx.out_object[0]

                sub_ns = body_message_class.Attributes.sub_ns
                if sub_ns is None:
                    sub_ns = body_message_class.get_namespace()
                if sub_ns is DEFAULT_NS:
                    sub_ns = self.app.interface.get_tns()

                sub_name = body_message_class.Attributes.sub_name
                if sub_name is None:
                    sub_name = body_message_class.get_type_name()

                self.to_parent(ctx, body_message_class, out_object,
                                               out_body_doc, sub_ns, sub_name)

            # header
            if ctx.out_header is not None and header_message_class is not None:
                ctx.out_header_doc = soap_header_elt = etree.SubElement(
                                ctx.out_document, '{%s}Header' % ns.soap_env)

                if isinstance(ctx.out_header, (list, tuple)):
                    out_headers = ctx.out_header
                else:
                    out_headers = (ctx.out_header,)

                for header_class, out_header in zip(header_message_class,
                                                                 out_headers):
                    self.to_parent(ctx, header_class, out_header,
                        soap_header_elt,
                        header_class.get_namespace(),
                        header_class.get_type_name(),
                    )

            ctx.out_document.append(ctx.out_body_doc)

        if self.cleanup_namespaces:
            etree.cleanup_namespaces(ctx.out_document)

        self.event_manager.fire_event('after_serialize', ctx)

    def fault_to_http_response_code(self, fault):
        """Returns the http response code for ``fault``: always ``HTTP_500``."""

        return HTTP_500