Skip to content

Instantly share code, notes, and snippets.

@mariocesar
Created June 30, 2020 21:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mariocesar/bd630fcebbf45ad26211513813e1924b to your computer and use it in GitHub Desktop.
Save mariocesar/bd630fcebbf45ad26211513813e1924b to your computer and use it in GitHub Desktop.
Run a Celery task in a Django project using a management command #django #celery
import json
import logging
from logging import getLogger, DEBUG
from celery import signature
from django.core.management.base import BaseCommand, CommandError
class Command(BaseCommand):
    """Run a registered Celery task from the command line.

    Usage (the command name is the name of the module holding this class)::

        manage.py <command> list
        manage.py <command> path.to.task '{"some_kwarg": 1}'

    Passing ``list`` as the task name prints every registered task, excluding
    those whose name starts with ``celery.``, instead of running anything.
    """

    help = (
        "Run a registered Celery task by name, or pass 'list' to show the "
        "available tasks."
    )

    def add_arguments(self, parser):
        """Declare the positional ``task`` and optional ``kwargs`` arguments."""
        parser.add_argument(
            "task",
            metavar="TASK NAME",
            help="Name of the task to run, or 'list' to print available tasks.",
        )
        # nargs="?" is what makes the default take effect: without it argparse
        # treats a positional as required and ignores ``default``, so even
        # ``list`` could not be invoked without a dummy second argument.
        parser.add_argument(
            "kwargs",
            nargs="?",
            default="{}",
            help="JSON object of keyword arguments for the task (default: {}).",
        )

    def handle(self, *args, **options):
        """List the registered tasks or execute the requested one.

        Raises:
            CommandError: if the task is not registered, or if ``kwargs`` is
                not valid JSON or does not decode to a JSON object.
        """
        from celery import current_app as app

        name = options["task"]

        # If you don't init the worker you will not have the full list of
        # available tasks.
        app.loader.init_worker()

        tasks = sorted(task for task in app.tasks if not task.startswith("celery."))

        if name == "list":
            for task in tasks:
                self.stdout.write(task)
            return

        if name not in tasks:
            raise CommandError(f"Task {name!r} not registered.")

        # Turn malformed input into a clean command error instead of a
        # traceback from the JSON decoder.
        try:
            kwargs = json.loads(options["kwargs"])
        except json.JSONDecodeError as exc:
            raise CommandError(f"Invalid JSON for kwargs: {exc}") from exc

        if not isinstance(kwargs, dict):
            raise CommandError(
                "kwargs must be a JSON object mapping argument names to values."
            )

        task = signature(name, kwargs=kwargs)
        # Calling the signature invokes the task and yields its result, which
        # is echoed through the command's own stdout wrapper.
        result = task()
        self.stdout.write(str(result))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment