Skip to content

Instantly share code, notes, and snippets.

@lucasnad27
Created February 27, 2019 01:54
Show Gist options
  • Save lucasnad27/5453ad54411ffa54ce74198418df70cb to your computer and use it in GitHub Desktop.
Save lucasnad27/5453ad54411ffa54ce74198418df70cb to your computer and use it in GitHub Desktop.
from enum import Enum
import rq
from redis import Redis
import settings
class Priority(Enum):
low = 'low'
default = 'default'
high = 'high'
def publish(fn, priority=Priority.default, skip_duplicate=False, *args, **kwargs):
"""Publishes a message to the queue"""
redis_conn = Redis(host=settings.WORKER_REDIS_HOST)
q = rq.Queue(priority.value, connection=redis_conn)
if skip_duplicate:
# check to make sure there isn't already a job in the queue
raise NotImplementedError
q.enqueue(fn, *args, **kwargs)
def consume(priority):
"""Consumes messages from a specified queue"""
conn = Redis(host=settings.WORKER_REDIS_HOST)
with rq.Connection(conn):
worker = rq.Worker(priority.value)
worker.work()
"""
# existing imlementsation
for customer in orm.Customer.query.all():
queue.enqueue(
send_customer_listings_count_report,
kwargs={"geoid": customer.geoid, "email": customer.email},
timeout=(60 * 5), # 5 minutes
)
# becomes
for customer in orm.Customer.query.all():
worker.publish(
send_customer_listings_count_report,
kwargs={"geoid": customer.geoid, "email": customer.email},
)
"""
@igavrilov
Copy link

@lucasnad27 can you please give an example of skip_duplicate? I'd like to understand why do we need this feature.

@lucasnad27
Copy link
Author

@lucasnad27 can you please give an example of skip_duplicate? I'd like to understand why do we need this feature.

This "feature" already exists by default. Not sure what I want to do with it yet...

    # check for duplicate already running
    registry = StartedJobRegistry(queue, connection=conn)
    for job_id in registry.get_job_ids():
        job = q.fetch_job(job_id)
        if job.description == kwargs["description"]:
            return job

    # check for duplicate in the queue
    for job in q.jobs:
        if job.description == kwargs["description"]:
            return job

@lucasnad27
Copy link
Author

It's meant to stop us from repeating the same crawl more than once based on the description of the task. I don't really like this pattern, seems a bit hacky to stop us from having long running workers step on top of each other. But I'd like to avoid this check unless we explicitly want to check for a duplicate

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment