Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
Python 3.5 async / await example with HTTP-AMQP -bridge and Nix based development environment
# -*- coding: utf-8 -*-
import aioamqp
connection = None
protocol = None
async def disconnected(exception):
global connection, protocol
connection = None
protocol = None
print(exception)
async def get_channel():
global connection, protocol
if not connection or not protocol:
try:
connection, protocol = await aioamqp.connect(
host='localhost',
on_error=disconnected,
)
except aioamqp.AmqpClosedConnection as e:
await disconnected(e)
channel = await protocol.channel()
return channel
with import <nixpkgs> {};
let dependencies = rec {
_erlang = erlang.override { wxSupport = false; };
_rabbitmq_server = rabbitmq_server.override { erlang = _erlang; };
_enabled_plugins = builtins.toFile "enabled_plugins" "[rabbitmq_management].";
_tornado = with python35Packages; tornado.override {
name = "tornado-4.3b1";
src = fetchurl {
url = "https://pypi.python.org/packages/source/t/tornado/tornado-4.3b1.tar.gz";
sha256 = "c7ddda61d9469c5745f3ac00e480ede0703dd1a4ef540a3d9bd5e03e9796e430";
};
};
_aioamqp = with python35Packages; buildPythonPackage {
name = "aioamqp-0.4.0";
src = fetchurl {
url = "https://pypi.python.org/packages/source/a/aioamqp/aioamqp-0.4.0.tar.gz";
sha256 = "4882ca561f1aa88beba3398c8021e7918605c371f4c0019b66c12321edda10bf";
};
};
};
in with dependencies;
stdenv.mkDerivation rec {
name = "env";
env = buildEnv { name = name; paths = buildInputs; };
builder = builtins.toFile "builder.pl" ''
source $stdenv/setup; ln -s $env $out
'';
buildInputs = [
_rabbitmq_server
(python35.buildEnv.override {
ignoreCollisions = true;
extraLibs = [
_tornado
_aioamqp
];
})
];
shellHook = ''
mkdir -p $PWD/var
export RABBITMQ_LOG_BASE=$PWD/var
export RABBITMQ_MNESIA_BASE=$PWD/var
export RABBITMQ_ENABLED_PLUGINS_FILE=${_enabled_plugins}
export SSL_CERT_FILE=${cacert}/etc/ssl/certs/ca-bundle.crt
export PYTHONPATH=`pwd`
'';
}
# -*- coding: utf-8 -*-
import asyncio
import tornado.platform.asyncio
import tornado.web
from pickle import dumps
from connection import get_channel
class MainHandler(tornado.web.RequestHandler):
async def get(self):
channel = await get_channel()
reply_to = (await channel.queue_declare('', exclusive=True))['queue']
routing_key = self.request.uri.strip('/').replace('/', '.') or '/'
payload = dumps({key: getattr(self.request, key) for key in [
'files', 'body_arguments', 'query', 'query_arguments',
'body', 'path', 'method', 'uri', 'arguments',
'cookies', 'host', 'headers', 'remote_ip'
]})
await channel.publish(payload=payload,
exchange_name='amq.topic',
routing_key=routing_key,
properties={'reply_to': reply_to})
queue = asyncio.Queue()
async def handle(*args, messages=queue):
await messages.put(args)
consumer = (await channel.basic_consume(
reply_to, no_ack=True, callback=handle)
)['consumer_tag']
body, envelope, properties = await queue.get()
self.write(body)
await channel.basic_cancel(consumer)
await channel.queue_delete(reply_to)
await channel.close()
app = tornado.web.Application([
(r'.*', MainHandler),
])
if __name__ == "__main__":
tornado.platform.asyncio.AsyncIOMainLoop().install()
app.listen(8080)
asyncio.get_event_loop().run_forever()
from setuptools import setup
setup(name='myapp',
version='1.0.0',
py_modules=['connection', 'server', 'worker'],
install_requires=['aioamqp', 'tornado'])
with import <nixpkgs> {};
let dependencies = rec {
_erlang = erlang.override { wxSupport = false; };
_rabbitmq_server = rabbitmq_server.override { erlang = _erlang; };
_enabled_plugins = builtins.toFile "enabled_plugins" "[rabbitmq_management].";
_tornado = with python35Packages; tornado.override {
name = "tornado-4.3b1";
src = fetchurl {
url = "https://pypi.python.org/packages/source/t/tornado/tornado-4.3b1.tar.gz";
sha256 = "c7ddda61d9469c5745f3ac00e480ede0703dd1a4ef540a3d9bd5e03e9796e430";
};
};
_aioamqp = with python35Packages; buildPythonPackage {
name = "aioamqp-0.4.0";
src = fetchurl {
url = "https://pypi.python.org/packages/source/a/aioamqp/aioamqp-0.4.0.tar.gz";
sha256 = "4882ca561f1aa88beba3398c8021e7918605c371f4c0019b66c12321edda10bf";
};
};
_python35 = python35.buildEnv.override {
ignoreCollisions = true;
extraLibs = [
_tornado
_aioamqp
];
};
supervisord_conf = stdenv.mkDerivation {
name = "supervisord.conf";
builder = writeText "builder.sh" ''
source $stdenv/setup;
cat > $out << EOF
[supervisord]
logfile=./var/supervisord.log
logfile_maxbytes=50MB
logfile_backups=10
loglevel=error
pidfile=./var/supervisord.pid
childlogdir=./var
[supervisorctl]
[unix_http_server]
file=./var/supervisord.sock
[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
[program:rabbitmq]
command=${_rabbitmq_server}/bin/rabbitmq-server
stopasgroup = true
environment = RABBITMQ_LOG_BASE="./var",RABBITMQ_MNESIA_BASE="./var",RABBITMQ_ENABLED_PLUGINS_FILE=${_enabled_plugins}
[program:server]
command=${_python35}/bin/python3 server.py
[program:worker]
command=${_python35}/bin/python3 worker.py
process_name=%(program_name)s-%(process_num)s
numprocs=2
EOF
'';
};
};
in with dependencies;
stdenv.mkDerivation rec {
name = "env";
env = buildEnv { name = name; paths = buildInputs; };
builder = builtins.toFile "builder.pl" ''
source $stdenv/setup; ln -s $env $out
'';
buildInputs = [
_rabbitmq_server
_python35
pythonPackages.supervisor
];
shellHook = ''
mkdir -p $PWD/var
alias supervisord="supervisord -c ${supervisord_conf}"
alias supervisorctl="supervisorctl -s unix://$PWD/var/supervisord.sock"
'';
}
# -*- coding: utf-8 -*-
import asyncio
import time
from pickle import loads
from connection import get_channel
counter = 0
async def work():
channel = await get_channel()
requests = (await channel.queue_declare('', exclusive=True))['queue']
await channel.queue_bind(requests, 'amq.topic', '#')
async def handle(body, envelope, properties):
global counter
counter += 1
request = loads(body)
response = request['uri']
await channel.basic_publish(payload=response,
exchange_name='',
routing_key=properties.reply_to)
print("{0:d} {1}: {2}".format(counter, time.ctime(), request['uri']))
await channel.basic_consume(requests, no_ack=True, callback=handle)
if __name__ == "__main__":
asyncio.get_event_loop().run_until_complete(work())
asyncio.get_event_loop().run_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment