Skip to content

Instantly share code, notes, and snippets.

@DFilyushin
Last active December 9, 2020 05:47
Show Gist options
  • Save DFilyushin/730fe4ae992a64df20e0a7818114d32f to your computer and use it in GitHub Desktop.
Save DFilyushin/730fe4ae992a64df20e0a7818114d32f to your computer and use it in GitHub Desktop.
rabbit_mq_consumer
import asyncio
import aio_pika
import time
import sys
async def process_message(message: aio_pika.IncomingMessage):
    """Print a delivered message's body and routing key.

    The work happens inside the ``message.process()`` async context
    manager (per the aio_pika documentation this acknowledges the message
    when the block exits without an exception).

    Args:
        message: The incoming message handed over by the queue consumer.
    """
    async with message.process():
        payload = message.body
        routing_key = message.routing_key
        print(payload, 'queue: result', routing_key)
async def main(loop):
    """Connect to the broker, set up the result queue and start consuming.

    Declares a durable exchange named ``"direct"``, declares the
    ``"queue_result"`` queue, binds the queue to the exchange using the
    queue name as routing key, and registers ``process_message`` as the
    consumer callback.

    Args:
        loop: Event loop forwarded to ``aio_pika.connect_robust``.

    Returns:
        The open connection. The caller is responsible for closing it.

    Raises:
        Any exception raised by the aio_pika setup calls. The connection is
        closed before the exception propagates, so a failed start-up does
        not leave an open connection behind.
    """
    queue_name = "queue_result"
    connection: aio_pika.RobustConnection = await aio_pika.connect_robust(
        "amqp://guest:guest@127.0.0.1/", loop=loop
    )
    try:
        channel = await connection.channel()
        exchange = await channel.declare_exchange(
            "direct", durable=True, auto_delete=False
        )
        queue = await channel.declare_queue(queue_name, auto_delete=False)
        # The queue name doubles as the routing key for the binding.
        await queue.bind(exchange, queue_name)
        await queue.consume(process_message)
    except BaseException:
        # Setup failed after the connection was opened: the caller never
        # receives the connection object, so it must be closed here.
        await connection.close()
        raise
    print('[x] Start consumer for ', queue_name)
    return connection
if __name__ == "__main__":
    # Create the loop explicitly: asyncio.get_event_loop() is deprecated
    # when no loop is running.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        connection = loop.run_until_complete(main(loop))
        try:
            # Consume until the process is interrupted (e.g. Ctrl+C).
            loop.run_forever()
        finally:
            loop.run_until_complete(connection.close())
    finally:
        # Release the loop's resources even if start-up or shutdown failed.
        loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment