Last active
August 31, 2016 18:00
-
-
Save ashcrow/ecb611337dba966c4255697e6c0a204d to your computer and use it in GitHub Desktop.
URL Path to Topic Router Example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import logging | |
from kombu import Connection, Exchange, Queue, Producer | |
from kombu.mixins import ConsumerMixin | |
exchange = Exchange('commissaire', type='direct') | |
# NOTE: Hardcoding the queue for the example | |
queue = Queue('clusters', exchange, 'http.clusters') | |
# NOTE: Only added for this example | |
logger = logging.getLogger('ClustersService') | |
logger.setLevel(logging.DEBUG) | |
handler = logging.StreamHandler() | |
handler.setFormatter(logging.Formatter( | |
'%(name)s.%(process)d(%(levelname)s): %(message)s')) | |
logger.handlers.append(handler) | |
# -- | |
class CommissaireService(ConsumerMixin): | |
""" | |
An example prototype CommissaireService base class. | |
""" | |
def __init__(self, connection, queues): | |
""" | |
Initializes a new Service instance. | |
:param connection: A kombu connection. | |
:type connection: kombu.Connection | |
:param queues: List of kombu.Queues to consume | |
:type queues: list | |
""" | |
self.logger = logging.getLogger(self.__class__.__name__) | |
self.logger.debug('Initializing {0}'.format(self.__class__.__name__)) | |
self._queues = queues | |
self.connection = connection | |
self.producer = Producer(self.connection.channel(), exchange) | |
self.logger.debug('Initializing finished') | |
def get_consumers(self, Consumer, channel): | |
""" | |
Returns the a list of consumers to watch. Called by the parent Mixin. | |
:param Consumer: Message consumer class. | |
:type Consumer: kombu.Consumer | |
:param channel: An opened channel. | |
:type channel: kombu.transport.*.Channel | |
:returns: A list of Consumer instances. | |
:rtype: list | |
""" | |
consumers = [] | |
for queue in self._queues: | |
self.logger.debug('Consuming on {0}'.format(queue.name)) | |
consumers.append( | |
Consumer(queue, callbacks=[self._wrap_on_message])) | |
return consumers | |
def _wrap_on_message(self, body, message): | |
""" | |
Wraps on_message for logging. | |
:param body: Body of the message. | |
:type body: str | |
:param message: The message instance. | |
:type message: kombu.message.Message | |
""" | |
self.logger.debug('Received message "{0}"'.format( | |
message.delivery_tag)) | |
self.on_message(body, message) | |
self.logger.debug('Message "{0}" {1} ackd'.format( | |
message.delivery_tag, | |
('was' if message.acknowledged else 'was not'))) | |
class ClustersService(CommissaireService): | |
""" | |
An example prototype service. | |
""" | |
def on_message(self, body, message): | |
""" | |
Called when a new message arrives. | |
:param body: Body of the message. | |
:type body: str | |
:param message: The message instance. | |
:type message: kombu.message.Message | |
""" | |
message.ack() | |
self.logger.debug('Responding to {0}'.format( | |
message.properties['reply_to'])) | |
response_queue = self.connection.SimpleQueue( | |
message.properties['reply_to']) | |
response_queue.put('You asked for clusters', outcome='success') | |
response_queue.close() | |
if __name__ == '__main__': | |
try: | |
# NOTE: Using redis in the prototype | |
ClustersService( | |
Connection('redis://localhost:6379/'), | |
[queue] | |
).run() | |
except KeyboardInterrupt: | |
pass |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import uuid | |
from kombu import Connection, Exchange, Producer | |
exchange = Exchange('commissaire', type='direct') | |
# NOTE: Only added for this example | |
logger = logging.getLogger('Dispatcher') | |
logger.setLevel(logging.DEBUG) | |
handler = logging.StreamHandler() | |
handler.setFormatter(logging.Formatter('%(name)s(%(levelname)s): %(message)s')) | |
logger.handlers.append(handler) | |
# -- | |
class Dispatcher: | |
""" | |
Dispatches and translates between HTTP requests and bus services. | |
""" | |
#: Logging instance for all Dispatchers | |
logger = logging.getLogger('Router') | |
def __init__(self, router): | |
""" | |
Initializes a new Dispatcher instance. | |
.. todo:: | |
Make the bus connection configurable. | |
:param router: The router to dispatch with. | |
:type router: router.TopicRouter | |
""" | |
self._router = router | |
self._bus = Connection('redis://localhost:6379/') | |
self.producer = Producer(self._bus.channel(), exchange) | |
def dispatch(self, environ, start_response): | |
""" | |
Dispatches an HTTP request into a bus message, translates the results | |
and returns the HTTP response back to the requestor. | |
:param environ: WSGI environment dictionary. | |
:type environ: dict | |
:param start_response: WSGI start_response callable. | |
:type start_response: callable | |
:returns: The body of the HTTP response. | |
:rtype: Mixed | |
.. note:: | |
This prototype is using WSGI but other interfaces could be used. | |
""" | |
route = self._router.match( | |
environ['PATH_INFO'], environ['REQUEST_METHOD']) | |
# If we have a valid route | |
if route: | |
response_queue_name = 'response-{0}'.format(uuid.uuid4()) | |
response_queue = self._bus.SimpleQueue(response_queue_name) | |
# Generate a message and sent it off | |
self.producer.publish( | |
'An HTTP Request', | |
route['topic'], | |
reply_to=response_queue_name) | |
self.logger.debug( | |
'Message sent to {0}. Want {1}.'.format( | |
route['topic'], response_queue_name)) | |
# Get the resulting message back | |
msg = response_queue.get(block=True, timeout=10) | |
msg.ack() | |
response_queue.close() | |
self.logger.debug( | |
'Received: properties="{0}", payload="{1}"'.format( | |
msg.properties, msg.payload)) | |
# And handle the message based on it's outcome | |
if msg.properties.get('outcome') == 'success': | |
self.logger.debug( | |
'Got a success. Returning the payload to HTTP.') | |
start_response( | |
'200 OK', [('content-type', 'application/json')]) | |
return [bytes(msg.payload, 'utf8')] | |
# TODO: More outcome checks turning responses to HTTP ... | |
# If we have an unknown or missing outcome default to ISE | |
else: | |
self.logger.error( | |
'Unknown outcome: {0}. properties={1}'.format( | |
msg.properties, msg.payload)) | |
start_response( | |
'500 Internal Server Error', | |
[('content-type', 'text/html')]) | |
return [bytes('Internal Server Error', 'utf8')] | |
# Otherwise handle it as a 404 | |
start_response('404 Not Found', [('content-type', 'text/html')]) | |
return [bytes('Not Found', 'utf8')] | |
if __name__ == '__main__': | |
# See https://gist.github.com/ashcrow/ecb611337dba966c4255697e6c0a204d | |
from topicrouter import TopicRouter | |
# Make a topic router that takes in "handler"s. | |
mapper = TopicRouter() | |
mapper.register( | |
'^/api/v0/(?P<handler>[a-z]*)/?$', | |
'http.{handler}') | |
# Create the dispatcher | |
d = Dispatcher(mapper) | |
# Fake WSGI start_response | |
def start_response(code, properties): | |
logger.debug('start_response("{0}",{1})'.format(code, properties)) | |
print('\n====> I WILL 404') | |
print('HTTP BODY: {0}'.format( | |
d.dispatch({ | |
'PATH_INFO': '/idonotexist', | |
'REQUEST_METHOD': 'GET'}, | |
start_response))) | |
print('\n====> I WILL WORK') | |
print('HTTP BODY: {0}'.format( | |
d.dispatch({ | |
'PATH_INFO': '/api/v0/clusters/', | |
'REQUEST_METHOD': 'GET'}, | |
start_response))) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
$ python clustersservice.py & | |
[1] 5937 | |
$ python dispatcher.py | |
Router(INFO): Registered: {'path': '^/api/v0/(?P<handler>[a-z]*)/?$', 'methods': ['GET'], 'topic': 'http.{handler}', 'compiled': re.compile('^/api/v0/(?P<handler>[a-z]*)/?$')} | |
====> I WILL 404 | |
Router(DEBUG): Looking for a route for path="/idonotexist" method="GET"... | |
Router(DEBUG): No route fully matched path="/idonotexist" method="GET" | |
Dispatcher(DEBUG): start_response("404 Not Found",[('content-type', 'text/html')]) | |
HTTP BODY: [b'Not Found'] | |
====> I WILL WORK | |
Router(DEBUG): Looking for a route for path="/api/v0/clusters/" method="GET"... | |
Router(DEBUG): Found match for path="/api/v0/clusters/" method="GET". ^/api/v0/(?P<handler>[a-z]*)/?$. Checking methods... | |
Router(DEBUG): Found solid match for path="/api/v0/clusters/" method="GET". {'path': '/api/v0/clusters/', 'methods': ['GET'], 'topic': 'http.clusters', 'compiled': re.compile('^/api/v0/(?P<handler>[a-z]*)/?$')} | |
Router(DEBUG): Message sent to http.clusters. Want response-51e636d8-80d7-4718-85a1-9c609e16f20d. | |
----> Received: An HTTP Request | |
--> Responding to response-51e636d8-80d7-4718-85a1-9c609e16f20d | |
Router(DEBUG): Received: properties="{'outcome': 'success', 'delivery_info': {'routing_key': 'response-51e636d8-80d7-4718-85a1-9c609e16f20d', 'exchange': 'response-51e636d8-80d7-4718-85a1-9c609e16f20d', 'priority': 0}, 'delivery_tag': '5591fc40-3c98-4ce3-93d2-da49e7b05e9e', 'body_encoding': 'base64', 'delivery_mode': 2}", payload="You asked for clusters" | |
Router(DEBUG): Got a success. Returning the payload to HTTP. | |
Dispatcher(DEBUG): start_response("200 OK",[('content-type', 'application/json')]) | |
HTTP BODY: [b'You asked for clusters'] | |
$ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from wsgiref.simple_server import WSGIServer, make_server | |
from socketserver import ThreadingMixIn | |
# See https://gist.github.com/ashcrow/ecb611337dba966c4255697e6c0a204d | |
from dispatcher import Dispatcher | |
from topicrouter import TopicRouter | |
# --- | |
# Make a topic router that takes in "handler"s. | |
mapper = TopicRouter() | |
mapper.register( | |
'^/api/v0/(?P<handler>[a-z]*)/?$', | |
'http.{handler}') | |
# Create the dispatcher | |
dispatcher = Dispatcher(mapper) | |
class ThreadedWSGIServer(ThreadingMixIn, WSGIServer): | |
""" | |
Threaded version of the WSIServer | |
""" | |
pass | |
try: | |
httpd = make_server( | |
'127.0.0.1', | |
8000, | |
dispatcher.dispatch, server_class=ThreadedWSGIServer) | |
print('---\nMake sure redis and clusterservice is running and' | |
' head to http://127.0.0.1:8000/api/v0/clusters/\n---') | |
httpd.serve_forever() | |
except KeyboardInterrupt: | |
pass |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import re | |
import logging | |
# NOTE: Only added for this example | |
r = logging.getLogger('Router') | |
r.setLevel(logging.DEBUG) | |
handler = logging.StreamHandler() | |
handler.setFormatter(logging.Formatter('%(name)s(%(levelname)s): %(message)s')) | |
r.handlers.append(handler) | |
# -- | |
class TopicRouter: | |
""" | |
An URL router which maps to AMQP topics. | |
""" | |
#: Logging instance for all Routers | |
logger = logging.getLogger('Router') | |
def __init__(self): | |
""" | |
Initializes a new TopicRouter instance. | |
""" | |
self._routes = {} | |
def __repr__(self): | |
""" | |
Creates and returns a string representation of the instance. | |
""" | |
pretty = '' | |
for x in self._routes.values(): | |
pretty += 'Route="{0}" to "{1}" for Methods "{2}"'.format( | |
x['path'], x['topic'], ', '.join(x['methods'])) | |
return pretty | |
def register(self, path, topic, methods=['GET']): | |
""" | |
Registers a URL path to a topic. When named groups are specified | |
in the path they can be used in the topic by using {NAME} syntax. | |
Example:: | |
mapper.register( | |
'^/my/(?P<handler>\w+)/', | |
'handlers.{handler}', | |
['GET', 'PUT', 'DELETE']) | |
:param path: Regular expression string of the path. | |
:type path: str | |
:param topic: The topic template. | |
:type topic: str | |
:param methods: A list of accepted methods. Default only accepts GET. | |
:type methods: list | |
""" | |
methods = [m.upper() for m in methods] | |
self._routes[path] = { | |
'compiled': re.compile(path), | |
'path': path, | |
'topic': topic, | |
'methods': methods, | |
} | |
self.logger.info('Registered: {0}'.format(self._routes[path])) | |
def match(self, path, method): | |
""" | |
Looks for a match for a path/method combination. | |
:param path: The URL path to match. | |
:type path: str | |
:param method: The HTTP method to match. | |
:type method: str | |
:returns: A dict containting routing information or None if no route. | |
:rtype: dict or None | |
""" | |
path_method = 'path="{0}" method="{1}"'.format(path, method) | |
self.logger.debug('Looking for a route for {0}...'.format(path_method)) | |
for route in self._routes.values(): | |
match = route['compiled'].match(path) | |
if match: | |
self.logger.debug( | |
'Found match for {0}. {1}. Checking methods...'.format( | |
path_method, route['path'])) | |
if method.upper() in route['methods']: | |
topic_kwargs = {} | |
if match.groups: | |
topic_kwargs = match.groupdict() | |
return_route = { | |
'compiled': route['compiled'], | |
'path': path, | |
'topic': route['topic'].format(**topic_kwargs), | |
'methods': route['methods'], | |
} | |
self.logger.debug( | |
'Found solid match for {0}. {1}'.format( | |
path_method, return_route)) | |
return return_route | |
self.logger.debug('No route fully matched {0}'.format(path_method)) | |
return None | |
if __name__ == '__main__': | |
MAPPER = TopicRouter() | |
MAPPER.register( | |
'^/api/v0/(?P<handler>[a-z]*)/?$', | |
'http.{handler}') | |
print('=> I SHOULD FAIL') | |
print(MAPPER.match('/fail', 'get')) | |
print('=> I SHOULD WORK') | |
print(MAPPER.match('/api/v0/clusters/', 'get')) |
To play with the http portion:
$ sudo systemctl start redis.service
$ python clusterservice.py &
...
$ python http_server.py &
$ xdg-open http://127.0.0.1:8000/api/v0/clusters/
<see communication in logs>
$
@mbarnes Moving prototyping to https://github.com/ashcrow/commissaire-http and https://github.com/ashcrow/commissaire-service as gists are getting unwieldy 😄
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Input pipeline would look like:
HTTP Request
->TopicRouter
->Dispatcher
->Service
Response pipeline would look like:
Service
->Dispatcher
->HTTP Response