Created
June 30, 2020 21:36
-
-
Save mariocesar/bd630fcebbf45ad26211513813e1924b to your computer and use it in GitHub Desktop.
Run a celery task in a django project using a command #django #celery
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import logging | |
from logging import getLogger, DEBUG | |
from celery import signature | |
from django.core.management.base import BaseCommand, CommandError | |
class Command(BaseCommand): | |
def add_arguments(self, parser): | |
parser.add_argument("task", metavar="TASK NAME") | |
parser.add_argument("kwargs", default="{}") | |
def handle(self, *args, **options): | |
from celery import current_app as app | |
name = options["task"] | |
# If you don't init the worker you will not have the full list of available tasks | |
app.loader.init_worker() | |
tasks = sorted([task for task in app.tasks if not task.startswith("celery.")]) | |
if name == "list": | |
for task in tasks: | |
print(task) | |
return | |
if name not in tasks: | |
raise CommandError(f"Task {name!r} not registered.") | |
kwargs = json.loads(options["kwargs"]) | |
task = signature(name, kwargs=kwargs) | |
print(task()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment