Last active
March 23, 2023 17:14
-
-
Save zachmullen/dda829cdc267b6b40f09daf55812cc99 to your computer and use it in GitHub Desktop.
queue_check_pika.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@access.user | |
@autoDescribeRoute( | |
Description("Get the position of a queued job.") | |
.modelParam("id", model=Job, level=AccessType.READ) | |
) | |
def getQueuePosition(self, job): | |
# Note: this only works with rabbitmq. Celery's abstraction does not provide | |
# an interface at this level. | |
taskId = job["celeryTaskId"] | |
broker = Setting().get("worker.broker") | |
connection = pika.BlockingConnection(pika.URLParameters(broker)) | |
channel = connection.channel() | |
pos = -1 | |
for i, (_, properties, _) in enumerate(channel.consume("celery", inactivity_timeout=0)): | |
if properties is None: | |
break # this means we've exhausted the queue | |
if i >= MAX_QUEUE_CHECK - 1: | |
pos = MAX_QUEUE_CHECK | |
break # we've checked as many as we're willing to | |
if properties.headers["id"] == taskId: | |
pos = i | |
break | |
channel.cancel() | |
connection.close() | |
if pos == -1 or pos == MAX_QUEUE_CHECK: | |
# If we fail to find it in the checked elements of the queue, we reload | |
# the job document and show its updated status, in case it's been started. | |
job = Job().load(job["_id"], force=True) | |
return {"position": pos, "checkedCount": MAX_QUEUE_CHECK, "status": job["status"]} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment