Skip to content

Instantly share code, notes, and snippets.

@astrojuanlu
Last active November 26, 2024 21:41
Show Gist options
  • Save astrojuanlu/17c2f05298d3549c55ac45b1c85cffb6 to your computer and use it in GitHub Desktop.
Save astrojuanlu/17c2f05298d3549c55ac45b1c85cffb6 to your computer and use it in GitHub Desktop.
Asynchronous RabbitMQ/AMQP client using aiormq with graceful cancellation.
import asyncio
import logging
import os
import aiormq
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Broker location, read from the environment at import time. A missing
# variable raises KeyError here, before any connection is attempted.
# The values stay strings; they are only interpolated into the AMQP URL.
RABBITMQ_HOST = os.environ["RABBITMQ_HOST"]
RABBITMQ_PORT = os.environ["RABBITMQ_PORT"]
RABBITMQ_VIRTUAL_HOST = os.environ["RABBITMQ_VIRTUAL_HOST"]
async def process_message(
    message: aiormq.types.DeliveredMessage,
) -> None:
    """Log the header and the decoded body of a delivered message.

    Args:
        message: The delivery passed to the consumer callback. Only its
            ``header`` and ``body`` attributes are read.
    """
    logger.info("Processing message %s", message.header)
    # The payload is arbitrary bytes from the broker: replace undecodable
    # sequences instead of letting UnicodeDecodeError escape the callback,
    # and pass the text as an argument so it is never used as a format string.
    logger.info("%s", message.body.decode(errors="replace"))
async def main(
    rabbitmq_user: str,
    rabbitmq_password: str,
    queue_name: str,
    cancellation_event: asyncio.Event,
) -> None:
    """Consume messages from a durable queue until cancellation is requested.

    Connects to the broker described by ``RABBITMQ_HOST``, ``RABBITMQ_PORT``
    and ``RABBITMQ_VIRTUAL_HOST``, declares ``queue_name`` (durable, not
    auto-deleted), registers ``process_message`` as the consumer callback
    with ``no_ack=True``, and then waits for ``cancellation_event``.

    The channel and the connection are closed on every exit path, including
    when declaring the queue fails or the waiting task is cancelled.

    Args:
        rabbitmq_user: User name placed in the AMQP URL.
        rabbitmq_password: Password placed in the AMQP URL.
        queue_name: Name of the queue to declare and consume from.
        cancellation_event: Event whose ``set()`` ends the consumer.
    """
    # NOTE(review): the credentials are interpolated verbatim, so reserved URL
    # characters (e.g. "@", ":", "/") presumably have to be percent-encoded
    # by the caller -- confirm against aiormq's URL handling.
    url = (
        f"amqp://{rabbitmq_user}:{rabbitmq_password}"
        f"@{RABBITMQ_HOST}:{RABBITMQ_PORT}/{RABBITMQ_VIRTUAL_HOST}"
    )
    connection = await aiormq.connect(url)
    try:
        channel = await connection.channel()
        try:
            declare_ok = await channel.queue_declare(
                queue_name,
                auto_delete=False,
                durable=True,
            )
            await channel.basic_consume(
                declare_ok.queue, process_message, no_ack=True
            )

            # Block here until a signal handler (or any other caller) sets
            # the event; deliveries are handled by the callback meanwhile.
            await cancellation_event.wait()
            logger.info("Finished listening to passes")
        finally:
            # Close the channel before the connection that owns it.
            await channel.close()
    finally:
        await connection.close()
async def shutdown(signal, loop, cancellation_event: asyncio.Event) -> None:
    """React to an exit signal by setting the cancellation event.

    Args:
        signal: The received signal; only its ``name`` attribute is read.
            Inside this function the parameter hides any module of the
            same name.
        loop: Unused; kept so existing callers keep working.
        cancellation_event: Event that is set to request a graceful stop.
    """
    # Log through the module logger (not the root logger) so both records
    # carry the same logger name, and use plain lazy %-formatting.
    logger.info("Received exit signal %s...", signal.name)
    logger.warning("Setting cancellation event")
    cancellation_event.set()
if __name__ == "__main__":
    # Script entry point: parse the command line, install signal handlers
    # that request a graceful stop, and run the consumer until it returns.
    import argparse
    import asyncio
    import signal

    # The first positional argument of ArgumentParser is ``prog``; passing
    # __name__ there made the usage line read "__main__". Let argparse derive
    # the program name and provide a description instead.
    parser = argparse.ArgumentParser(
        description=(
            "Asynchronous RabbitMQ/AMQP consumer using aiormq "
            "with graceful cancellation."
        ),
    )
    parser.add_argument("--rabbitmq-user", required=True)
    parser.add_argument("--rabbitmq-password", required=True)
    parser.add_argument("--rabbitmq-queue-name", default="roger-test-consumer")
    parser.add_argument(
        "-v", "--verbose", action="store_true", help="Display debug information"
    )
    args = parser.parse_args()

    logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO)

    # Create and install the loop explicitly: calling get_event_loop() with
    # no running loop is deprecated. The event is created only afterwards so
    # that, on Python versions where Event binds to a loop at construction,
    # it binds to this one.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    _e = asyncio.Event()

    # Keep a reference to each shutdown task until it finishes; the event
    # loop itself only holds weak references to tasks.
    _shutdown_tasks = set()

    def _request_shutdown(received_signal):
        """Schedule ``shutdown`` for the signal that was just received."""
        task = loop.create_task(shutdown(received_signal, loop, _e))
        _shutdown_tasks.add(task)
        task.add_done_callback(_shutdown_tasks.discard)

    signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
    for s in signals:
        loop.add_signal_handler(s, _request_shutdown, s)

    try:
        loop.run_until_complete(
            main(
                args.rabbitmq_user,
                args.rabbitmq_password,
                args.rabbitmq_queue_name,
                _e,
            )
        )
        logger.info("Bye!")
    finally:
        # Close the loop even when main() raised.
        loop.close()
@astrojuanlu
Copy link
Author

Output:

$ PYTHONBUFFERED=1 PYTHONASYNCIODEBUG=1 python -Werror async_consumer.py --rabbitmq-user $RABBITMQ_USER --rabbitmq-password $RABBITMQ_PASS --rabbitmq-queue-name test-queue-1 -v
DEBUG:asyncio:Using selector: EpollSelector
DEBUG:asyncio:Get address info xxx:nnnn, type=<SocketKind.SOCK_STREAM: 1>
DEBUG:asyncio:Getting address info xxx:nnnn, type=<SocketKind.SOCK_STREAM: 1> took 25.616ms: [(<AddressFamily.AF_INET: 2>, <SocketKind.SOCK_STREAM: 1>, 6, '', ('xxx', nnnn))]
DEBUG:asyncio:<asyncio.TransportSocket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('xxx', nnn), raddr=('xxx', nnn)> connected to xxx:nnnn: (<_SelectorSocketTransport fd=6 read=polling write=<idle, bufsize=0>>, <asyncio.streams.StreamReaderProtocol object at 0x7efe9babeca0>)
^CINFO:root:Received exit signal SIGINT...
WARNING:__main__:Setting cancellation event
INFO:__main__:Finished listening to passes
DEBUG:aiormq.connection:Can not read bytes from server:
Traceback (most recent call last):
  File "/home/juanlu/.pyenv/versions/roger38/lib/python3.8/site-packages/aiormq/connection.py", line 375, in __reader
    weight, channel, frame = await self.__receive_frame()
  File "/home/juanlu/.pyenv/versions/roger38/lib/python3.8/site-packages/aiormq/connection.py", line 327, in __receive_frame
    frame_header = await self.reader.readexactly(1)
  File "/home/juanlu/.pyenv/versions/3.8.5/lib/python3.8/asyncio/streams.py", line 721, in readexactly
    raise exceptions.IncompleteReadError(incomplete, n)
asyncio.exceptions.IncompleteReadError: 0 bytes read on a total of 1 expected bytes
INFO:__main__:Bye!
DEBUG:asyncio:Close <_UnixSelectorEventLoop running=False closed=False debug=True>
$ echo $?
0

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment