Skip to content

Instantly share code, notes, and snippets.

@ashcrow
Last active August 31, 2016 18:00
Show Gist options
  • Save ashcrow/ecb611337dba966c4255697e6c0a204d to your computer and use it in GitHub Desktop.
Save ashcrow/ecb611337dba966c4255697e6c0a204d to your computer and use it in GitHub Desktop.
URL Path to Topic Router Example
#!/usr/bin/env python3
import logging
from kombu import Connection, Exchange, Queue, Producer
from kombu.mixins import ConsumerMixin
# Direct exchange named "commissaire"; the producer created by
# CommissaireService publishes through it.
exchange = Exchange('commissaire', type='direct')
# NOTE: Hardcoding the queue for the example
# The "clusters" queue is bound to the exchange with routing key
# "http.clusters".
queue = Queue('clusters', exchange, 'http.clusters')
# NOTE: Only added for this example
# Send DEBUG and above from the "ClustersService" logger to a stream handler,
# prefixing every record with the logger name and process id.
logger = logging.getLogger('ClustersService')
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter(
    '%(name)s.%(process)d(%(levelname)s): %(message)s'))
# Use the public API rather than mutating ``logger.handlers`` directly.
logger.addHandler(handler)
# --
class CommissaireService(ConsumerMixin):
    """
    Prototype base class for services consuming from the commissaire bus.

    Incoming messages are handed to ``self.on_message(body, message)``, which
    this class does not define; subclasses are expected to supply it.
    """

    def __init__(self, connection, queues):
        """
        Stores the connection and queues and creates a producer.

        :param connection: A kombu connection.
        :type connection: kombu.Connection
        :param queues: List of kombu.Queues to consume
        :type queues: list
        """
        service_name = self.__class__.__name__
        self.logger = logging.getLogger(service_name)
        self.logger.debug('Initializing {0}'.format(service_name))
        self._queues = queues
        self.connection = connection
        # The producer gets its own channel on the shared connection.
        bus_channel = self.connection.channel()
        self.producer = Producer(bus_channel, exchange)
        self.logger.debug('Initializing finished')

    def get_consumers(self, Consumer, channel):
        """
        Builds one consumer per configured queue. Called by the parent Mixin.

        :param Consumer: Message consumer class.
        :type Consumer: kombu.Consumer
        :param channel: An opened channel.
        :type channel: kombu.transport.*.Channel
        :returns: A list of Consumer instances.
        :rtype: list
        """
        def subscribe(source):
            # Log first, then create the consumer, one queue at a time.
            self.logger.debug('Consuming on {0}'.format(source.name))
            return Consumer(source, callbacks=[self._wrap_on_message])

        return [subscribe(source) for source in self._queues]

    def _wrap_on_message(self, body, message):
        """
        Calls ``on_message`` and logs before and after the call.

        :param body: Body of the message.
        :type body: str
        :param message: The message instance.
        :type message: kombu.message.Message
        """
        self.logger.debug(
            'Received message "{0}"'.format(message.delivery_tag))
        self.on_message(body, message)
        # Report whether the handler acknowledged the message.
        if message.acknowledged:
            ack_state = 'was'
        else:
            ack_state = 'was not'
        self.logger.debug(
            'Message "{0}" {1} ackd'.format(message.delivery_tag, ack_state))
class ClustersService(CommissaireService):
    """
    An example prototype service.
    """

    def on_message(self, body, message):
        """
        Called when a new message arrives.

        Acknowledges the message, then puts a fixed reply, tagged with
        ``outcome='success'``, on the queue named by the message's
        ``reply_to`` property. A message without ``reply_to`` is logged as an
        error and dropped, because there is nowhere to send the reply.

        :param body: Body of the message.
        :type body: str
        :param message: The message instance.
        :type message: kombu.message.Message
        """
        message.ack()
        reply_to = message.properties.get('reply_to')
        if not reply_to:
            self.logger.error(
                'Message "{0}" has no reply_to. Unable to respond.'.format(
                    message.delivery_tag))
            return
        self.logger.debug('Responding to {0}'.format(reply_to))
        response_queue = self.connection.SimpleQueue(reply_to)
        try:
            response_queue.put('You asked for clusters', outcome='success')
        finally:
            # Close the reply queue even when publishing fails.
            response_queue.close()
if __name__ == '__main__':
    try:
        # NOTE: Using redis in the prototype
        bus = Connection('redis://localhost:6379/')
        service = ClustersService(bus, [queue])
        service.run()
    except KeyboardInterrupt:
        # Exit quietly when interrupted.
        pass
import logging
import uuid
from kombu import Connection, Exchange, Producer
# Direct exchange named "commissaire"; the Dispatcher's producer publishes
# through it.
exchange = Exchange('commissaire', type='direct')
# NOTE: Only added for this example
# Send DEBUG and above from the "Dispatcher" logger to a stream handler.
logger = logging.getLogger('Dispatcher')
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(name)s(%(levelname)s): %(message)s'))
# Use the public API rather than mutating ``logger.handlers`` directly.
logger.addHandler(handler)
# --
class Dispatcher:
    """
    Dispatches and translates between HTTP requests and bus services.
    """

    #: Logging instance for all Dispatchers
    logger = logging.getLogger('Dispatcher')

    #: Seconds to wait for a service to answer on the response queue
    response_timeout = 10

    def __init__(self, router):
        """
        Initializes a new Dispatcher instance.

        .. todo::

           Make the bus connection configurable.

        :param router: The router to dispatch with.
        :type router: router.TopicRouter
        """
        self._router = router
        self._bus = Connection('redis://localhost:6379/')
        self.producer = Producer(self._bus.channel(), exchange)

    def dispatch(self, environ, start_response):
        """
        Dispatches an HTTP request into a bus message, translates the results
        and returns the HTTP response back to the requestor.

        Responses produced:

        * ``404 Not Found`` when the router has no route for the request.
        * ``200 OK`` with the reply payload when the reply's ``outcome``
          property is ``success``.
        * ``504 Gateway Timeout`` when no reply arrives within
          ``response_timeout`` seconds.
        * ``500 Internal Server Error`` for any other or missing outcome.

        :param environ: WSGI environment dictionary.
        :type environ: dict
        :param start_response: WSGI start_response callable.
        :type start_response: callable
        :returns: The body of the HTTP response.
        :rtype: Mixed

        .. note::

           This prototype is using WSGI but other interfaces could be used.
        """
        route = self._router.match(
            environ['PATH_INFO'], environ['REQUEST_METHOD'])

        # Without a valid route handle it as a 404
        if not route:
            start_response('404 Not Found', [('content-type', 'text/html')])
            return [bytes('Not Found', 'utf8')]

        # Every request gets its own uniquely named reply queue.
        response_queue_name = 'response-{0}'.format(uuid.uuid4())
        response_queue = self._bus.SimpleQueue(response_queue_name)
        try:
            # Generate a message and send it off
            self.producer.publish(
                'An HTTP Request',
                route['topic'],
                reply_to=response_queue_name)
            self.logger.debug(
                'Message sent to {0}. Want {1}.'.format(
                    route['topic'], response_queue_name))
            # Get the resulting message back
            try:
                msg = response_queue.get(
                    block=True, timeout=self.response_timeout)
            except response_queue.Empty:
                # No service answered in time; report it instead of letting
                # the exception escape into the WSGI server.
                self.logger.error(
                    'No response on {0} after {1} seconds.'.format(
                        response_queue_name, self.response_timeout))
                start_response(
                    '504 Gateway Timeout', [('content-type', 'text/html')])
                return [bytes('Gateway Timeout', 'utf8')]
            msg.ack()
        finally:
            # Close the reply queue on every path, including errors.
            response_queue.close()

        self.logger.debug(
            'Received: properties="{0}", payload="{1}"'.format(
                msg.properties, msg.payload))
        # And handle the message based on its outcome
        outcome = msg.properties.get('outcome')
        if outcome == 'success':
            self.logger.debug(
                'Got a success. Returning the payload to HTTP.')
            start_response(
                '200 OK', [('content-type', 'application/json')])
            return [bytes(msg.payload, 'utf8')]
        # TODO: More outcome checks turning responses to HTTP ...
        # If we have an unknown or missing outcome default to ISE
        self.logger.error(
            'Unknown outcome: {0}. properties={1}'.format(
                outcome, msg.properties))
        start_response(
            '500 Internal Server Error',
            [('content-type', 'text/html')])
        return [bytes('Internal Server Error', 'utf8')]
if __name__ == '__main__':
    # See https://gist.github.com/ashcrow/ecb611337dba966c4255697e6c0a204d
    from topicrouter import TopicRouter

    # Make a topic router that takes in "handler"s.
    mapper = TopicRouter()
    mapper.register('^/api/v0/(?P<handler>[a-z]*)/?$', 'http.{handler}')

    # Create the dispatcher
    d = Dispatcher(mapper)

    # Fake WSGI start_response
    def start_response(code, properties):
        logger.debug('start_response("{0}",{1})'.format(code, properties))

    # Run one request per banner, in order: an unrouted path, then a routed one.
    for banner, url_path in (
            ('\n====> I WILL 404', '/idonotexist'),
            ('\n====> I WILL WORK', '/api/v0/clusters/')):
        print(banner)
        fake_environ = {'PATH_INFO': url_path, 'REQUEST_METHOD': 'GET'}
        http_body = d.dispatch(fake_environ, start_response)
        print('HTTP BODY: {0}'.format(http_body))
$ python clustersservice.py &
[1] 5937
$ python dispatcher.py
Router(INFO): Registered: {'path': '^/api/v0/(?P<handler>[a-z]*)/?$', 'methods': ['GET'], 'topic': 'http.{handler}', 'compiled': re.compile('^/api/v0/(?P<handler>[a-z]*)/?$')}
====> I WILL 404
Router(DEBUG): Looking for a route for path="/idonotexist" method="GET"...
Router(DEBUG): No route fully matched path="/idonotexist" method="GET"
Dispatcher(DEBUG): start_response("404 Not Found",[('content-type', 'text/html')])
HTTP BODY: [b'Not Found']
====> I WILL WORK
Router(DEBUG): Looking for a route for path="/api/v0/clusters/" method="GET"...
Router(DEBUG): Found match for path="/api/v0/clusters/" method="GET". ^/api/v0/(?P<handler>[a-z]*)/?$. Checking methods...
Router(DEBUG): Found solid match for path="/api/v0/clusters/" method="GET". {'path': '/api/v0/clusters/', 'methods': ['GET'], 'topic': 'http.clusters', 'compiled': re.compile('^/api/v0/(?P<handler>[a-z]*)/?$')}
Router(DEBUG): Message sent to http.clusters. Want response-51e636d8-80d7-4718-85a1-9c609e16f20d.
----> Received: An HTTP Request
--> Responding to response-51e636d8-80d7-4718-85a1-9c609e16f20d
Router(DEBUG): Received: properties="{'outcome': 'success', 'delivery_info': {'routing_key': 'response-51e636d8-80d7-4718-85a1-9c609e16f20d', 'exchange': 'response-51e636d8-80d7-4718-85a1-9c609e16f20d', 'priority': 0}, 'delivery_tag': '5591fc40-3c98-4ce3-93d2-da49e7b05e9e', 'body_encoding': 'base64', 'delivery_mode': 2}", payload="You asked for clusters"
Router(DEBUG): Got a success. Returning the payload to HTTP.
Dispatcher(DEBUG): start_response("200 OK",[('content-type', 'application/json')])
HTTP BODY: [b'You asked for clusters']
$
#!/usr/bin/env python3
from wsgiref.simple_server import WSGIServer, make_server
from socketserver import ThreadingMixIn
# See https://gist.github.com/ashcrow/ecb611337dba966c4255697e6c0a204d
from dispatcher import Dispatcher
from topicrouter import TopicRouter
# ---
# Build the router: the named group "handler" captured from the URL path is
# substituted into the topic template.
mapper = TopicRouter()
mapper.register(
    path='^/api/v0/(?P<handler>[a-z]*)/?$',
    topic='http.{handler}')
# The dispatcher serves as the WSGI application through ``dispatch``.
dispatcher = Dispatcher(mapper)
class ThreadedWSGIServer(ThreadingMixIn, WSGIServer):
    """
    WSGIServer combined with ThreadingMixIn; adds no behavior of its own.
    """
# Serve the dispatcher on 127.0.0.1:8000 until interrupted.
httpd = None
try:
    httpd = make_server(
        '127.0.0.1',
        8000,
        dispatcher.dispatch, server_class=ThreadedWSGIServer)
    print('---\nMake sure redis and clusterservice is running and'
          ' head to http://127.0.0.1:8000/api/v0/clusters/\n---')
    httpd.serve_forever()
except KeyboardInterrupt:
    # Exit quietly when interrupted.
    pass
finally:
    # Release the listening socket if the server was created.
    if httpd is not None:
        httpd.server_close()
#!/usr/bin/env python
import re
import logging
# NOTE: Only added for this example
# Send DEBUG and above from the "Router" logger to a stream handler.
r = logging.getLogger('Router')
r.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(name)s(%(levelname)s): %(message)s'))
# Use the public API rather than mutating ``r.handlers`` directly.
r.addHandler(handler)
# --
class TopicRouter:
    """
    An URL router which maps to AMQP topics.
    """

    #: Logging instance for all Routers
    logger = logging.getLogger('Router')

    def __init__(self):
        """
        Initializes a new TopicRouter instance.
        """
        # Registered routes keyed by their path regular expression string.
        self._routes = {}

    def __repr__(self):
        """
        Creates and returns a string representation of the instance.

        :returns: One description per registered route, concatenated.
        :rtype: str
        """
        return ''.join(
            'Route="{0}" to "{1}" for Methods "{2}"'.format(
                route['path'], route['topic'], ', '.join(route['methods']))
            for route in self._routes.values())

    def register(self, path, topic, methods=('GET',)):
        r"""
        Registers a URL path to a topic. When named groups are specified
        in the path they can be used in the topic by using {NAME} syntax.
        Registering the same path again replaces the earlier route.

        Example::

            mapper.register(
                '^/my/(?P<handler>\w+)/',
                'handlers.{handler}',
                ['GET', 'PUT', 'DELETE'])

        :param path: Regular expression string of the path.
        :type path: str
        :param topic: The topic template.
        :type topic: str
        :param methods: Accepted methods. Default only accepts GET.
        :type methods: list or tuple
        """
        # Methods are stored upper-cased; match() upper-cases the request
        # method before comparing.
        methods = [m.upper() for m in methods]
        self._routes[path] = {
            'compiled': re.compile(path),
            'path': path,
            'topic': topic,
            'methods': methods,
        }
        self.logger.info('Registered: {0}'.format(self._routes[path]))

    def match(self, path, method):
        """
        Looks for a match for a path/method combination.

        The first registered route whose pattern matches the path and whose
        methods include the method wins.

        :param path: The URL path to match.
        :type path: str
        :param method: The HTTP method to match.
        :type method: str
        :returns: A dict containing routing information or None if no route.
        :rtype: dict or None
        """
        path_method = 'path="{0}" method="{1}"'.format(path, method)
        self.logger.debug('Looking for a route for {0}...'.format(path_method))
        for route in self._routes.values():
            match = route['compiled'].match(path)
            if not match:
                continue
            self.logger.debug(
                'Found match for {0}. {1}. Checking methods...'.format(
                    path_method, route['path']))
            if method.upper() not in route['methods']:
                continue
            # groupdict() is empty when the pattern has no named groups, so
            # it can be applied to the topic template unconditionally.
            return_route = {
                'compiled': route['compiled'],
                'path': path,
                'topic': route['topic'].format(**match.groupdict()),
                'methods': route['methods'],
            }
            self.logger.debug(
                'Found solid match for {0}. {1}'.format(
                    path_method, return_route))
            return return_route
        self.logger.debug('No route fully matched {0}'.format(path_method))
        return None
if __name__ == '__main__':
    MAPPER = TopicRouter()
    MAPPER.register('^/api/v0/(?P<handler>[a-z]*)/?$', 'http.{handler}')
    # Try one path per banner, in order.
    for banner, url_path in (
            ('=> I SHOULD FAIL', '/fail'),
            ('=> I SHOULD WORK', '/api/v0/clusters/')):
        print(banner)
        print(MAPPER.match(url_path, 'get'))
@ashcrow
Copy link
Author

ashcrow commented Aug 29, 2016

Input pipeline would look like: HTTP Request -> TopicRouter -> Dispatcher -> Service

Response pipeline would look like: Service -> Dispatcher -> HTTP Response

@ashcrow
Copy link
Author

ashcrow commented Aug 30, 2016

To play with the http portion:

$ sudo systemctl start redis.service
$ python clusterservice.py &
...
$ python http_server.py &
$ xdg-open http://127.0.0.1:8000/api/v0/clusters/
<see communication in logs>
$ 

@ashcrow
Copy link
Author

ashcrow commented Aug 31, 2016

@mbarnes Moving prototyping to https://github.com/ashcrow/commissaire-http and https://github.com/ashcrow/commissaire-service as gists are getting unwieldy 😄

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment