@codeinthehole
Created November 21, 2012 13:46
Sample Celery chain usage for processing pipeline
from celery import chain
from django.core.management.base import BaseCommand

from . import tasks


class Command(BaseCommand):

    def handle(self, *args, **kwargs):
        source_file = args[0]
        chain(
            tasks.fetch.s(source_file),  # Fetch data from remote source
            tasks.blacklist.s(),         # Remove blacklisted records
            tasks.transform.s(),         # Transform raw data ready for loading
            tasks.load.s(),              # Load into DB
        ).apply_async()

# Tasks module used by the command above (imported there as `tasks`)
import os
import shutil

from celery import task


@task()
def fetch(fixture_path):
    """
    Fetch a file from a remote location
    """
    destination = "/tmp/source.csv"
    print("Fetching data from %s - saving to %s" % (fixture_path, destination))
    shutil.copyfile(fixture_path, destination)
    return destination


@task()
def blacklist(source_path):
    base, ext = os.path.splitext(source_path)
    destination = "%s-afterblacklist%s" % (base, ext)
    print("Removing blacklisted records from %s - saving to %s" % (source_path, destination))
    # Placeholder: the file is copied unchanged
    shutil.copyfile(source_path, destination)
    return destination


@task()
def transform(source_path):
    base, ext = os.path.splitext(source_path)
    destination = "%s-transformed%s" % (base, ext)
    print("Transforming data in %s to %s" % (source_path, destination))
    # Placeholder: the file is copied unchanged
    shutil.copyfile(source_path, destination)
    return destination


@task()
def load(filepath):
    print("Loading data from %s and removing it" % filepath)
    os.remove(filepath)
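
The chain above works because each task's return value is passed as the first argument of the next signature, which is why only `fetch` is given an explicit argument. The following is a minimal plain-Python sketch of that hand-off, not Celery itself: `run_chain` is a hypothetical helper, and the three functions are stand-ins that only compute the paths the real tasks would return, without touching the filesystem.

```python
import os
from functools import reduce


def run_chain(first_arg, *steps):
    """Call each step in order, feeding every return value to the next step."""
    return reduce(lambda value, step: step(value), steps, first_arg)


# Hypothetical stand-ins for the tasks: they only derive the destination paths.
def fetch(fixture_path):
    return "/tmp/source.csv"


def blacklist(source_path):
    base, ext = os.path.splitext(source_path)
    return "%s-afterblacklist%s" % (base, ext)


def transform(source_path):
    base, ext = os.path.splitext(source_path)
    return "%s-transformed%s" % (base, ext)


final_path = run_chain("fixture.csv", fetch, blacklist, transform)
print(final_path)  # /tmp/source-afterblacklist-transformed.csv
```

In the real pipeline, `load` would receive this final path and remove the file.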
@MrYoda
MrYoda commented May 31, 2017

Hi! Can a task in the chain launch other tasks? For example, I want to split the raw data into chunks in transform() and process the chunks in parallel with other Celery tasks.
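
Celery offers the `group` and `chord` primitives for this kind of fan-out. The sketch below does not use Celery; it only illustrates the split, process-in-parallel, and merge shape the question describes, with the standard library's `ThreadPoolExecutor` standing in for workers. The names `split_into_chunks` and `process_chunk`, and the upper-casing step, are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_chunks(rows, chunk_size):
    """Split a list of rows into consecutive chunks of at most chunk_size."""
    return [rows[i:i + chunk_size] for i in range(0, len(rows), chunk_size)]


def process_chunk(chunk):
    """Hypothetical per-chunk work: upper-case every row."""
    return [row.upper() for row in chunk]


def transform(rows, chunk_size=2):
    """Fan out over the chunks in parallel, then merge the results in order."""
    chunks = split_into_chunks(rows, chunk_size)
    with ThreadPoolExecutor() as pool:
        # pool.map yields results in the same order as the input chunks
        processed = list(pool.map(process_chunk, chunks))
    return [row for chunk in processed for row in chunk]


result = transform(["a", "b", "c", "d", "e"])
print(result)
```

With Celery, each chunk would become one signature in a `group`, and a `chord` callback would play the role of the merge step.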

@hussaintamboli
Hi, instead of @task I use @shared_task. When I chain my tasks,

chain(fetch.subtask(resource_id), process.subtask())

I get an error.

TypeError: 'int' object is not iterable
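
A likely cause, assuming `resource_id` is an integer: `subtask()` takes the positional arguments as one tuple, whereas the `.s()` shortcut used in the gist takes them star-args style. Passing a bare integer where a tuple is expected produces exactly this message. The sketch below reproduces the failure in plain Python; `FakeSignature` is a hypothetical stand-in, not Celery's class.

```python
class FakeSignature:
    """Hypothetical stand-in that stores positional arguments as a tuple."""

    def __init__(self, func, args=()):
        self.func = func
        self.args = tuple(args)  # raises TypeError when args is a bare int

    def __call__(self):
        return self.func(*self.args)


def fetch(resource_id):
    return "resource-%d" % resource_id


try:
    FakeSignature(fetch, 42)  # mirrors fetch.subtask(resource_id)
except TypeError as exc:
    error_message = str(exc)

value = FakeSignature(fetch, (42,))()  # mirrors fetch.subtask((resource_id,))
print(error_message)
print(value)
```

Under that assumption, writing `fetch.subtask((resource_id,))` or `fetch.s(resource_id)` should avoid the error.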
