Skip to content

Instantly share code, notes, and snippets.

@lamikae
Created July 9, 2010 19:57
Show Gist options
  • Save lamikae/469967 to your computer and use it in GitHub Desktop.
Save lamikae/469967 to your computer and use it in GitHub Desktop.
class CoreService(Thread):
"""Every message in the CoreService goes through AMQP.
The CoreService initiates an Exchange, when a
* JSON request is received from remote host by RPC
* SOAP request is received from remote host by WSDL
These are internal methods that are executed in daemon mode.
The CoreService API has public methods to
* send JSON request to remote host by RPC
* send SOAP request to remote host by WSDL
Messages are stateless; they store no session state.
In that sense the CoreService is purely functional,
except for the fact that it triggers side-effects,
that are RPC and WSDL requests.
The CoreService uses the AMQP for internal message passing
between threads. This allows for easier threading and an
interface to add more services to the message bus.
An incoming JSON-RPC message is stored to "rpc:inbox" as such.
An incoming SOAP message is stored to "soap:inbox" queue parsed to JSON.
An outgoing JSON message to another RPC server is stored to "rpc:outbox".
An outgoing SOAP message to another SOAP server is stored to "soap:outbox",
"""
class Meta:
abstract = True
connection = None
def __init__(self, *args, **kwargs):
Thread.__init__(self, daemon=True)
self.connection = BrokerConnection(
hostname="localhost", port=5672,
userid="guest", password="guest",
virtual_host="/")
class ExchangeFilter():
"""Message filter."""
def rpc_exchange(func):
"""Filter RPC -> AMQP."""
def do_filter(obj, message):
"""Preprocesses the message."""
# do whatever
if obj.__class__ == 'amqp':
core_msg = AMQPTranslation.amqp2rpc(message)
else:
core_msg = AMQPTranslation.rpc2amqp(message)
return func(obj, *core_msg)
do_filter.__name__ = func.__name__
do_filter.__dict__ = func.__dict__
do_filter.__doc__ = func.__doc__
return do_filter
class RPCService(CoreService):
""" RPC adapter for AMQP broker.
This daemon runs:
* RPC server for receiving incoming JSON requests from a remote host.
The request will be dispatched to AMQP:
exchange="CoreService", queue="rpc", routing_key="inbox".
* AMQP consumer exchange="CoreService", queue="rpc", routing_key="outbox"
will dispatch an JSON-RPC request to a pre-configured host.
The AMQP message content is in JSON, and can be filtered by business logic.
HOW to connect rpc:inbox -> soap:outbox ??
"""
def __init__(self, *args, **kwargs):
CoreService.__init__(self, *args, **kwargs)
# AMQP -> RPC
self.consumer = Consumer(
connection=self.connection,
exchange="CoreService",
queue="rpc",
routing_key="outbox"
)
# send RPC when there are messages in the AMQP outbox
self.consumer.register_callback(self.dispatch_rpc)
# RPC -> AMQP
self.server = RPCServer()
self.publisher = Publisher(
connection=self.connection,
exchange="CoreService",
queue="rpc",
routing_key="inbox"
# define an RPC API call
self.server.register_function(dispatch_amqp, 'localAPICall') # XXX
def run(self):
self.server.run() # forks
it = self.consumer.iterconsume()
# Go into the consumer loop.
while True:
log.debug("consumer loop")
it.next()
@ExchangeFilter.rpc_exchange
def dispatch_amqp(self, json, **kwargs):
""" RPC -> AMQP dispatcher.
Uses persistant publisher stored in object state.
Receives (self<RPCService>, message) from RPC,
and the filter returns (self, json, **kwargs).
"""
return self.publisher.send(json)
@staticmethod
@ExchangeFilter.rpc_exchange
def dispatch_rpc(message_data, json, **kwargs):
""" AMQP -> RPC dispatcher.
Receives (message_data, message) from AMQP,
and the filter returns (message_data, json, **kwargs).
@kwarg rpcmethod
"""
log.debug('dispatching RPC: %s' % json)
try:
# some pre-configured server
server = jsonrpclib.Server('http://127.0.0.1:8181')
# call dynamic method
#reply = server.anotherAPICall(message_data)
func = getattr(server, kwargs['rpcmethod'], json)
if callable(func):
reply = func()
log.debug(jsonrpclib.history.response)
finally:
message.ack()
return reply
class SOAPService(CoreService):
""" SOAP/WSDL adapter for AMQP broker.
The daemon runs:
* WSGI server offering a SOAP interface. The methods should be explicitly
defined in the implementation. The method can dispatch the request to AMQP:
exchange="CoreService", queue="soap", routing_key="inbox".
* AMQP consumer exchange="CoreService", queue="soap", routing_key="outbox".
HOW to connect soap:inbox -> rpc:outbox ??
"""
def __init__(self, *args, **kwargs):
CoreService.__init__(self, *args, **kwargs)
self.consumer = Consumer(
connection=self.connection,
exchange="CoreService",
queue="soap",
routing_key="outbox"
)
self.publisher = Publisher(
connection=self.connection,
exchange="CoreService",
queue="soap",
routing_key="inbox"
class Reactor(): # single-core Reactor
"""One Thread per one CoreService; the Reactor should start 2 Threads.
"""
core_services = None
def __init__(self,*args,**kwargs):
self.core_services = [
RPCService(),
SOAPService()
]
def __unicode__(self):
return 'Reactor'
def run(self):
log.debug("running reactor thread: %s" % self)
for service in self.core_services:
log.debug('activating service %s' % service)
service.start()
class AMQPTranslation():
"""The interface between adapters and AMQP."""
@staticmethod
def amqp2rpc(message):
return (message, {'rpcmethod': 'remoteAPICall'}
@staticmethod
def rpc2amqp(self):
return message
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment