Skip to content

Instantly share code, notes, and snippets.

@garyelephant
Last active August 29, 2015 14:11
Show Gist options
  • Save garyelephant/b49a95b6368bcd2e8d44 to your computer and use it in GitHub Desktop.
Save garyelephant/b49a95b6368bcd2e8d44 to your computer and use it in GitHub Desktop.
This scheduler replaces celery's default beat scheduler to remove beat's single point of failure: multiple beat processes can run concurrently, coordinated by a Redis-based lock.
class SharedScheduler(Scheduler):
    """Celery beat scheduler safe to run on several hosts at once.

    Every schedule entry is persisted in Redis and advanced inside a
    WATCH/MULTI/EXEC optimistic transaction, so when multiple beat
    processes compete, exactly one of them sends each due task.  This
    removes the single point of failure of the stock beat scheduler.
    """

    def __init__(self, *args, **kwargs):
        Scheduler.__init__(self, *args, **kwargs)
        self.merge_inplace(self.app.conf.CELERYBEAT_SCHEDULE)
        # Shared Redis connection: both the entry store and the lock.
        # NOTE(review): assumes service.use_redis() returns a redis-py
        # client -- confirm against the service module.
        self.redis = service.use_redis()
        # SECURITY: pickle is only safe because this Redis is trusted;
        # never share it with untrusted writers.  Attribute name kept
        # as-is ("protocal") for backward compatibility.
        self.protocal = pickle

    def tick(self):
        """Run one scheduler iteration, sending all due tasks.

        Returns the number of seconds to wait until the next tick
        (the minimum of all entries' countdowns, capped at
        ``self.max_interval``).
        """
        # BUGFIX: WatchError lives on the redis module, not on the
        # client instance -- the original ``except self.redis.WatchError``
        # could never match and would itself raise AttributeError.
        from redis import WatchError

        remaining_times = []
        try:
            for entry_name, entry in self.schedule.items():
                # https://github.com/andymccurdy/redis-py
                pipe = self.redis.pipeline()
                try:
                    key = 'celery:scheduler:%s' % entry_name
                    pipe.watch(key)
                    # After WATCHing, the pipeline executes commands
                    # immediately (until multi() restores buffering),
                    # which lets us read the entry's current value.
                    entry_value = pipe.get(key)
                    if entry_value:
                        try:
                            entry = self.protocal.loads(entry_value)
                        except Exception:
                            # Corrupt/unreadable value: fall back to the
                            # locally-known entry rather than crash the tick.
                            error('Load entry from db failed. %s' % entry_name)
                    is_due, next_time_to_run = entry.is_due()
                    if next_time_to_run:
                        remaining_times.append(next_time_to_run)
                    if is_due:
                        info('Scheduler: Sending due task %s (%s)',
                             entry_name, entry.task)
                        old_entry = entry
                        entry = old_entry.next()
                        entry_value = self.protocal.dumps(entry)
                        # Back into buffered mode: the SET below only
                        # commits if the watched key was untouched.
                        pipe.multi()
                        pipe.set(key, entry_value)
                        # Execute the transaction (the SET command).
                        pipe.execute()
                        # No WatchError was raised, so the update happened
                        # atomically: this process won the right to send.
                        try:
                            result = self.apply_async(
                                old_entry, publisher=self.publisher)
                            result_id = result.id
                        except Exception as exc:
                            error('Message Error: %s\n%s',
                                  exc, traceback.format_stack(),
                                  exc_info=True)
                        else:
                            debug('%s sent. id->%s', entry.task, result_id)
                except WatchError:
                    # Another beat process changed this key between our
                    # WATCH and EXEC; it will send the task, so skip this
                    # entry for the current tick.
                    continue
                except Exception:
                    error('Task Scheduler Failed. %s', entry_name,
                          exc_info=True)
                finally:
                    # Always release the pipeline's connection and clear
                    # any outstanding WATCH.
                    pipe.reset()
        except RuntimeError:
            pass
        next_time = min(remaining_times + [self.max_interval])
        info('wait for %s', next_time)
        return next_time

    def reserve(self, entry):
        """Advance *entry* locally; persistence is handled in tick()."""
        new_entry = entry.next()
        return new_entry
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment