Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Python 3.5 async/await example with an HTTP-AMQP bridge and a Nix-based development environment
# -*- coding: utf-8 -*-
import aioamqp
# Shared AMQP connection state, cached at module level; None means
# "not connected yet" (or the connection was reset after an error).
connection = protocol = None
async def disconnected(exception):
    """Forget the cached AMQP connection and report why it went away.

    Clears the module-level ``connection`` and ``protocol`` globals and
    prints ``exception`` to standard output.

    Args:
        exception: The error (or any printable object) describing the
            disconnect.
    """
    global connection, protocol
    connection = protocol = None
    print(exception)
async def get_channel():
    """Return a new channel on the shared AMQP connection, connecting if needed.

    The pair returned by ``aioamqp.connect`` is cached in the module-level
    ``connection`` and ``protocol`` globals and reused by later calls.
    ``disconnected`` is passed to aioamqp as the ``on_error`` callback and
    clears that cache again.

    Returns:
        The channel object produced by ``protocol.channel()``.

    Raises:
        aioamqp.AmqpClosedConnection: If connecting to the broker on
            ``localhost`` fails. The cached state is reset before the
            exception propagates, so a later call will try to reconnect.
    """
    global connection, protocol
    if not connection or not protocol:
        try:
            connection, protocol = await aioamqp.connect(
                host='localhost',
                on_error=disconnected,
            )
        except aioamqp.AmqpClosedConnection as e:
            await disconnected(e)
            # Without a protocol there is nothing to open a channel on;
            # surface the real connection error instead of falling through
            # to an AttributeError on ``None.channel()``.
            raise
    return await protocol.channel()
# Development environment: RabbitMQ plus a Python 3.5 interpreter that has
# Tornado and aioamqp available.
with import <nixpkgs> {};
# Locally overridden packages used by the environment below.
let dependencies = rec {
# Erlang with wxSupport switched off.
_erlang = erlang.override { wxSupport = false; };
# RabbitMQ server built against the Erlang defined above.
_rabbitmq_server = rabbitmq_server.override { erlang = _erlang; };
# Store file holding the enabled-plugins list (only rabbitmq_management).
_enabled_plugins = builtins.toFile "enabled_plugins" "[rabbitmq_management].";
# Tornado overridden to use the 4.3b1 source tarball from PyPI.
_tornado = with python35Packages; tornado.override {
name = "tornado-4.3b1";
src = fetchurl {
url = "https://pypi.python.org/packages/source/t/tornado/tornado-4.3b1.tar.gz";
sha256 = "c7ddda61d9469c5745f3ac00e480ede0703dd1a4ef540a3d9bd5e03e9796e430";
};
};
# aioamqp 0.4.0 packaged from its PyPI source tarball.
_aioamqp = with python35Packages; buildPythonPackage {
name = "aioamqp-0.4.0";
src = fetchurl {
url = "https://pypi.python.org/packages/source/a/aioamqp/aioamqp-0.4.0.tar.gz";
sha256 = "4882ca561f1aa88beba3398c8021e7918605c371f4c0019b66c12321edda10bf";
};
};
};
in with dependencies;
stdenv.mkDerivation rec {
name = "env";
# Merged environment containing everything listed in buildInputs.
env = buildEnv { name = name; paths = buildInputs; };
# Builder script: loads the stdenv setup and makes $out a symlink to $env.
# NOTE(review): the file is named "builder.pl" although its content is
# shell -- confirm whether the name is intentional.
builder = builtins.toFile "builder.pl" ''
source $stdenv/setup; ln -s $env $out
'';
buildInputs = [
_rabbitmq_server
# Python 3.5 environment with the two overridden libraries added.
(python35.buildEnv.override {
ignoreCollisions = true;
extraLibs = [
_tornado
_aioamqp
];
})
];
# Shell setup: creates ./var, points the RabbitMQ log/mnesia/plugin
# settings at it, sets SSL_CERT_FILE to the cacert bundle and puts the
# current directory on PYTHONPATH.
shellHook = ''
mkdir -p $PWD/var
export RABBITMQ_LOG_BASE=$PWD/var
export RABBITMQ_MNESIA_BASE=$PWD/var
export RABBITMQ_ENABLED_PLUGINS_FILE=${_enabled_plugins}
export SSL_CERT_FILE=${cacert}/etc/ssl/certs/ca-bundle.crt
export PYTHONPATH=`pwd`
'';
}
# -*- coding: utf-8 -*-
import asyncio
import tornado.platform.asyncio
import tornado.web
from pickle import dumps
from connection import get_channel
class MainHandler(tornado.web.RequestHandler):
    """Bridge an HTTP GET request to AMQP and answer with the first reply."""

    async def get(self):
        """Publish the request on ``amq.topic`` and write back the reply body.

        A fresh channel and an exclusive, server-named reply queue are
        created per request. The request is pickled and published with the
        reply queue name in ``reply_to``; the first message that arrives on
        the reply queue is written to the HTTP response.

        The consumer, the reply queue and the channel are released in a
        ``finally`` block so they do not pile up on the shared connection
        when publishing, consuming or writing fails.

        NOTE(review): there is no timeout on waiting for the reply, so the
        request waits for as long as no message arrives on the reply queue.
        """
        channel = await get_channel()
        reply_to = None
        consumer = None
        try:
            reply_to = (
                await channel.queue_declare('', exclusive=True)
            )['queue']
            # "/a/b" becomes "a.b"; an empty result falls back to "/".
            routing_key = (
                self.request.uri.strip('/').replace('/', '.') or '/'
            )
            # NOTE(security): the payload is pickled. Whoever consumes it
            # must only unpickle messages from a broker it fully trusts.
            payload = dumps({key: getattr(self.request, key) for key in [
                'files', 'body_arguments', 'query', 'query_arguments',
                'body', 'path', 'method', 'uri', 'arguments',
                'cookies', 'host', 'headers', 'remote_ip'
            ]})
            await channel.publish(payload=payload,
                                  exchange_name='amq.topic',
                                  routing_key=routing_key,
                                  properties={'reply_to': reply_to})

            queue = asyncio.Queue()

            async def handle(*args, messages=queue):
                # Hand the consumer callback arguments over to get().
                await messages.put(args)

            consumer = (await channel.basic_consume(
                reply_to, no_ack=True, callback=handle)
            )['consumer_tag']

            body, _envelope, _properties = await queue.get()
            self.write(body)
        finally:
            # Release in reverse order of acquisition; the channel is closed
            # even if cancelling the consumer or deleting the queue fails.
            try:
                if consumer is not None:
                    await channel.basic_cancel(consumer)
                if reply_to is not None:
                    await channel.queue_delete(reply_to)
            finally:
                await channel.close()
# Every request path is served by the HTTP-AMQP bridge handler.
app = tornado.web.Application([(r'.*', MainHandler)])

if __name__ == "__main__":
    # Install Tornado's asyncio-backed IOLoop before listening.
    tornado.platform.asyncio.AsyncIOMainLoop().install()
    app.listen(8080)
    loop = asyncio.get_event_loop()
    loop.run_forever()
from setuptools import setup
# Package the three example modules together with their runtime requirements.
setup(
    name='myapp',
    version='1.0.0',
    py_modules=['connection', 'server', 'worker'],
    install_requires=['aioamqp', 'tornado'],
)
# Development shell: RabbitMQ, a Python 3.5 with Tornado and aioamqp, and
# supervisor with a generated configuration that runs all three programs.
with import <nixpkgs> {};
# Locally overridden packages and generated files used by the environment.
let dependencies = rec {
# Erlang with wxSupport switched off.
_erlang = erlang.override { wxSupport = false; };
# RabbitMQ server built against the Erlang defined above.
_rabbitmq_server = rabbitmq_server.override { erlang = _erlang; };
# Store file holding the enabled-plugins list (only rabbitmq_management).
_enabled_plugins = builtins.toFile "enabled_plugins" "[rabbitmq_management].";
# Tornado overridden to use the 4.3b1 source tarball from PyPI.
_tornado = with python35Packages; tornado.override {
name = "tornado-4.3b1";
src = fetchurl {
url = "https://pypi.python.org/packages/source/t/tornado/tornado-4.3b1.tar.gz";
sha256 = "c7ddda61d9469c5745f3ac00e480ede0703dd1a4ef540a3d9bd5e03e9796e430";
};
};
# aioamqp 0.4.0 packaged from its PyPI source tarball.
_aioamqp = with python35Packages; buildPythonPackage {
name = "aioamqp-0.4.0";
src = fetchurl {
url = "https://pypi.python.org/packages/source/a/aioamqp/aioamqp-0.4.0.tar.gz";
sha256 = "4882ca561f1aa88beba3398c8021e7918605c371f4c0019b66c12321edda10bf";
};
};
# Python 3.5 environment with the two overridden libraries added.
_python35 = python35.buildEnv.override {
ignoreCollisions = true;
extraLibs = [
_tornado
_aioamqp
];
};
# Derivation whose output is a supervisord configuration file. The builder
# writes it with a shell heredoc; it defines three programs: rabbitmq,
# server (server.py) and worker (worker.py, numprocs=2). Log, pid and
# socket paths are relative (./var).
supervisord_conf = stdenv.mkDerivation {
name = "supervisord.conf";
builder = writeText "builder.sh" ''
source $stdenv/setup;
cat > $out << EOF
[supervisord]
logfile=./var/supervisord.log
logfile_maxbytes=50MB
logfile_backups=10
loglevel=error
pidfile=./var/supervisord.pid
childlogdir=./var
[supervisorctl]
[unix_http_server]
file=./var/supervisord.sock
[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
[program:rabbitmq]
command=${_rabbitmq_server}/bin/rabbitmq-server
stopasgroup = true
environment = RABBITMQ_LOG_BASE="./var",RABBITMQ_MNESIA_BASE="./var",RABBITMQ_ENABLED_PLUGINS_FILE=${_enabled_plugins}
[program:server]
command=${_python35}/bin/python3 server.py
[program:worker]
command=${_python35}/bin/python3 worker.py
process_name=%(program_name)s-%(process_num)s
numprocs=2
EOF
'';
};
};
in with dependencies;
stdenv.mkDerivation rec {
name = "env";
# Merged environment containing everything listed in buildInputs.
env = buildEnv { name = name; paths = buildInputs; };
# Builder script: loads the stdenv setup and makes $out a symlink to $env.
# NOTE(review): the file is named "builder.pl" although its content is
# shell -- confirm whether the name is intentional.
builder = builtins.toFile "builder.pl" ''
source $stdenv/setup; ln -s $env $out
'';
buildInputs = [
_rabbitmq_server
_python35
pythonPackages.supervisor
];
# Shell setup: creates ./var and aliases supervisord / supervisorctl so they
# use the generated configuration and the socket under ./var.
shellHook = ''
mkdir -p $PWD/var
alias supervisord="supervisord -c ${supervisord_conf}"
alias supervisorctl="supervisorctl -s unix://$PWD/var/supervisord.sock"
'';
}
# -*- coding: utf-8 -*-
import asyncio
import time
from pickle import loads
from connection import get_channel
# Number of requests handled so far; incremented by the consume callback
# defined inside work().
counter = 0
async def work():
    """Start consuming bridged requests and answer each with its URI.

    Declares an exclusive, server-named queue, binds it to ``amq.topic``
    with the ``#`` binding key and registers a consumer on it. For every
    message the callback unpickles the body, publishes the request's
    ``uri`` to the queue named in ``properties.reply_to`` through the
    default exchange, and prints a numbered, timestamped log line.

    The coroutine returns once the consumer is registered.
    """
    channel = await get_channel()
    declared = await channel.queue_declare('', exclusive=True)
    queue_name = declared['queue']
    await channel.queue_bind(queue_name, 'amq.topic', '#')

    async def on_request(body, envelope, properties):
        global counter
        counter += 1
        # NOTE(security): loads() unpickles the message body, so the broker
        # and everything publishing to it must be trusted.
        message = loads(body)
        await channel.basic_publish(payload=message['uri'],
                                    exchange_name='',
                                    routing_key=properties.reply_to)
        print("{0:d} {1}: {2}".format(counter, time.ctime(), message['uri']))

    await channel.basic_consume(queue_name, no_ack=True, callback=on_request)
if __name__ == "__main__":
    # Register the consumer first, then keep the loop running so the
    # consume callback can run.
    loop = asyncio.get_event_loop()
    loop.run_until_complete(work())
    loop.run_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.
You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session.