Last active
November 26, 2024 21:41
-
-
Save astrojuanlu/17c2f05298d3549c55ac45b1c85cffb6 to your computer and use it in GitHub Desktop.
Asynchronous RabbitMQ/AMQP client using aiormq with graceful cancellation.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import logging | |
import os | |
import aiormq | |
# Module-level logger used by every coroutine in this file.
logger = logging.getLogger(__name__)

# Broker location, read from the environment when the module is imported;
# a missing variable raises KeyError right here.
RABBITMQ_HOST, RABBITMQ_PORT, RABBITMQ_VIRTUAL_HOST = (
    os.environ[_name]
    for _name in ("RABBITMQ_HOST", "RABBITMQ_PORT", "RABBITMQ_VIRTUAL_HOST")
)
async def process_message(
    message: aiormq.types.DeliveredMessage,
) -> None:
    """Log the header and the decoded body of one delivered message.

    Registered as the consumer callback in ``main``.

    Args:
        message: Delivery whose ``header`` and ``body`` are logged.
    """
    header = message.header
    logger.info("Processing message %s", header)

    # The body arrives as bytes; decode it with the default codec before logging.
    payload = message.body
    logger.info(payload.decode())
async def main(
    rabbitmq_user: str,
    rabbitmq_password: str,
    queue_name: str,
    cancellation_event: asyncio.Event,
) -> None:
    """Consume messages from a RabbitMQ queue until cancellation is requested.

    Connects to the broker described by the module-level ``RABBITMQ_HOST``,
    ``RABBITMQ_PORT`` and ``RABBITMQ_VIRTUAL_HOST`` settings, declares
    ``queue_name`` as a durable, non-auto-delete queue, registers
    ``process_message`` as its consumer and then waits for
    ``cancellation_event``.  The channel and the connection are closed on the
    way out, also when an error interrupts the setup or the wait.

    Args:
        rabbitmq_user: User name placed in the AMQP URL.
        rabbitmq_password: Password placed in the AMQP URL.
        queue_name: Name of the queue to declare and consume from.
        cancellation_event: Event whose setting ends the consumer.
    """
    # NOTE(review): the credentials and virtual host are interpolated verbatim,
    # so values containing URL-reserved characters presumably need to be
    # percent-encoded by the caller -- confirm against the aiormq URL handling.
    connection = await aiormq.connect(
        f"amqp://{rabbitmq_user}:{rabbitmq_password}"
        f"@{RABBITMQ_HOST}:{RABBITMQ_PORT}/{RABBITMQ_VIRTUAL_HOST}",
    )
    try:
        channel = await connection.channel()
        try:
            declare_ok = await channel.queue_declare(
                queue_name,
                auto_delete=False,
                durable=True,
            )
            # no_ack=True: process_message never acknowledges deliveries itself.
            await channel.basic_consume(
                declare_ok.queue, process_message, no_ack=True
            )

            # Park here until a signal handler (or any other caller) sets the event.
            await cancellation_event.wait()
            logger.info("Finished listening to passes")
        finally:
            await channel.close()
    finally:
        # Runs even if the channel setup or close failed, so the socket is
        # never left open.
        await connection.close()
async def shutdown(signal, loop, cancellation_event: asyncio.Event) -> None:
    """React to an exit signal by setting the cancellation event.

    Args:
        signal: The received signal; only its ``name`` attribute is read.
        loop: Not used by this coroutine; kept so existing callers keep working.
        cancellation_event: Event to set.
    """
    # Use the module logger (not the root ``logging`` functions) so both
    # messages are emitted under the same logger name.
    logger.info("Received exit signal %s...", signal.name)
    logger.warning("Setting cancellation event")
    cancellation_event.set()
if __name__ == "__main__":
    # Command-line entry point: parse the arguments, install signal handlers
    # that request cancellation, and run the consumer until it returns.
    import argparse
    import asyncio
    import signal

    # Let argparse derive the program name from the command line; passing
    # __name__ would make the usage text read "__main__".
    parser = argparse.ArgumentParser()
    parser.add_argument("--rabbitmq-user", required=True)
    parser.add_argument("--rabbitmq-password", required=True)
    parser.add_argument("--rabbitmq-queue-name", default="roger-test-consumer")
    parser.add_argument(
        "-v", "--verbose", action="store_true", help="Display debug information"
    )
    args = parser.parse_args()

    logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO)

    # Create and install the loop explicitly: asyncio.get_event_loop() without
    # a running loop is deprecated, which turns into an error under -Werror.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # Created after the loop is installed so the event and the loop agree
        # on interpreters where asyncio primitives bind a loop at construction.
        _e = asyncio.Event()

        signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
        for s in signals:
            # s=s binds the current signal now; a plain closure would see only
            # the last value of the loop variable.
            loop.add_signal_handler(
                s, lambda s=s: asyncio.create_task(shutdown(s, loop, _e))
            )

        loop.run_until_complete(
            main(
                args.rabbitmq_user,
                args.rabbitmq_password,
                args.rabbitmq_queue_name,
                _e,
            )
        )
        logger.info("Bye!")
    finally:
        # Close the loop even when main() raised.
        loop.close()
Output:
$ PYTHONUNBUFFERED=1 PYTHONASYNCIODEBUG=1 python -Werror async_consumer.py --rabbitmq-user $RABBITMQ_USER --rabbitmq-password $RABBITMQ_PASS --rabbitmq-queue-name test-queue-1 -v
DEBUG:asyncio:Using selector: EpollSelector
DEBUG:asyncio:Get address info xxx:nnnn, type=<SocketKind.SOCK_STREAM: 1>
DEBUG:asyncio:Getting address info xxx:nnnn, type=<SocketKind.SOCK_STREAM: 1> took 25.616ms: [(<AddressFamily.AF_INET: 2>, <SocketKind.SOCK_STREAM: 1>, 6, '', ('xxx', nnnn))]
DEBUG:asyncio:<asyncio.TransportSocket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('xxx', nnn), raddr=('xxx', nnn)> connected to xxx:nnnn: (<_SelectorSocketTransport fd=6 read=polling write=<idle, bufsize=0>>, <asyncio.streams.StreamReaderProtocol object at 0x7efe9babeca0>)
^CINFO:root:Received exit signal SIGINT...
WARNING:__main__:Setting cancellation event
INFO:__main__:Finished listening to passes
DEBUG:aiormq.connection:Can not read bytes from server:
Traceback (most recent call last):
File "/home/juanlu/.pyenv/versions/roger38/lib/python3.8/site-packages/aiormq/connection.py", line 375, in __reader
weight, channel, frame = await self.__receive_frame()
File "/home/juanlu/.pyenv/versions/roger38/lib/python3.8/site-packages/aiormq/connection.py", line 327, in __receive_frame
frame_header = await self.reader.readexactly(1)
File "/home/juanlu/.pyenv/versions/3.8.5/lib/python3.8/asyncio/streams.py", line 721, in readexactly
raise exceptions.IncompleteReadError(incomplete, n)
asyncio.exceptions.IncompleteReadError: 0 bytes read on a total of 1 expected bytes
INFO:__main__:Bye!
DEBUG:asyncio:Close <_UnixSelectorEventLoop running=False closed=False debug=True>
$ echo $?
0
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Inspiration: