#
# spyne - Copyright (C) Spyne contributors.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
#
"""The ``spyne.model.complex`` module contains
:class:`spyne.model.complex.ComplexBase` class and its helper objects and
subclasses. These are mainly container classes.
"""
import logging
logger = logging.getLogger(__name__)
import decimal
from spyne.model import ModelBase
from spyne.model import nillable_dict
from spyne.model import nillable_string
from spyne.model.primitive import NATIVE_MAP
from spyne.model.primitive import Unicode
from spyne.model.primitive import Point
from spyne.const import xml_ns as namespace
from spyne.const import MAX_STRING_FIELD_LENGTH
from spyne.const import MAX_ARRAY_ELEMENT_NUM
from spyne.const.suffix import ARRAY_SUFFIX
from spyne.const.suffix import TYPE_SUFFIX
from spyne.util import sanitize_args
from spyne.util.odict import odict
class xml:
[docs] """Complex argument to ``ComplexModelBase.Attributes.store_as`` for xml
serialization.
:param root_tag: Root tag of the xml element that contains the field values.
:param no_ns: When true, the xml document is stripped from namespace
information. use with caution.
"""
def __init__(self, root_tag=None, no_ns=False):
self.root_tag = root_tag
self.no_ns = no_ns
class table:
[docs] """Complex argument to ``ComplexModelBase.Attributes.store_as`` for storing
the class instance as a row in a table in a relational database.
:param multi: When False, configures a one-to-many relationship where the
child table has a foreign key to the parent. When != False, configures a
many-to-many relationship by creating an intermediate relation table
that has foreign keys to both parent and child classes and generates
a table name automatically. When True, the table name is generated
automatically. Otherwise, it should be a string, as the value is used as
the name of the intermediate table.
:param left: Name of the left join column.
:param right: Name of the right join column.
"""
def __init__(self, multi=False, left=None, right=None):
self.multi = multi
self.left = left
self.right = right
class json:
[docs] """Complex argument to ``ComplexModelBase.Attributes.store_as`` for storing
the class instance as a json document.
Make sure you don't mix this with the json package when importing.
"""
def __init__(self, skip_depth=0):
if skip_depth > 0:
raise NotImplementedError("skip_depth > 0")
self.skip_depth = skip_depth
class msgpack:
pass # TODO: not implemented
"""Complex argument to ``ComplexModelBase.Attributes.store_as`` for storing
the class instance as a MessagePack document.
Make sure you don't mix this with the msgpack package when importing.
"""
def __init__(self):
pass
PSSM_VALUES = {'json': json, 'xml': xml, 'msgpack': msgpack, 'table': table}
"""Persistent storage serialization method values"""
class TypeInfo(odict):
pass
class _SimpleTypeInfoElement(object):
__slots__ = ['path', 'parent', 'type']
def __init__(self, path, parent, type_):
self.path = path
self.parent = parent
self.type = type_
class XmlAttribute(ModelBase):
[docs] """Items which are marshalled as attributes of the parent element."""
def __new__(cls, typ, use=None, ns=None, attribute_of=None):
retval = cls.customize()
retval._typ = typ
retval._use = use
retval._ns = ns
retval._attribute_of = attribute_of
return retval
@classmethod
def marshall(cls, name, value, parent_elt):
if cls._ns is not None:
name = "{%s}%s" % (cls._ns,name)
if value is not None:
parent_elt.set(name, cls._typ.to_string(value))
@classmethod
def describe(cls, name, element, document):
element.set('name', name)
element.set('type', cls._typ.get_type_name_ns(document.interface))
if cls._use is not None:
element.set('use', cls._use)
@staticmethod
def resolve_namespace(cls, default_ns):
cls._typ.resolve_namespace(cls._typ, default_ns)
cls.__namespace__ = cls._ns
if cls.__namespace__ is None:
cls.__namespace__ = cls._typ.get_namespace()
if cls.__namespace__ in namespace.const_prefmap:
cls.__namespace__ = default_ns
class XmlAttributeRef(XmlAttribute):
[docs] """Reference to an Xml attribute."""
def __init__(self, ref, use=None):
self._ref = ref
self._use = use
def describe(self, name, element, app):
element.set('ref', self._ref)
if self._use:
element.set('use', self._use)
class SelfReference(object):
[docs] '''Use this as a placeholder type in classes that contain themselves. See
:func:`spyne.test.model.test_complex.TestComplexModel.test_self_reference`.
'''
def __init__(self):
raise NotImplementedError()
def _get_spyne_type(v):
try:
v = NATIVE_MAP.get(v, v)
except TypeError:
return
try:
subc = issubclass(v, ModelBase) or issubclass(v, SelfReference)
except:
subc = False
if subc:
if issubclass(v, Array) and len(v._type_info) != 1:
raise Exception("Invalid Array definition in %s.%s."% (cls_name, k))
elif issubclass(v, Point) and v.Attributes.dim is None:
raise Exception("Please specify the number of dimensions")
return v
def _join_args(x, y):
if x is None:
return y
if y is None:
return x
xa, xk = sanitize_args(x)
ya, yk = sanitize_args(y)
xk = dict(xk)
xk.update(yk)
return xa + ya, xk
class ComplexModelMeta(type(ModelBase)):
[docs] """If you want to make a better class type, this is what you should inherit
from.
"""
__mixin__ = False
class Attributes(ModelBase.Attributes):
[docs] """ComplexModel-specific attributes"""
store_as = None
"""Method for serializing to persistent storage. One of %r. It makes
sense to specify this only when this object is a child of another
ComplexModel sublass.""" % (PSSM_VALUES,)
table_name = None
"""Table name."""
sqla_metadata = None
"""None or :class:`sqlalchemy.MetaData` instance."""
sqla_table_args = None
"""A dict that will be passed to :class:`sqlalchemy.schema.Table`
constructor as ``**kwargs``.
"""
sqla_mapper_args = None
"""A dict that will be passed to :func:`sqlalchemy.orm.mapper`
constructor as. ``**kwargs``.
"""
sqla_table = None
"""The sqlalchemy table object"""
sqla_mapper = None
"""The sqlalchemy mapper object"""
def __init__(self, **kwargs):
super(ComplexModelBase, self).__init__()
# this ugliness is due to sqlalchemy's forcing of relevant types for
# database fields
for k in self.get_flat_type_info(self.__class__).keys():
try:
delattr(self, k)
except:
try:
setattr(self, k, None)
except:
try:
setattr(self, k, [])
except:
pass
for k,v in kwargs.items():
setattr(self, k, v)
def __len__(self):
return len(self._type_info)
def __getitem__(self, i):
return getattr(self, self._type_info.keys()[i], None)
def __repr__(self):
return "%s(%s)" % (self.get_type_name(), ', '.join(
['%s=%r' % (k, getattr(self, k, None))
for k in self.__class__.get_flat_type_info(self.__class__)
if getattr(self, k, None) is not None]))
@classmethod
def get_serialization_instance(cls, value):
[docs] """Returns the native object corresponding to the serialized form passed
in the ``value`` argument.
:param value: This argument can be:
* A list or tuple of native types aligned with cls._type_info.
* A dict of native types.
* The native type itself.
If the value type is not a ``list``, ``tuple`` or ``dict``, the
value is returned untouched.
"""
# if the instance is a list, convert it to a cls instance.
# this is only useful when deserializing method arguments for a client
# request which is the only time when the member order is not arbitrary
# (as the members are declared and passed around as sequences of
# arguments, unlike dictionaries in a regular class definition).
if isinstance(value, list) or isinstance(value, tuple):
assert len(value) <= len(cls._type_info)
inst = cls()
keys = cls._type_info.keys()
for i in range(len(value)):
setattr(inst, keys[i], value[i])
elif isinstance(value, dict):
inst = cls()
for k in cls._type_info:
setattr(inst, k, value.get(k, None))
else:
inst = value
return inst
@classmethod
def get_deserialization_instance(cls):
[docs] """Get an empty native type so that the deserialization logic can set
its attributes.
"""
if cls.__orig__ is None:
return cls()
else:
return cls.__orig__()
@classmethod
def get_members_pairs(cls, inst):
parent_cls = getattr(cls, '__extends__', None)
if not (parent_cls is None):
for r in parent_cls.get_members_pairs(parent_cls, inst):
yield r
for k, v in cls._type_info.items():
mo = v.Attributes.max_occurs
try:
subvalue = getattr(inst, k, None)
except: # to guard against e.g. sqlalchemy throwing NoSuchColumnError
subvalue = None
if mo > 1:
if subvalue != None:
yield (k, (v.to_string(sv) for sv in subvalue))
else:
yield (k, [v.to_string(subvalue)])
@classmethod
@nillable_dict
def to_dict(cls, value):
inst = cls.get_serialization_instance(value)
return dict(cls.get_members_pairs(inst))
@staticmethod
def get_flat_type_info(cls, retval=None):
[docs] """Returns a _type_info dict that includes members from all base classes.
It's called a "flat" dict because it flattens all members from the
inheritance hierarchy into one dict.
"""
if retval is None:
retval = TypeInfo()
parent = getattr(cls, '__extends__', None)
if parent != None:
cls.get_flat_type_info(parent, retval)
retval.update(cls._type_info)
return retval
@staticmethod
def get_simple_type_info(cls, hier_delim="_", retval=None, prefix=None,
[docs] parent=None):
"""Returns a _type_info dict that includes members from all base classes
and whose types are only primitives. It will prefix field names in
non-top-level complex objects with field name of its parent.
For example, given hier_delim='_'; the following hierarchy:
{'some_object': [{'some_string': ['abc']}]}
would be transformed to:
{'some_object_some_string': ['abc']}
"""
if retval is None:
retval = TypeInfo()
if prefix is None:
prefix = []
fti = cls.get_flat_type_info(cls)
for k, v in fti.items():
if getattr(v, 'get_flat_type_info', None) is None:
new_prefix = list(prefix)
new_prefix.append(k)
key = hier_delim.join(new_prefix)
value = retval.get(key, None)
if value:
raise ValueError("%r.%s conflicts with %r" % (cls, k, value))
retval[key] = _SimpleTypeInfoElement(path=tuple(new_prefix),
parent=parent, type_=v)
else:
new_prefix = list(prefix)
new_prefix.append(k)
v.get_simple_type_info(v, hier_delim, retval, new_prefix, parent=cls)
return retval
@classmethod
@nillable_string
def to_string(cls, value):
raise ValueError("Only primitives can be serialized to string.")
@classmethod
@nillable_string
def from_string(cls, string):
raise ValueError("Only primitives can be deserialized from string.")
@staticmethod
def resolve_namespace(cls, default_ns):
if getattr(cls, '__extends__', None) != None:
cls.__extends__.resolve_namespace(cls.__extends__, default_ns)
ModelBase.resolve_namespace(cls, default_ns)
for k, v in cls._type_info.items():
if v is None:
continue
if v.__type_name__ is ModelBase.Empty:
v.__namespace__ = cls.get_namespace()
v.__type_name__ = "%s_%s%s" % (cls.get_type_name(), k, TYPE_SUFFIX)
if not issubclass(v, cls):
v.resolve_namespace(v, default_ns)
if cls._force_own_namespace is not None:
for c in cls._force_own_namespace:
c.__namespace__ = cls.get_namespace()
ComplexModel.resolve_namespace(c, cls.get_namespace())
@staticmethod
def produce(namespace, type_name, members):
[docs] """Lets you create a class programmatically."""
return ComplexModelMeta(type_name, (ComplexModel,), {
'__namespace__': namespace,
'__type_name__': type_name,
'_type_info': TypeInfo(members),
})
@staticmethod
def alias(type_name, namespace, target):
[docs] """Return an alias class for the given target class.
This function is a variation on 'ComplexModel.produce'. The alias will
borrow the target's _type_info.
"""
retval = Alias.customize()
retval.__type_name__ = type_name
retval.__namespace__ = namespace
retval._type_info = {"_target": target}
return retval
@classmethod
def customize(cls, **kwargs):
[docs] """Duplicates cls and overwrites the values in ``cls.Attributes`` with
``**kwargs`` and returns the new class."""
store_as = kwargs.get('store_as', None)
if store_as is not None:
val = PSSM_VALUES.get(store_as, None)
if val is None:
assert isinstance(store_as, tuple(PSSM_VALUES.values())), \
"'store_as' should be one of: %r or an instance of %r not %r" \
% (tuple(PSSM_VALUES.keys()), tuple(PSSM_VALUES.values()),
store_as)
else:
kwargs['store_as'] = val()
cls_name, cls_bases, cls_dict = cls._s_customize(cls, **kwargs)
cls_dict['__module__'] = cls.__module__
retval = type(cls_name, cls_bases, cls_dict)
retval._type_info = cls._type_info
retval.__type_name__ = cls.__type_name__
retval.__namespace__ = cls.__namespace__
orig = getattr(retval, '__orig__', None)
if orig is not None:
retval.__extends__ = getattr(orig, '__extends__', None)
return retval
@classmethod
def store_as(cls, what):
return cls.customize(store_as=what)
class ComplexModel(ComplexModelBase):
[docs] """The general complexType factory. The __call__ method of this class will
return instances, contrary to primivites where the same call will result in
customized duplicates of the original class definition.
Those who'd like to customize the class should use the customize method.
(see :class:``spyne.model.ModelBase``).
"""
__metaclass__ = ComplexModelMeta
class Array(ComplexModelBase):
[docs] """This class generates a ComplexModel child that has one attribute that has
the same name as the serialized class. It's contained in a Python list.
"""
__metaclass__ = ComplexModelMeta
def __new__(cls, serializer, **kwargs):
retval = cls.customize(**kwargs)
serializer = _get_spyne_type(serializer)
if serializer is None:
raise ValueError(serializer)
# hack to default to unbounded arrays when the user didn't specify
# max_occurs. We should find a better way.
if serializer.Attributes.max_occurs == 1:
serializer = serializer.customize(max_occurs=decimal.Decimal('inf'))
if serializer.get_type_name() is ModelBase.Empty:
member_name = serializer.__base_type__.get_type_name()
if cls.__type_name__ is None:
cls.__type_name__ = ModelBase.Empty # to be resolved later
else:
member_name = serializer.get_type_name()
if cls.__type_name__ is None:
cls.__type_name__ = '%s%s' % (serializer.get_type_name(),
ARRAY_SUFFIX)
retval.__type_name__ = '%sArray' % member_name
retval._type_info = {member_name: serializer}
return retval
# the array belongs to its child's namespace, it doesn't have its own
# namespace.
@staticmethod
def resolve_namespace(cls, default_ns):
(serializer,) = cls._type_info.values()
serializer.resolve_namespace(serializer, default_ns)
if cls.__namespace__ is None:
cls.__namespace__ = serializer.get_namespace()
if cls.__namespace__ in namespace.const_prefmap:
cls.__namespace__ = default_ns
ComplexModel.resolve_namespace(cls, default_ns)
@classmethod
def get_serialization_instance(cls, value):
inst = ComplexModel.__new__(Array)
(member_name,) = cls._type_info.keys()
setattr(inst, member_name, value)
return inst
@classmethod
def get_deserialization_instance(cls):
return []
class Iterable(Array):
[docs] """This class generates a ``ComplexModel`` child that has one attribute that
has the same name as the serialized class. It's contained in a Python
iterable. The distinction with the ``Array`` is made in the protocol
implementation, this is just a marker.
"""
class Alias(ComplexModelBase):
[docs] """Different type_name, same _type_info."""
__metaclass__ = ComplexModelMeta
def log_repr(obj, cls=None):
[docs] """Use this function if you want to echo a ComplexModel subclass. It will
limit output size of the String types, thus make your logs smaller.
"""
if cls is None:
cls = obj.__class__
if issubclass(cls, Array):
retval = []
cls, = cls._type_info.values()
if not cls.Attributes.logged:
retval ="[%s (...)]" % cls.get_type_name()
else:
retval = _log_repr_obj(obj, cls)
for i,o in enumerate(obj):
retval.append(_log_repr_obj(o, cls))
if i > MAX_ARRAY_ELEMENT_NUM:
retval.append("(...)")
break
retval = "%s([%s])" % (cls.get_type_name(), ', '.join(retval))
elif issubclass(cls, ComplexModel):
if cls.Attributes.logged:
retval = _log_repr_obj(obj, cls)
else:
retval ="%s(...)" % cls.get_type_name()
else:
retval = repr(obj)
if len(retval) > MAX_STRING_FIELD_LENGTH:
retval = retval[:MAX_STRING_FIELD_LENGTH] + "(...)"
return retval
def _log_repr_obj(obj, cls):
retval = []
for k,t in cls.get_flat_type_info(cls).items():
v = getattr(obj, k, None)
if v is not None and t.Attributes.logged:
if issubclass(t, Unicode) and len(v) > MAX_STRING_FIELD_LENGTH:
s = '%s=%r(...)' % (k, v[:MAX_STRING_FIELD_LENGTH])
else:
s = '%s=%r' % (k, v)
retval.append(s)
return "%s(%s)" % (cls.get_type_name(), ', '.join(retval))
def TTableModel(metadata=None):
[docs] """A TableModel template that generates a new TableModel class for each
call. If metadata is not supplied, a new one is instantiated.
"""
import sqlalchemy
class TableModel(ComplexModelBase):
__metaclass__ = ComplexModelMeta
class Attributes(ComplexModelBase.Attributes):
sqla_metadata = metadata or sqlalchemy.MetaData()
return TableModel
try:
TableModel = TTableModel()
except ImportError:
pass