Last active
September 22, 2023 11:13
-
-
Save datakurre/2076247049dabe16627f to your computer and use it in GitHub Desktop.
Python 3.5 async/await example with an HTTP-to-AMQP bridge and a Nix-based development environment
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import aioamqp | |
# Module-level cache of the objects returned by aioamqp.connect(). Both names
# are None while no broker connection is established; disconnected() resets
# them so that the next get_channel() call connects again.
connection = protocol = None
async def disconnected(exception):
    """Forget the cached AMQP connection and print what caused the loss.

    Passed to ``aioamqp.connect`` as its ``on_error`` callback and also awaited
    directly by ``get_channel`` when connecting fails.

    :param exception: the error that was reported; it is printed as-is.
    """
    global connection, protocol
    # Clearing both names makes the next get_channel() call reconnect.
    connection = protocol = None
    print(exception)
async def get_channel():
    """Open and return a new channel on the shared AMQP connection.

    When no connection is cached in the module globals, a connection to the
    broker on ``localhost`` is established first and cached for later calls.

    :returns: the channel object produced by ``protocol.channel()``.
    :raises aioamqp.AmqpClosedConnection: when connecting to the broker fails.
    """
    global connection, protocol
    if not connection or not protocol:
        try:
            connection, protocol = await aioamqp.connect(
                host='localhost',
                on_error=disconnected,
            )
        except aioamqp.AmqpClosedConnection as e:
            await disconnected(e)
            # Propagate the real failure. Falling through would call
            # ``protocol.channel()`` on None and hide the cause behind an
            # unrelated AttributeError.
            raise
    return await protocol.channel()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Development environment for the HTTP-AMQP bridge example: a RabbitMQ server
# plus a Python 3.5 interpreter that has tornado and aioamqp available.
with import <nixpkgs> {}; | |
# Locally overridden / locally built packages, brought into scope below with
# `with dependencies;`.
let dependencies = rec { | |
# Erlang with wxSupport switched off.
_erlang = erlang.override { wxSupport = false; }; | |
# RabbitMQ built against the Erlang defined above.
_rabbitmq_server = rabbitmq_server.override { erlang = _erlang; }; | |
# Store file naming rabbitmq_management as the only enabled plugin; the
# shellHook exports its path as RABBITMQ_ENABLED_PLUGINS_FILE.
_enabled_plugins = builtins.toFile "enabled_plugins" "[rabbitmq_management]."; | |
# tornado from python35Packages with name and source replaced by the 4.3b1
# release tarball (presumably needed for async/await handlers - TODO confirm).
_tornado = with python35Packages; tornado.override { | |
name = "tornado-4.3b1"; | |
src = fetchurl { | |
url = "https://pypi.python.org/packages/source/t/tornado/tornado-4.3b1.tar.gz"; | |
sha256 = "c7ddda61d9469c5745f3ac00e480ede0703dd1a4ef540a3d9bd5e03e9796e430"; | |
}; | |
}; | |
# aioamqp 0.4.0 built from its PyPI source tarball with buildPythonPackage.
_aioamqp = with python35Packages; buildPythonPackage { | |
name = "aioamqp-0.4.0"; | |
src = fetchurl { | |
url = "https://pypi.python.org/packages/source/a/aioamqp/aioamqp-0.4.0.tar.gz"; | |
sha256 = "4882ca561f1aa88beba3398c8021e7918605c371f4c0019b66c12321edda10bf"; | |
}; | |
}; | |
}; | |
in with dependencies; | |
stdenv.mkDerivation rec { | |
name = "env"; | |
# buildEnv combining everything listed in buildInputs.
env = buildEnv { name = name; paths = buildInputs; }; | |
# Builder script that only symlinks the combined environment to $out.
# NOTE(review): the file is named "builder.pl" although its content is shell.
builder = builtins.toFile "builder.pl" '' | |
source $stdenv/setup; ln -s $env $out | |
''; | |
buildInputs = [ | |
_rabbitmq_server | |
(python35.buildEnv.override { | |
ignoreCollisions = true; | |
extraLibs = [ | |
_tornado | |
_aioamqp | |
]; | |
}) | |
]; | |
# Shell setup: creates ./var, points RabbitMQ's log and Mnesia base
# directories at it, selects the enabled-plugins file, sets SSL_CERT_FILE to
# the cacert bundle and puts the current directory on PYTHONPATH.
shellHook = '' | |
mkdir -p $PWD/var | |
export RABBITMQ_LOG_BASE=$PWD/var | |
export RABBITMQ_MNESIA_BASE=$PWD/var | |
export RABBITMQ_ENABLED_PLUGINS_FILE=${_enabled_plugins} | |
export SSL_CERT_FILE=${cacert}/etc/ssl/certs/ca-bundle.crt | |
export PYTHONPATH=`pwd` | |
''; | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import asyncio | |
import tornado.platform.asyncio | |
import tornado.web | |
from pickle import dumps | |
from connection import get_channel | |
class MainHandler(tornado.web.RequestHandler):
    """Bridge HTTP GET requests to AMQP and answer with the first reply.

    Each request is pickled, published to the ``amq.topic`` exchange and
    answered with the body of the first message that arrives on a private,
    exclusive reply queue.
    """

    async def get(self):
        """Publish the request over AMQP and write the reply body as response.

        The channel and the reply queue are released in a ``finally`` block so
        that a failing request does not leak them on the shared connection.
        """
        channel = await get_channel()
        reply_to = None
        try:
            # Server-named exclusive queue on which the reply is expected.
            reply_to = (
                await channel.queue_declare('', exclusive=True))['queue']
            # NOTE(review): ``request.uri`` includes the query string, so it
            # becomes part of the routing key - confirm this is intended.
            routing_key = self.request.uri.strip('/').replace('/', '.') or '/'
            payload = dumps({key: getattr(self.request, key) for key in [
                'files', 'body_arguments', 'query', 'query_arguments',
                'body', 'path', 'method', 'uri', 'arguments',
                'cookies', 'host', 'headers', 'remote_ip'
            ]})
            await channel.publish(payload=payload,
                                  exchange_name='amq.topic',
                                  routing_key=routing_key,
                                  properties={'reply_to': reply_to})

            queue = asyncio.Queue()

            async def handle(*args, messages=queue):
                # Hand the delivered (body, envelope, properties) to get().
                await messages.put(args)

            consumer = (await channel.basic_consume(
                reply_to, no_ack=True, callback=handle)
            )['consumer_tag']
            # NOTE(review): waits without a timeout; if nothing replies the
            # request stays pending.
            body, envelope, properties = await queue.get()
            self.write(body)
            await channel.basic_cancel(consumer)
        finally:
            # Skip cleanup on a channel that is already closed: further
            # calls on it would only raise and mask the original error.
            if reply_to is not None and channel.is_open:
                await channel.queue_delete(reply_to)
            if channel.is_open:
                await channel.close()
# Single catch-all route: every request path is served by MainHandler.
app = tornado.web.Application([(r'.*', MainHandler)])

if __name__ == "__main__":
    # Install Tornado's asyncio-backed IOLoop before listening, then serve on
    # port 8080 until the process is stopped.
    tornado.platform.asyncio.AsyncIOMainLoop().install()
    app.listen(8080)
    loop = asyncio.get_event_loop()
    loop.run_forever()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from setuptools import setup | |
# Package the three example modules together with their runtime dependencies.
setup(
    name='myapp',
    version='1.0.0',
    py_modules=['connection', 'server', 'worker'],
    install_requires=['aioamqp', 'tornado'],
)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Development shell for the HTTP-AMQP bridge example: RabbitMQ, a Python 3.5
# environment with tornado and aioamqp, and supervisor with a generated
# configuration that runs the broker, the HTTP server and two workers.
with import <nixpkgs> {}; | |
# Locally overridden / locally built packages, brought into scope below with
# `with dependencies;`.
let dependencies = rec { | |
# Erlang with wxSupport switched off.
_erlang = erlang.override { wxSupport = false; }; | |
# RabbitMQ built against the Erlang defined above.
_rabbitmq_server = rabbitmq_server.override { erlang = _erlang; }; | |
# Store file naming rabbitmq_management as the only enabled plugin.
_enabled_plugins = builtins.toFile "enabled_plugins" "[rabbitmq_management]."; | |
# tornado from python35Packages with name and source replaced by the 4.3b1
# release tarball (presumably needed for async/await handlers - TODO confirm).
_tornado = with python35Packages; tornado.override { | |
name = "tornado-4.3b1"; | |
src = fetchurl { | |
url = "https://pypi.python.org/packages/source/t/tornado/tornado-4.3b1.tar.gz"; | |
sha256 = "c7ddda61d9469c5745f3ac00e480ede0703dd1a4ef540a3d9bd5e03e9796e430"; | |
}; | |
}; | |
# aioamqp 0.4.0 built from its PyPI source tarball with buildPythonPackage.
_aioamqp = with python35Packages; buildPythonPackage { | |
name = "aioamqp-0.4.0"; | |
src = fetchurl { | |
url = "https://pypi.python.org/packages/source/a/aioamqp/aioamqp-0.4.0.tar.gz"; | |
sha256 = "4882ca561f1aa88beba3398c8021e7918605c371f4c0019b66c12321edda10bf"; | |
}; | |
}; | |
# Python 3.5 environment with both libraries; it is referenced by the
# supervisor programs and listed in buildInputs.
_python35 = python35.buildEnv.override { | |
ignoreCollisions = true; | |
extraLibs = [ | |
_tornado | |
_aioamqp | |
]; | |
}; | |
# Derivation whose output is the supervisord configuration file itself.
supervisord_conf = stdenv.mkDerivation { | |
name = "supervisord.conf"; | |
# The builder writes the configuration to $out through a heredoc. It defines
# three programs: rabbitmq, server (server.py) and worker (worker.py with
# numprocs=2). All paths in it are relative (./var, server.py, worker.py),
# so supervisord presumably has to be started from the project directory -
# TODO confirm.
builder = writeText "builder.sh" '' | |
source $stdenv/setup; | |
cat > $out << EOF | |
[supervisord] | |
logfile=./var/supervisord.log | |
logfile_maxbytes=50MB | |
logfile_backups=10 | |
loglevel=error | |
pidfile=./var/supervisord.pid | |
childlogdir=./var | |
[supervisorctl] | |
[unix_http_server] | |
file=./var/supervisord.sock | |
[rpcinterface:supervisor] | |
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface | |
[program:rabbitmq] | |
command=${_rabbitmq_server}/bin/rabbitmq-server | |
stopasgroup = true | |
environment = RABBITMQ_LOG_BASE="./var",RABBITMQ_MNESIA_BASE="./var",RABBITMQ_ENABLED_PLUGINS_FILE=${_enabled_plugins} | |
[program:server] | |
command=${_python35}/bin/python3 server.py | |
[program:worker] | |
command=${_python35}/bin/python3 worker.py | |
process_name=%(program_name)s-%(process_num)s | |
numprocs=2 | |
EOF | |
''; | |
}; | |
}; | |
in with dependencies; | |
stdenv.mkDerivation rec { | |
name = "env"; | |
# buildEnv combining everything listed in buildInputs.
env = buildEnv { name = name; paths = buildInputs; }; | |
# Builder script that only symlinks the combined environment to $out.
# NOTE(review): the file is named "builder.pl" although its content is shell.
builder = builtins.toFile "builder.pl" '' | |
source $stdenv/setup; ln -s $env $out | |
''; | |
buildInputs = [ | |
_rabbitmq_server | |
_python35 | |
pythonPackages.supervisor | |
]; | |
# Shell setup: creates ./var and aliases supervisord / supervisorctl so that
# they use the generated configuration and the socket under ./var.
shellHook = '' | |
mkdir -p $PWD/var | |
alias supervisord="supervisord -c ${supervisord_conf}" | |
alias supervisorctl="supervisorctl -s unix://$PWD/var/supervisord.sock" | |
''; | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import asyncio | |
import time | |
from pickle import loads | |
from connection import get_channel | |
# Number of requests this worker process has handled so far.
counter = 0


async def work():
    """Start consuming bridged HTTP requests and echo each request URI back.

    Declares a private exclusive queue, binds it to ``amq.topic`` with the
    ``#`` pattern and registers a consumer callback that replies to every
    delivered request on the queue named in its ``reply_to`` property.
    """
    channel = await get_channel()
    requests = (await channel.queue_declare('', exclusive=True))['queue']
    await channel.queue_bind(requests, 'amq.topic', '#')

    async def handle(body, envelope, properties):
        """Answer one delivered request and log it to stdout."""
        global counter
        counter += 1
        # SECURITY: pickle.loads() can execute arbitrary code while
        # unpickling. Only acceptable while every publisher on this broker
        # is trusted.
        request = loads(body)
        # Publish bytes explicitly instead of relying on aioamqp's deprecated
        # implicit encoding of str payloads.
        response = request['uri'].encode('utf-8')
        await channel.basic_publish(payload=response,
                                    exchange_name='',
                                    routing_key=properties.reply_to)
        print("{0:d} {1}: {2}".format(counter, time.ctime(), request['uri']))

    await channel.basic_consume(requests, no_ack=True, callback=handle)
if __name__ == "__main__":
    # Register the consumer first, then keep the loop alive so that the
    # callback keeps receiving deliveries.
    loop = asyncio.get_event_loop()
    loop.run_until_complete(work())
    loop.run_forever()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment