Last active
November 10, 2020 17:44
-
-
Save astrojuanlu/17c2f05298d3549c55ac45b1c85cffb6 to your computer and use it in GitHub Desktop.
Asynchronous RabbitMQ/AMQP client using aiormq with graceful cancellation.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import logging | |
import os | |
import aiormq | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Broker location is mandatory configuration: each lookup raises KeyError at
# import time when its environment variable is not set.
RABBITMQ_HOST, RABBITMQ_PORT, RABBITMQ_VIRTUAL_HOST = (
    os.environ[variable]
    for variable in ("RABBITMQ_HOST", "RABBITMQ_PORT", "RABBITMQ_VIRTUAL_HOST")
)
async def process_message(
    message: aiormq.types.DeliveredMessage,
) -> None:
    """Log the header and the decoded body of a delivered message.

    This is the callback that ``main`` registers with ``basic_consume``.

    Args:
        message: Delivered message; its ``header`` is logged as-is and its
            ``body`` is decoded with the default codec before being logged.
    """
    header = message.header
    logger.info("Processing message %s", header)

    payload = message.body.decode()
    logger.info(payload)
async def main(
    rabbitmq_user: str,
    rabbitmq_password: str,
    queue_name: str,
    cancellation_event: asyncio.Event,
) -> None:
    """Consume messages from a queue until the cancellation event is set.

    Connects to the broker described by ``RABBITMQ_HOST``, ``RABBITMQ_PORT``
    and ``RABBITMQ_VIRTUAL_HOST``, declares the queue as durable and
    non-auto-delete, registers ``process_message`` as the consumer callback
    and then blocks until ``cancellation_event`` is set.

    The channel and the connection are closed on every exit path, including
    when setup fails or the coroutine is cancelled while waiting.

    Args:
        rabbitmq_user: User name placed in the AMQP URL.
        rabbitmq_password: Password placed in the AMQP URL.
        queue_name: Name of the queue to declare and consume from.
        cancellation_event: Event that signals the consumer should stop.
    """
    connection = await aiormq.connect(
        f"amqp://{rabbitmq_user}:{rabbitmq_password}"
        f"@{RABBITMQ_HOST}:{RABBITMQ_PORT}/{RABBITMQ_VIRTUAL_HOST}",
    )
    try:
        channel = await connection.channel()
        try:
            declare_ok = await channel.queue_declare(
                queue_name,
                auto_delete=False,
                durable=True,
            )

            # no_ack=True: messages are consumed without explicit acknowledgement.
            await channel.basic_consume(
                declare_ok.queue, process_message, no_ack=True
            )

            await cancellation_event.wait()
            logger.info("Finished listening to passes")
        finally:
            # Close the channel even if declaring/consuming failed.
            await channel.close()
    finally:
        # Always release the connection, even if the channel never opened.
        await connection.close()
async def shutdown(signal, loop, cancellation_event: asyncio.Event) -> None:
    """Request a graceful stop by setting the cancellation event.

    Args:
        signal: Signal that triggered the shutdown; only its ``name`` is read.
        loop: Event loop the handler was registered on (not used here).
        cancellation_event: Event to set so that waiters can stop.
    """
    # Use the module logger (not the root logger) like the rest of the file,
    # and pass the signal name as a lazy %-argument.
    logger.info("Received exit signal %s...", signal.name)
    logger.warning("Setting cancellation event")
    cancellation_event.set()
if __name__ == "__main__":
    # Script entry point: parse CLI arguments, install signal handlers that
    # trigger a graceful shutdown, and run the consumer until it finishes.
    import argparse
    import signal

    parser = argparse.ArgumentParser(__name__)
    parser.add_argument("--rabbitmq-user", required=True)
    parser.add_argument("--rabbitmq-password", required=True)
    parser.add_argument("--rabbitmq-queue-name", default="roger-test-consumer")
    parser.add_argument(
        "-v", "--verbose", action="store_true", help="Display debug information"
    )
    args = parser.parse_args()

    logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO)

    # Create and install the loop explicitly instead of relying on
    # asyncio.get_event_loop(), which is deprecated when no loop is running.
    # The event is created afterwards so it is associated with this loop.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    _e = asyncio.Event()

    signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
    for s in signals:
        # s=s binds the current signal to each handler (avoids late binding).
        loop.add_signal_handler(
            s, lambda s=s: loop.create_task(shutdown(s, loop, _e))
        )

    try:
        loop.run_until_complete(
            main(
                args.rabbitmq_user,
                args.rabbitmq_password,
                args.rabbitmq_queue_name,
                _e,
            )
        )
        logger.info("Bye!")
    finally:
        # Close the loop even when main() raises.
        loop.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Output: