Skip to content

Instantly share code, notes, and snippets.

@tebeka
Last active August 29, 2015 14:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tebeka/9df9c9d9b0b41d46d335 to your computer and use it in GitHub Desktop.
Save tebeka/9df9c9d9b0b41d46d335 to your computer and use it in GitHub Desktop.
from celery import Celery, chord, chain, group
from datetime import datetime
# Celery application shared by every task in this module. The settings are
# passed inline as a dict instead of living in a separate config module.
app = Celery()
app.config_from_object({
    # Broker: AMQP server on the local machine.
    'BROKER_URL': 'amqp://localhost',
    # Task results are stored through AMQP as well.
    'CELERY_RESULT_BACKEND': 'amqp://',
    'CELERYD_CONCURRENCY': 10,
    # Per-task option overrides: give the built-in celery.chord_unlock task
    # a soft time limit of 10.
    'CELERY_ANNOTATIONS': {
        'celery.chord_unlock': {
            'soft_time_limit': 10,
        },
    },
})
# Names of the sensors processed on every run: 's0' through 's6'.
# range() is used instead of the Python-2-only xrange() so the module also
# imports on Python 3; the resulting list is identical on both versions.
sensors = ['s{}'.format(i) for i in range(7)]

# When True, hourly() builds the workflow with chord(); when False it uses
# chain(group, task) instead.
use_chord = False

# When True, convert() raises ValueError for the second-to-last sensor.
fail = False
@app.task
def scp(sensor, hour):
    """First step of a sensor flow: print a ``scp - <sensor> <hour>`` line.

    The task only prints; it returns ``None``.
    """
    message = 'scp - {} {}'.format(sensor, hour)
    print(message)
@app.task
def convert(sensor, hour):
    """Second step of a sensor flow: print a ``convert - <sensor> <hour>`` line.

    Raises:
        ValueError: if the module-level ``fail`` flag is truthy and ``sensor``
            equals the second-to-last entry of ``sensors``.
    """
    # ``fail`` is tested first, so ``sensors`` is only consulted when failure
    # injection is switched on.
    inject_failure = fail and (sensor == sensors[-2])
    if inject_failure:
        raise ValueError('{} convert failed'.format(sensor))

    message = 'convert - {} {}'.format(sensor, hour)
    print(message)
@app.task
def db_upload(sensor, hour):
    """Last step of a sensor flow: print an ``upload - <sensor> <hour>`` line.

    The task only prints; it returns ``None``.
    """
    message = 'upload - {} {}'.format(sensor, hour)
    print(message)
@app.task
def popluate_main(hour):
    """Final task of the hourly workflow; return the string ``cp - <hour>``.

    NOTE(review): the name is presumably a typo for ``populate_main``. It is
    kept unchanged because ``hourly`` refers to the task by this name.
    """
    summary = 'cp - {}'.format(hour)
    return summary
def sensor_flow(sensor, hour):
    """Build the per-sensor pipeline ``scp -> convert -> db_upload``.

    Every step is created with ``.si(sensor, hour)``, so each one is given
    the same explicit ``(sensor, hour)`` arguments.

    Returns:
        A Celery ``chain`` of the three step signatures. It is not executed here.
    """
    steps = (
        scp.si(sensor, hour),
        convert.si(sensor, hour),
        db_upload.si(sensor, hour),
    )
    return chain(*steps)
@app.task
def hourly():
    """Launch one run of the workflow for the current time.

    A group containing one ``sensor_flow`` chain per entry in ``sensors`` is
    built, followed by a single ``popluate_main`` signature. Depending on
    the module-level ``use_chord`` flag, the two parts are combined with
    ``chord`` or with ``chain``, and the workflow is started immediately.

    Returns:
        Whatever starting the workflow returns: ``chord(head)(tail)`` when
        ``use_chord`` is truthy, otherwise ``chain(head, tail)()``.
    """
    # One timestamp is shared by every step of this run.
    stamp = datetime.now()
    fan_out = group(sensor_flow(sensor, stamp) for sensor in sensors)
    finalizer = popluate_main.si(stamp)

    if use_chord:
        return chord(fan_out)(finalizer)

    workflow = chain(fan_out, finalizer)
    return workflow()
from ch import hourly
# Client script: queue the hourly task, then fetch its return value, which
# is the result handle of the workflow that the task started.
res = hourly.delay()
ch = res.get()

# Exhaust collect() before checking the outcome.
# NOTE(review): presumably this is done to wait for the whole result tree to
# finish - confirm against the Celery result API.
for child in ch.collect():
    pass

# Report the final value on success, or a plain failure marker otherwise.
if not ch.successful():
    print('FAIL')
else:
    print('CH: {}'.format(ch.get()))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment