Source code for spyne.protocol.json

#
# spyne - Copyright (C) Spyne contributors.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
#

"""The ``spyne.protocol.json`` package contains the Json-related protocols.
Currently, only :class:`spyne.protocol.json.JsonDocument` is supported.

Initially released in 2.8.0-rc.

This module is EXPERIMENTAL. You may not recognize the code here next time you
look at it.

Missing Types
=============

The JSON standard does not define every type that Spyne supports. These include
Date/Time types as well as arbitrary-length integers and arbitrary-precision
decimals. Integers are parsed to ``int``\s or ``long``\s seamlessly but
``Decimal``\s are only parsed correctly when they come off as strings.

While it's possible to e.g. (de)serialize floats to ``Decimal``\s by adding
hooks to ``parse_float`` [#]_ (and convert later as necessary), such
customizations apply to the whole incoming document which pretty much messes up
``AnyDict`` serialization and deserialization.

It also wasn't possible to work with ``object_pairs_hook`` as Spyne's parsing
is always "from outside to inside" whereas ``object_pairs_hook`` is passed
``dict``\s basically in any order "from inside to outside".

.. [#] http://docs.python.org/2/library/json.html#json.loads
"""

from __future__ import absolute_import

import logging
logger = logging.getLogger(__name__)

from spyne.util import six

from itertools import chain

try:
    import simplejson as json
    from simplejson.decoder import JSONDecodeError
except ImportError:
    import json
    JSONDecodeError = ValueError

from spyne.error import ValidationError
from spyne.error import ResourceNotFoundError

from spyne.model.binary import BINARY_ENCODING_BASE64
from spyne.model.primitive import Date
from spyne.model.primitive import Time
from spyne.model.primitive import DateTime
from spyne.model.primitive import Double
from spyne.model.primitive import Integer
from spyne.model.primitive import Boolean
from spyne.model.fault import Fault
from spyne.protocol.dictdoc import HierDictDocument


class JsonEncoder(json.JSONEncoder):
    def default(self, o):
        try:
            return super(JsonEncoder, self).default(o)

        except TypeError as e:
            # if json can't serialize it, it's possibly a generator. If not,
            # additional hacks are welcome :)
            if logger.level == logging.DEBUG:
                logger.exception(e)
            return list(o)


class JsonDocument(HierDictDocument):
[docs] """An implementation of the json protocol that uses simplejson package when available, json package otherwise. :param ignore_wrappers: Does not serialize wrapper objects. :param complex_as: One of (list, dict). When list, the complex objects are serialized to a list of values instead of a dict of key/value pairs. """ mime_type = 'application/json' type = set(HierDictDocument.type) type.add('json') default_binary_encoding = BINARY_ENCODING_BASE64 # flags used just for tests _decimal_as_string = True def __init__(self, app=None, validator=None, mime_type=None, ignore_uncap=False, # DictDocument specific ignore_wrappers=True, complex_as=dict, ordered=False, default_string_encoding=None, **kwargs): super(JsonDocument, self).__init__(app, validator, mime_type, ignore_uncap, ignore_wrappers, complex_as, ordered) # this is needed when we're overriding a regular instance attribute # with a property. self.__message = HierDictDocument.__getattribute__(self, 'message') self._from_string_handlers[Double] = self._ret self._from_string_handlers[Boolean] = self._ret self._from_string_handlers[Integer] = self._ret self._to_string_handlers[Double] = self._ret self._to_string_handlers[Boolean] = self._ret self._to_string_handlers[Integer] = self._ret self.default_string_encoding = default_string_encoding self.kwargs = kwargs def _ret(self, cls, value): return value def validate(self, key, cls, val): super(JsonDocument, self).validate(key, cls, val) if issubclass(cls, (DateTime, Date, Time)) and not ( isinstance(val, six.string_types) and cls.validate_string(cls, val)): raise ValidationError(key, val) @property def message(self): return self.__message @message.setter def message(self, val): if val is self.RESPONSE and not ('cls' in self.kwargs): self.kwargs['cls'] = JsonEncoder self.__message = val def create_in_document(self, ctx, in_string_encoding=None):
[docs] """Sets ``ctx.in_document`` using ``ctx.in_string``.""" try: in_string = ''.join(ctx.in_string) if not isinstance(in_string, six.text_type): if in_string_encoding is None: in_string_encoding = self.default_string_encoding if in_string_encoding is not None: in_string = in_string.decode(in_string_encoding) ctx.in_document = json.loads(in_string, **self.kwargs) except JSONDecodeError as e: raise Fault('Client.JsonDecodeError', repr(e)) def create_out_string(self, ctx, out_string_encoding='utf8'):
[docs] """Sets ``ctx.out_string`` using ``ctx.out_document``.""" ctx.out_string = (json.dumps(o, **self.kwargs) for o in ctx.out_document) class JsonP(JsonDocument):
[docs] """The JsonP protocol puts the reponse document inside a designated javascript function call. The input protocol is identical to the JsonDocument protocol. :param callback_name: The name of the function call that will wrapp all response documents. For other arguents, see :class:`spyne.protocol.json.JsonDocument`. """ type = set(HierDictDocument.type) type.add('jsonp') def __init__(self, callback_name, *args, **kwargs): super(JsonP, self).__init__(*args, **kwargs) self.callback_name = callback_name def create_out_string(self, ctx): super(JsonP, self).create_out_string(ctx) ctx.out_string = chain( [self.callback_name, '('], ctx.out_string, [');'], ) class _SpyneJsonRpc1(JsonDocument):
version = 1 VERSION = 'ver' BODY = 'body' HEAD = 'head' FAULT = 'fault' def decompose_incoming_envelope(self, ctx, message=JsonDocument.REQUEST): indoc = ctx.in_document if not isinstance(indoc, dict): raise ValidationError("Invalid Request") ver = indoc.get(self.VERSION) if ver is None: raise ValidationError("Missing Version") body = indoc.get(self.BODY) err = indoc.get(self.FAULT) if body is None and err is None: raise ValidationError("Missing request") ctx.protocol.error = False if err is not None: ctx.in_body_doc = err ctx.protocol.error = True else: if not isinstance(body, dict): raise ValidationError("Missing request body") if not len(body) == 1: raise ValidationError("Need len(body) == 1") ctx.in_header_doc = indoc.get(self.HEAD) if not isinstance(ctx.in_header_doc, list): ctx.in_header_doc = [ctx.in_header_doc] (ctx.method_request_string,ctx.in_body_doc), = body.items() def deserialize(self, ctx, message): assert message in (self.REQUEST, self.RESPONSE) self.event_manager.fire_event('before_deserialize', ctx) if ctx.descriptor is None: raise ResourceNotFoundError(ctx.method_request_string) if ctx.protocol.error: ctx.in_object = None ctx.in_error = self._doc_to_object(Fault, ctx.in_body_doc) else: if message is self.REQUEST: header_class = ctx.descriptor.in_header body_class = ctx.descriptor.in_message elif message is self.RESPONSE: header_class = ctx.descriptor.out_header body_class = ctx.descriptor.out_message # decode header objects if (ctx.in_header_doc is not None and header_class is not None): headers = [None] * len(header_class) for i, (header_doc, head_class) in enumerate( zip(ctx.in_header_doc, header_class)): if header_doc is not None and i < len(header_doc): headers[i] = self._doc_to_object(head_class, header_doc) if len(headers) == 1: ctx.in_header = headers[0] else: ctx.in_header = headers # decode method arguments if ctx.in_body_doc is None: ctx.in_object = [None] * len(body_class._type_info) else: ctx.in_object = self._doc_to_object(body_class, ctx.in_body_doc) self.event_manager.fire_event('after_deserialize', ctx) def serialize(self, ctx, message): assert message in (self.REQUEST, self.RESPONSE) self.event_manager.fire_event('before_serialize', ctx) # construct the soap response, and serialize it nsmap = self.app.interface.nsmap ctx.out_document = { "ver": self.version, } if ctx.out_error is not None: ctx.out_document[self.FAULT] = Fault.to_dict(Fault, ctx.out_error) else: if message is self.REQUEST: header_message_class = ctx.descriptor.in_header body_message_class = ctx.descriptor.in_message elif message is self.RESPONSE: header_message_class = ctx.descriptor.out_header body_message_class = ctx.descriptor.out_message # assign raw result to its wrapper, result_message out_type_info = body_message_class._type_info out_object = body_message_class() keys = iter(out_type_info) values = iter(ctx.out_object) while True: try: k = next(keys) except StopIteration: break try: v = next(values) except StopIteration: v = None setattr(out_object, k, v) ctx.out_document[self.BODY] = ctx.out_body_doc = \ self._object_to_doc(body_message_class, out_object) # header if ctx.out_header is not None and header_message_class is not None: if isinstance(ctx.out_header, (list, tuple)): out_headers = ctx.out_header else: out_headers = (ctx.out_header,) ctx.out_header_doc = out_header_doc = [] for header_class, out_header in zip(header_message_class, out_headers): out_header_doc.append(self._object_to_doc(header_class, out_header)) if len(out_header_doc) > 1: ctx.out_document[self.HEAD] = out_header_doc else: ctx.out_document[self.HEAD] = out_header_doc[0] self.event_manager.fire_event('after_serialize', ctx) _json_rpc_flavours = { 'spyne': _SpyneJsonRpc1 } def JsonRpc(flavour, *args, **kwargs): assert flavour in _json_rpc_flavours, "Unknown JsonRpc flavour. " \ "Accepted ones are: %r" % tuple(_json_rpc_flavours) return _json_rpc_flavours[flavour](*args, **kwargs)