Skip to content

Instantly share code, notes, and snippets.

@zachmullen
Last active March 23, 2023 17:14
Show Gist options
  • Save zachmullen/dda829cdc267b6b40f09daf55812cc99 to your computer and use it in GitHub Desktop.
Save zachmullen/dda829cdc267b6b40f09daf55812cc99 to your computer and use it in GitHub Desktop.
queue_check_pika.py
@access.user
@autoDescribeRoute(
    Description("Get the position of a queued job.")
    .modelParam("id", model=Job, level=AccessType.READ)
)
def getQueuePosition(self, job):
    """Report where a job's Celery task currently sits in the "celery" queue.

    The broker queue is scanned from its head, comparing each message's
    ``id`` header with the job's ``celeryTaskId``.

    :param job: The job document; must contain ``celeryTaskId`` and ``_id``.
    :returns: A dict with three keys:
        ``position`` -- zero-based index of the task in the queue, ``-1`` if
        the scan ended without finding it, or ``MAX_QUEUE_CHECK`` if the scan
        was cut off at the inspection limit;
        ``checkedCount`` -- always ``MAX_QUEUE_CHECK``;
        ``status`` -- the job's status, reloaded from the database when the
        task was not found.
    """
    # Note: this only works with rabbitmq. Celery's abstraction does not provide
    # an interface at this level.
    taskId = job["celeryTaskId"]
    broker = Setting().get("worker.broker")
    connection = pika.BlockingConnection(pika.URLParameters(broker))
    pos = -1
    try:
        channel = connection.channel()
        try:
            for i, (_, properties, _) in enumerate(
                channel.consume("celery", inactivity_timeout=0)
            ):
                if properties is None:
                    break  # this means we've exhausted the queue
                # NOTE(review): this limit check runs before the id comparison,
                # so at most MAX_QUEUE_CHECK - 1 messages are actually compared.
                if i >= MAX_QUEUE_CHECK - 1:
                    pos = MAX_QUEUE_CHECK
                    break  # we've checked as many as we're willing to
                # Messages without headers (or without an "id" header) cannot
                # be our task; skip them instead of raising.
                headers = properties.headers
                if headers and "id" in headers and headers["id"] == taskId:
                    pos = i
                    break
        finally:
            # Always stop the consumer, even if the scan raised.
            if channel.is_open:
                channel.cancel()
    finally:
        # Always release the broker connection so it is never leaked.
        if connection.is_open:
            connection.close()

    if pos == -1 or pos == MAX_QUEUE_CHECK:
        # If we fail to find it in the checked elements of the queue, we reload
        # the job document and show its updated status, in case it's been started.
        job = Job().load(job["_id"], force=True)
    return {"position": pos, "checkedCount": MAX_QUEUE_CHECK, "status": job["status"]}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment