Skip to content

Instantly share code, notes, and snippets.

@jbarham
Created February 3, 2021 08:06
Show Gist options
  • Star 5 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jbarham/de89f85e900bf5f3adce3f0e4d65c5d9 to your computer and use it in GitHub Desktop.
Save jbarham/de89f85e900bf5f3adce3f0e4d65c5d9 to your computer and use it in GitHub Desktop.
from django.core.management.base import BaseCommand
from rq.registry import FailedJobRegistry
from django_rq.queues import get_queue
from django_rq.utils import get_jobs
class Command(BaseCommand):
    """Management command that deletes failed jobs from a Django RQ queue.

    The jobs are taken from the ``FailedJobRegistry`` of the selected queue.
    They can optionally be narrowed down to a single job function, and
    ``--dry-run`` only reports how many jobs match without deleting them.
    """

    help = 'Delete failed jobs from Django RQ.'

    def add_arguments(self, parser):
        """Register the ``--queue``, ``--func`` and ``--dry-run`` options."""
        parser.add_argument(
            '-q', '--queue', default='default',
            help='queue name (default: "default")',
        )
        parser.add_argument(
            '-f', '--func',
            help='optional job function name, e.g. "app.tasks.func"',
        )
        parser.add_argument(
            '--dry-run', action='store_true',
            help='Do not actually delete failed jobs',
        )

    def handle(self, *args, **options):
        """Collect the failed jobs of the chosen queue and delete them.

        Reads ``options['queue']`` (queue name), ``options['func']`` (optional
        function name used as a filter) and ``options['dry_run']`` (when true,
        only the number of matching jobs is reported).
        """
        queue = get_queue(options['queue'])
        registry = FailedJobRegistry(queue.name, queue.connection)

        job_ids = registry.get_job_ids()
        jobs = get_jobs(queue, job_ids, registry)

        # Optionally keep only the jobs whose function name matches exactly.
        func_name = options.get('func')
        if func_name:
            jobs = [job for job in jobs if job.func_name == func_name]

        # Output goes through self.stdout rather than print() so it can be
        # captured or redirected, e.g. call_command(..., stdout=buffer).
        if options['dry_run']:
            self.stdout.write(f'Found {len(jobs)} failed jobs')
            return

        for job in jobs:
            # LREM with count 0 removes every occurrence of the job id from
            # the queue's Redis list before the job itself is deleted.
            queue.connection.lrem(queue.key, 0, job.id)
            job.delete()
        self.stdout.write(f'Deleted {len(jobs)} failed jobs')
@jbarham
Copy link
Author

jbarham commented Feb 3, 2021

This is a Django management command to delete failed jobs from Django RQ.

@BigglesZX
Copy link

Thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment