Some preliminary information would be handy before delving into details:
Here’s what happens when a request arrives to a Spyne server:
The server transport decides whether this is a simple interface document request or a remote procedure call request. Every transport has its own way of dealing with this.
If the incoming request was for the interface document, the job is easy: the interface document needs to be generated and returned to the client as a nice chunk of strings. The server transport first calls spyne.interface._base.InterfaceBase.build_interface_document(), which builds and caches the document, and later calls spyne.interface._base.InterfaceBase.get_interface_document(), which returns the cached document.
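For instance, a transport serving the interface document could do something along the following lines. This is only an illustrative sketch: the serve_interface_document name, the app.interface attribute and the url argument are assumptions; only the two InterfaceBase methods are the ones named above.

def serve_interface_document(app, url):
    # app.interface is assumed to hold the interface document instance
    # (e.g. the Wsdl11 object the Application was configured with).
    doc = app.interface
    doc.build_interface_document(url)    # builds and caches the document
    return doc.get_interface_document()  # returns the cached string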
If it was a remote procedure call request, here’s what happens, in broad strokes: the server transport creates a spyne.MethodContext instance and puts the incoming bytestream into ctx.in_string. It then calls get_in_object(), which lets the input protocol deserialize the request; get_out_object(), which fires the relevant events and calls the user function; and finally get_out_string(), which lets the output protocol serialize the return value (or the error) into ctx.out_string. The transport then pushes the resulting bytestream back to the client.
The same logic applies to client transports, in reverse.
So if you want to implement a new transport or protocol, all you need to do is to subclass the relevant base class and implement the missing methods.
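As a rough sketch (and not a definitive implementation), a blocking server transport that handles a single request could look like the following. The handle_one_request name and the way the raw bytestream arrives are assumptions; the ServerBase calls mirror what the Consumer below actually does, and import paths may differ slightly between Spyne versions.

from spyne import MethodContext
from spyne.server import ServerBase

class EchoTransport(ServerBase):
    transport = 'http://example.com/echo-transport'  # illustrative identifier

    def handle_one_request(self, data):
        # data is the raw incoming bytestream, however this transport got it.
        ctx = MethodContext(self.app)
        ctx.in_string = [data]

        self.get_in_object(ctx)        # deserialize the request
        if ctx.in_error is None:
            self.get_out_object(ctx)   # call the user function
        self.get_out_string(ctx)       # serialize the return value or the error

        return ''.join(ctx.out_string)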
Here’s the source code in one file: https://github.com/arskom/spyne/blob/master/examples/queue.py
The following block of code is SQLAlchemy boilerplate for creating the database and other related machinery. Under normal conditions, you would pass the SQLAlchemy URL to the Producer and Consumer instances instead of the connection object itself, but since we’re dealing with an in-memory database here, passing the engine around via a module-level variable is the simplest way to share the database handle.
db = create_engine('sqlite:///:memory:')
metadata = MetaData(bind=db)
DeclarativeBase = declarative_base(metadata=metadata)
This is the table where queued messages are stored. Note that it’s a vanilla SQLAlchemy object:
class TaskQueue(DeclarativeBase):
__tablename__ = 'task_queue'
id = Column(sqlalchemy.Integer, primary_key=True)
data = Column(sqlalchemy.LargeBinary, nullable=False)
This is the table where the task id of the last processed task for each worker is stored. Workers are identified by an integer.
class WorkerStatus(DeclarativeBase):
__tablename__ = 'worker_status'
worker_id = Column(sqlalchemy.Integer, nullable=False, primary_key=True,
autoincrement=False)
task_id = Column(sqlalchemy.Integer, ForeignKey(TaskQueue.id),
nullable=False)
The consumer is a spyne.server._base.ServerBase child that receives requests by polling the database.
The transport attribute is only used when generating the WSDL document. While it’s irrelevant here, it’s nice to set it anyway:
class Consumer(ServerBase):
transport = 'http://sqlalchemy.persistent.queue/'
We store the incoming values, create a database session and assign it to self.session:
def __init__(self, db, app, consumer_id):
ServerBase.__init__(self, app)
self.session = sessionmaker(bind=db)()
self.id = consumer_id
We also query the worker status table for this worker’s record, which holds the id of the last processed task. If there is no record for its own worker id, the server bootstraps its state:
try:
self.session.query(WorkerStatus) \
.filter_by(worker_id=self.id).one()
except NoResultFound:
self.session.add(WorkerStatus(worker_id=self.id, task_id=0))
self.session.commit()
This is the main loop for our server:
def serve_forever(self):
while True:
We first get the id of the last processed task:
last = self.session.query(WorkerStatus).with_lockmode("update") \
.filter_by(worker_id=self.id).one()
This id is then used to fetch the next tasks to process:
task_queue = self.session.query(TaskQueue) \
.filter(TaskQueue.id > last.task_id) \
.order_by(TaskQueue.id)
Each task is an RPC request, so we create a spyne.MethodContext instance for each task and attach transport-specific data to the ctx.transport object:
for task in task_queue:
ctx = MethodContext(self.app)
ctx.in_string = [task.data]
ctx.transport.consumer_id = self.id
ctx.transport.task_id = task.id
This call parses the incoming request:
self.get_in_object(ctx)
In case of an error while parsing the request, the server logs the error and continues to process the next task in the queue. The get_out_string call is smart enough to notice and serialize the error. If this were a normal server, we’d worry about returning the error to the client as well as logging it.
if ctx.in_error:
self.get_out_string(ctx)
logging.error(''.join(ctx.out_string))
continue
As the request was parsed correctly, the user method can be called to process the task:
self.get_out_object(ctx)
The server does not care whether the error was an expected or an unexpected one: either way, the error is logged and the server continues to process the next task in the queue.
if ctx.out_error:
self.get_out_string(ctx)
logging.error(''.join(ctx.out_string))
continue
If task processing went fine, the server serializes the out object and logs that instead.
self.get_out_string(ctx)
logging.debug(''.join(ctx.out_string))
Finally, the task is marked as processed.
last.task_id = task.id
self.session.commit()
Once all tasks in queue are consumed, the server waits a pre-defined amount of time before polling the database for new tasks:
time.sleep(10)
This concludes the worker implementation. But how do we put tasks in the task queue? That’s the job of the Producer class, which is implemented as a Spyne client.
Implementing clients is a two-stage operation. The main transport logic is in the spyne.client.RemoteProcedureBase child, a native Python callable whose job is to serialize the arguments, send them to the server, receive the reply, deserialize it and pass the return value to the Python caller. However, in our case, the client does not return anything, as calls are processed asynchronously and the return values are ignored.
We start with the constructor, where we initialize the SQLAlchemy database connection factory:
class RemoteProcedure(RemoteProcedureBase):
def __init__(self, db, app, name, out_header):
RemoteProcedureBase.__init__(self, db, app, name, out_header)
self.Session = sessionmaker(bind=db)
The implementation of the client is much simpler because we trust that the Spyne code will do The Right Thing. Here, we serialize the arguments:
def __call__(self, *args, **kwargs):
session = self.Session()
self.get_out_object(args, kwargs)
self.get_out_string()
out_string = ''.join(self.ctx.out_string)
And put the resulting bytestream to the database:
session.add(TaskQueue(data=out_string))
session.commit()
session.close()
Again, the function does not return anything here because this is an asynchronous client.
Here’s the Producer class, whose sole purpose is to initialize the right callable factory:
class Producer(ClientBase):
def __init__(self, db, app):
ClientBase.__init__(self, db, app)
self.service = Service(RemoteProcedure, db, app)
This is the worker service that will process the tasks.
class AsyncService(ServiceBase):
@rpc(Integer)
def sleep(ctx, integer):
print "Sleeping for %d seconds..." % (integer)
time.sleep(integer)
And this event handler is here to do some logging:
def _on_method_call(ctx):
print "This is worker id %d, processing task id %d." % (
ctx.transport.consumer_id, ctx.transport.task_id)
AsyncService.event_manager.add_listener('method_call', _on_method_call)
It’s now time to deploy our service. We start by configuring the logger and creating the necessary SQL tables:
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
logging.getLogger('sqlalchemy.engine.base.Engine').setLevel(logging.DEBUG)
metadata.create_all()
We then initialize our application:
application = Application([AsyncService], 'spyne.async',
interface=Wsdl11(), in_protocol=Soap11(), out_protocol=Soap11())
And queue some tasks:
producer = Producer(db, application)
for i in range(10):
producer.service.sleep(i)
And finally, we start a single worker to consume the queued tasks:
consumer = Consumer(db, application, 1)
consumer.serve_forever()
That’s about it! You can switch to another database engine that accepts multiple connections and insert tasks from another connection to see the consumer in action. You could also start other workers in other processes to see the pub-sub behaviour: since every worker tracks its own position in the queue, each of them processes every task.
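For instance, assuming a PostgreSQL database is reachable at the (made-up) URL below, the only change needed is the engine URL; a second worker started in another process with its own worker id will then also consume every task:

# Illustrative only: any SQLAlchemy URL that allows concurrent connections
# will do; the credentials below are made up.
db = create_engine('postgresql://spyne:spyne@localhost/spyne_queue')

# ... same metadata.create_all(), Application and Producer setup as above ...

# In a second process, start another worker with a different worker id:
consumer = Consumer(db, application, 2)
consumer.serve_forever()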
Start hacking! Good luck, and be sure to drop by the mailing list if you have questions.