Skip to content

Instantly share code, notes, and snippets.

@JoelBender
Created January 31, 2019 02:52
Show Gist options
  • Save JoelBender/c1c5c0dcd67a56f89869f48790d59927 to your computer and use it in GitHub Desktop.
Save JoelBender/c1c5c0dcd67a56f89869f48790d59927 to your computer and use it in GitHub Desktop.
aioamqp connection lost non-feature
"""
RPC server, aioamqp implementation of RPC examples from RabbitMQ tutorial
"""
import asyncio
import logging
import signal
import aioamqp
from aioamqp.protocol import AmqpProtocol
CLOSE_CHANNEL = True
CLOSE_PROTOCOL = True
def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n-1) + fib(n-2)
class MyProtocol(AmqpProtocol):
def eof_received(self):
print(" [X] EOF received")
return asyncio.StreamReaderProtocol.eof_received(self)
class FibonacciRpcServer():
def __init__(self):
self.transport = None
self.protocol = None
self.channel = None
self.callback_queue = None
self.keep_running = True
async def connect(self):
self.transport, self.protocol = await aioamqp.connect(login_method='PLAIN', protocol_factory=MyProtocol)
self.channel = await self.protocol.channel()
await self.channel.queue_declare(queue_name='rpc_queue')
await self.channel.basic_qos(prefetch_count=1, prefetch_size=0, connection_global=False)
await self.channel.basic_consume(self.on_request, queue_name='rpc_queue')
print(" [x] Awaiting RPC requests")
async def disconnect(self):
print(" [x] Disconnect")
async def stop(self):
self.keep_running = False
print(" [x] Stopping")
if CLOSE_CHANNEL:
await self.channel.close()
print(" [x] Channel closed")
if CLOSE_PROTOCOL:
await self.protocol.close()
print(" [x] Protocol closed")
async def main_loop(self):
try:
while self.keep_running:
await self.connect()
await self.protocol.wait_closed()
await self.disconnect()
print(" [x] Not running")
finally:
await self.disconnect()
async def on_request(self, channel, body, envelope, properties):
n = int(body)
print(" [.] fib(%s)" % n)
response = fib(n)
await channel.basic_publish(
payload=str(response),
exchange_name='',
routing_key=properties.reply_to,
properties={
'correlation_id': properties.correlation_id,
},
)
await channel.basic_client_ack(delivery_tag=envelope.delivery_tag)
async def shutdown(loop):
tasks = [t for t in asyncio.Task.all_tasks() if t is not
asyncio.Task.current_task(loop) and not t.done]
for task in tasks:
task.cancel()
print("Canceling outstanding tasks: {!r}".format(tasks))
await asyncio.gather(*tasks)
print('Outstanding tasks canceled')
loop.stop()
print('Shutdown complete.')
channel_logger = logging.getLogger('aioamqp.channel')
loop = asyncio.get_event_loop()
server = FibonacciRpcServer()
signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
for s in signals:
loop.add_signal_handler(
s, lambda s=s: loop.create_task(server.stop())
)
loop.run_until_complete(server.main_loop())
loop.run_until_complete(shutdown(loop))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment