@zachmullen
Last active January 2, 2023 17:03
rabbitmq queue position check
import logging

import pika
import pika.exceptions

logger = logging.getLogger(__name__)

task_id = "change_this"  # TODO change this to your celery task ID
broker = "amqp://guest:guest@localhost/"  # TODO set this
MAX_QUEUE_CHECK = 20


# Wrapped in a function so the early `return` below is valid; the name is arbitrary.
def check_queue_position():
    try:
        connection = pika.BlockingConnection(pika.URLParameters(broker))
        channel = connection.channel()
        pos = -1
        # auto_ack defaults to False and nothing is acked here, so the broker
        # requeues these messages once the connection closes.
        for i, (_, properties, _) in enumerate(channel.consume("celery", inactivity_timeout=0)):
            if properties is None:
                break  # this means we've exhausted the queue
            # Celery stores the task ID in the "id" message header
            if properties.headers["id"] == task_id:
                pos = i
                break
            if i >= MAX_QUEUE_CHECK - 1:
                pos = MAX_QUEUE_CHECK
                break  # we've checked as many as we're willing to
        channel.cancel()
        connection.close()
    except pika.exceptions.AMQPError:
        logger.exception(f"Could not reach RabbitMQ: {broker}")
        return
    if pos == -1:
        # Means the queue is empty, or the task ID was not found in the queue
        pass
    elif pos == MAX_QUEUE_CHECK:
        # Means we searched MAX_QUEUE_CHECK places in the queue, but still haven't found our task
        pass
    # Otherwise pos is the 0-based position of the task in the queue
    return pos
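
The scan logic above can be tried without a broker. What follows is a minimal sketch, not part of the original gist: `find_queue_position` and `fake_delivery` are hypothetical names, and the fake deliveries only imitate the `(method, properties, body)` tuples that `channel.consume` yields, with `(None, None, None)` standing in for the inactivity timeout.

```python
from types import SimpleNamespace


def find_queue_position(deliveries, task_id, max_check=20):
    """Return the 0-based position of task_id among deliveries.

    Returns -1 if the queue runs out first, or max_check if the
    search limit is reached without a match.
    """
    for i, (_, properties, _) in enumerate(deliveries):
        if properties is None:
            return -1  # inactivity timeout: no more messages
        headers = properties.headers or {}
        if headers.get("id") == task_id:
            return i
        if i >= max_check - 1:
            return max_check  # checked as many as we're willing to
    return -1


def fake_delivery(task_id):
    # Hypothetical stand-in for one tuple yielded by channel.consume()
    return (None, SimpleNamespace(headers={"id": task_id}), b"")


queue = [fake_delivery("a"), fake_delivery("b"), fake_delivery("c"), (None, None, None)]
print(find_queue_position(queue, "c"))
```

Keeping the loop in a helper that accepts any iterable means the same function could be handed `channel.consume("celery", inactivity_timeout=0)` directly, assuming the messages follow the header layout used above.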

netlob commented Jan 2, 2023

does this remove the messages from the queue?

zachmullen (author) commented

@netlob No, this leaves the messages in the queue. However, I have a newer version of this that speaks directly over the queue using the pika library.
