Task structure for a job-queue-based processing pipeline
from celery import Celery

# Celery app instance; the module name passed here is illustrative.
celery = Celery('tasks')

@celery.task
def fetch():
    """
    Download files from the FTP server.

    This task could be triggered by a cron job to start the processing
    pipeline. When the file(s) have downloaded successfully, create a set of
    partner_prepare jobs to process each file in chunks.
    """
@celery.task
def partner_prepare(filepath, offset, count):
    """
    Process a chunk of a file, converting it from the partner's raw format
    into CSV ready for loading into the partner tables.

    This could be several separate tasks for things like data cleaning,
    ignoring some records, converting dates to the database format, etc.
    ** I'm not sure how to structure this so extra steps can easily be added
    (one option is sketched below).

    When done, pass the CSV filepath to the partner_load task.
    """
@celery.task
def partner_load(filepath):
    """
    Load the passed CSV into the partner tables.

    When done:
    - delete the file
    - create a partner_process task to convert the partner data into the
      core project records, passing the set of IDs (ISBNs, partner SKUs)
    ** Not sure of the best way of passing the data between tasks here
    (a sketch of one option follows).
    """
@celery.task
def partner_process(ids):
    """
    Process the records in the partner table, creating/updating the
    appropriate product/stockrecord models.

    When done:
    - trigger a deploy task
    - trigger an index task
    """
@celery.task
def deploy_products(ids):
    """
    Deploy products from process to app.
    """

@celery.task
def index_products(ids):
    """
    Update search index for passed products.
    """