Source code for spyne.protocol.soap.soap11

#
# spyne - Copyright (C) Spyne contributors.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
#

"""The ``spyne.protoco.soap.soap11`` module contains the implementation of a
subset of the Soap 1.1 standard.

Except the binary optimizations that mostly **do not work**, this protocol is
production quality.

Initially released in soaplib-0.8.0.
"""

import logging
logger = logging.getLogger(__name__)

import cgi

import spyne.const.xml_ns as ns

from lxml import etree
from lxml.etree import XMLSyntaxError

from spyne.const.http import HTTP_405
from spyne.const.http import HTTP_500
from spyne.const.ansi_color import LIGHT_GREEN
from spyne.const.ansi_color import LIGHT_RED
from spyne.const.ansi_color import END_COLOR
from spyne.error import RequestNotAllowed
from spyne.model.fault import Fault
from spyne.model.primitive import DateTime
from spyne.protocol.xml import XmlDocument
from spyne.protocol.xml.model import nillable_value
from spyne.protocol.xml.model import TBaseFromElement
from spyne.protocol.soap.mime import collapse_swa

def _from_soap(in_envelope_xml, xmlids=None):
    '''Parses the xml string into the header and payload.'''

    if xmlids:
        resolve_hrefs(in_envelope_xml, xmlids)

    if in_envelope_xml.tag != '{%s}Envelope' % ns.soap_env:
        raise Fault('Client.SoapError', 'No {%s}Envelope element was found!' %
                                                            ns.soap_env)

    header_envelope = in_envelope_xml.xpath('e:Header',
                                          namespaces={'e': ns.soap_env})
    body_envelope = in_envelope_xml.xpath('e:Body',
                                          namespaces={'e': ns.soap_env})

    if len(header_envelope) == 0 and len(body_envelope) == 0:
        raise Fault('Client.SoapError', 'Soap envelope is empty!' %
                                                            ns.soap_env)

    header=None
    if len(header_envelope) > 0:
        header = header_envelope[0].getchildren()

    body=None
    if len(body_envelope) > 0 and len(body_envelope[0]) > 0:
        body = body_envelope[0][0]

    return header, body

def _parse_xml_string(xml_string, charset=None,
                                  parser=etree.XMLParser(remove_comments=True)):
    if charset:
        string = ''.join([s.decode(charset) for s in xml_string])
    else:
        string = ''.join(xml_string)

    try:
        try:
            root, xmlids = etree.XMLID(string, parser)

        except XMLSyntaxError, e:
            logger.error(string)
            raise Fault('Client.XMLSyntaxError', str(e))

    except ValueError, e:
        logger.debug('%r -- falling back to str decoding.' % (e))
        try:
            root, xmlids = etree.XMLID(string.encode(charset), parser)
        except XMLSyntaxError, e:
            logger.error(string)
            raise Fault('Client.XMLSyntaxError', str(e))

    return root, xmlids

# see http://www.w3.org/TR/2000/NOTE-SOAP-20000508/
# section 5.2.1 for an example of how the id and href attributes are used.
def resolve_hrefs(element, xmlids):
[docs] for e in element: if e.get('id'): continue # don't need to resolve this element elif e.get('href'): resolved_element = xmlids[e.get('href').replace('#', '')] if resolved_element is None: continue resolve_hrefs(resolved_element, xmlids) # copies the attributes [e.set(k, v) for k, v in resolved_element.items()] # copies the children [e.append(child) for child in resolved_element.getchildren()] # copies the text e.text = resolved_element.text else: resolve_hrefs(e, xmlids) return element @nillable_value
def _datetime_to_parent_element(prot, cls, value, tns, parent_elt, name='retval'): e = etree.SubElement(parent_elt, '{%s}%s' % (tns, name)) e.text = value.isoformat() _datetime_from_element = TBaseFromElement(lambda cls, s: cls.default_parse(s)) class Soap11(XmlDocument):
[docs] """The base implementation of a subset of the Soap 1.1 standard. The document is available here: http://www.w3.org/TR/soap11/ :param app: A spyne.application.Application instance. :param validator: The validator to use. Currently the only supported value is 'lxml' :param wrapped: Whether the return type should be wrapped in another object. Default is 'True'. :param xml_declaration: Whether to add xml_declaration to the responses Default is 'True'. :param cleanup_namespaces: Whether to add clean up namespace declarations in the response document. Default is 'False'. """ allowed_http_verbs = ['POST'] mime_type = 'text/xml; charset=utf-8' def __init__(self, app=None, validator=None, wrapped=True, xml_declaration=True, cleanup_namespaces=False): XmlDocument.__init__(self, app, validator, xml_declaration, cleanup_namespaces) self.__wrapped = wrapped # SOAP requires DateTime strings to be in iso format. This function # bypasses datetime formatting via DateTime(format="...") string. self.serialization_handlers[DateTime] = _datetime_to_parent_element self.deserialization_handlers[DateTime] = _datetime_from_element @property def wrapped(self):
[docs] return self.__wrapped def create_in_document(self, ctx, charset=None):
[docs] if ctx.transport.type == 'wsgi': # according to the soap via http standard, soap requests must only # work with proper POST requests. content_type = ctx.transport.req_env.get("CONTENT_TYPE") http_verb = ctx.transport.req_env['REQUEST_METHOD'].upper() if content_type is None or http_verb != "POST": ctx.transport.resp_code = HTTP_405 raise RequestNotAllowed( "You must issue a POST request with the Content-Type " "header properly set.") content_type = cgi.parse_header(content_type) collapse_swa(content_type, ctx.in_string) ctx.in_document = _parse_xml_string(ctx.in_string, charset) def decompose_incoming_envelope(self, ctx, message=XmlDocument.REQUEST):
[docs] envelope_xml, xmlids = ctx.in_document header_document, body_document = _from_soap(envelope_xml, xmlids) ctx.in_document = envelope_xml if body_document.tag == '{%s}Fault' % ns.soap_env: ctx.in_body_doc = body_document else: ctx.in_header_doc = header_document ctx.in_body_doc = body_document ctx.method_request_string = ctx.in_body_doc.tag self.validate_body(ctx, message) def deserialize(self, ctx, message):
[docs] """Takes a MethodContext instance and a string containing ONE soap message. Returns the corresponding native python object Not meant to be overridden. """ assert message in (self.REQUEST, self.RESPONSE) self.event_manager.fire_event('before_deserialize', ctx) if ctx.in_body_doc.tag == "{%s}Fault" % ns.soap_env: ctx.in_object = None ctx.in_error = self.from_element(Fault, ctx.in_body_doc) else: if message is self.REQUEST: header_class = ctx.descriptor.in_header body_class = ctx.descriptor.in_message elif message is self.RESPONSE: header_class = ctx.descriptor.out_header body_class = ctx.descriptor.out_message # decode header objects if (ctx.in_header_doc is not None and header_class is not None): if isinstance(header_class, (list, tuple)): headers = [None] * len(header_class) for i, (header_doc, head_class) in enumerate( zip(ctx.in_header_doc, header_class)): if len(header_doc) > i: headers[i] = self.from_element(head_class, header_doc) ctx.in_header = tuple(headers) else: if len(ctx.in_header_doc) > 0: ctx.in_header = self.from_element(header_class, ctx.in_header_doc[0]) # decode method arguments if ctx.in_body_doc is None: ctx.in_object = [None] * len(body_class._type_info) else: ctx.in_object = self.from_element(body_class, ctx.in_body_doc) self.event_manager.fire_event('after_deserialize', ctx) def serialize(self, ctx, message):
[docs] """Uses ctx.out_object, ctx.out_header or ctx.out_error to set ctx.out_body_doc, ctx.out_header_doc and ctx.out_document as an lxml.etree._Element instance. Not meant to be overridden. """ assert message in (self.REQUEST, self.RESPONSE) self.event_manager.fire_event('before_serialize', ctx) # construct the soap response, and serialize it nsmap = self.app.interface.nsmap ctx.out_document = etree.Element('{%s}Envelope' % ns.soap_env, nsmap=nsmap) if ctx.out_error is not None: # FIXME: There's no way to alter soap response headers for the user. ctx.out_body_doc = out_body_doc = etree.SubElement(ctx.out_document, '{%s}Body' % ns.soap_env, nsmap=nsmap) self.to_parent_element(ctx.out_error.__class__, ctx.out_error, self.app.interface.get_tns(), out_body_doc) else: if message is self.REQUEST: header_message_class = ctx.descriptor.in_header body_message_class = ctx.descriptor.in_message elif message is self.RESPONSE: header_message_class = ctx.descriptor.out_header body_message_class = ctx.descriptor.out_message # body ctx.out_body_doc = out_body_doc = etree.Element( '{%s}Body' % ns.soap_env) # assign raw result to its wrapper, result_message out_type_info = body_message_class._type_info out_object = body_message_class() keys = iter(out_type_info) values = iter(ctx.out_object) while True: try: k = keys.next() except StopIteration: break try: v = values.next() except StopIteration: v = None setattr(out_object, k, v) # transform the results into an element self.to_parent_element(body_message_class, out_object, body_message_class.get_namespace(), out_body_doc) # header if ctx.out_header is not None: if ctx.descriptor.out_header is None: logger.warning( "Skipping soap response header as %r method is not " "declared to have one." % ctx.method_name) else: ctx.out_header_doc = soap_header_elt = etree.SubElement( ctx.out_document, '{%s}Header' % ns.soap_env) if isinstance(header_message_class, (list, tuple)): if isinstance(ctx.out_header, (list, tuple)): out_headers = ctx.out_header else: out_headers = (ctx.out_header,) for header_class, out_header in zip(header_message_class, out_headers): self.to_parent_element(header_class, out_header, header_class.get_namespace(), soap_header_elt, header_class.get_type_name(), ) else: self.to_parent_element(header_message_class, ctx.out_header, header_message_class.get_namespace(), soap_header_elt, header_message_class.get_type_name() ) ctx.out_document.append(ctx.out_body_doc) if self.cleanup_namespaces: etree.cleanup_namespaces(ctx.out_document) if self.log_messages: if message is self.REQUEST: line_header = '%sRequest%s' % (LIGHT_GREEN, END_COLOR) elif message is self.RESPONSE: line_header = '%sResponse%s' % (LIGHT_RED, END_COLOR) logger.debug('%s %s' % (line_header, etree.tostring(ctx.out_document, xml_declaration=True, pretty_print=True))) self.event_manager.fire_event('after_serialize', ctx) def fault_to_http_response_code(self, fault):
[docs] return HTTP_500