Skip to content

Instantly share code, notes, and snippets.

@turtlemonvh
Created August 8, 2014 21:14
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save turtlemonvh/371fc926320d4cd24f18 to your computer and use it in GitHub Desktop.
Save turtlemonvh/371fc926320d4cd24f18 to your computer and use it in GitHub Desktop.
Dynamically add celery tasks
## THIS IS UNTESTED
from worker.models import TaskType
from website.celery import app
import importlib
# Dynamically add registered tasks
# Celery._tasks is a task registry object
# https://github.com/celery/celery/blob/34c43244b1681a59540936748800aaa504786a35/celery/app/base.py#L162 - _tasks
# https://github.com/celery/celery/blob/524421a36dcc838a5f51e5cf122902aab774bad1/celery/app/registry.py#L22 - task registry object
# Autodiscover calls AppLoader which is a thin shell around BaseLoader
# https://github.com/celery/celery/blob/34c43244b1681a59540936748800aaa504786a35/celery/app/base.py#L341 - autodisover
# https://github.com/celery/celery/blob/34c43244b1681a59540936748800aaa504786a35/celery/loaders/base.py - base loader
# task function calls _task_from_fun, which adds tasks to registry
# https://github.com/celery/celery/blob/34c43244b1681a59540936748800aaa504786a35/celery/app/base.py#L227 - task; creates a new task
# https://github.com/celery/celery/blob/34c43244b1681a59540936748800aaa504786a35/celery/app/base.py#L269 - _task_from_fun
# task function is called as a decorator on a task function
# TL;DR
# - call app.task directly on functions you want to add
def add_tasks():
"""Grab task definitions out of database and add them to celery's registry"""
for task in TaskType.objects.all():
# Grab the reference to the function
# get function from call_string
# call string should be in the form: "module1.module2.module3.task_class"
module_path = task.call_string.split(".")
module = importlib.import_module(".%s" %(module_path[-2]),
package=".".join(module_path[0:-2]))
task_class = getattr(module, module_path[-1])
# The execute method is the callable
# Call the 'task' method to register
app.task(getattr(task_class, 'execute'))
@turtlemonvh
Copy link
Author

I ended up using a different approach of creating one task type that is registered for the whole application, then writing my own Task class to manage the particulars, but I'm saving this in case that road hits a dead end.

@tanaysoni
Copy link

@turtlemonvh could you share code snippet of the other approach that you mention?

@turtlemonvh
Copy link
Author

Sorry - this was a long time ago, and I don't have access to any of that code anymore.

The basic idea was that a single task class would be used for all tasks and all the complexity of loading different library code based on the task details would be handled by the execute method of that custom task object.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment