Skip to content

Instantly share code, notes, and snippets.

@shubham90
Created June 16, 2015 19:06
Show Gist options
  • Save shubham90/825955ed74abdd968004 to your computer and use it in GitHub Desktop.
Save shubham90/825955ed74abdd968004 to your computer and use it in GitHub Desktop.
Tick alternate implementation
def tick(self):
    """
    Run one scheduler iteration while holding the celerybeat lock.

    The superclass runs a tick, that is one iteration of the scheduler, and
    executes all due tasks. This override additionally:

    * trims the failure watcher;
    * updates the last heartbeat time of the scheduler. No heartbeat message
      is actually sent, since it would just get read again by this class, so
      the event is handed straight to the worker watcher;
    * guards the superclass tick with a lock document stored in the
      ``celerybeat_lock`` collection. The collection has a unique index on
      ``lock``, so only one ``{'lock': 'locked'}`` document can exist at a
      time; a failed insert is treated as "another scheduler holds the lock".
      Lock documents older than the stale cutoff are purged before each
      attempt.

    The lock is always released once it has been acquired, even if the
    superclass tick raises. No exception raised while acquiring the lock,
    running the tick, or releasing the lock propagates to the caller.

    :return: number of seconds before the next tick should run; a fixed
             retry delay when the lock could not be acquired or the
             superclass tick failed
    :rtype: float
    """
    # Age after which a lock document is considered abandoned and is purged.
    stale_lock_seconds = 200
    # Delay returned when this scheduler did not complete a tick.
    retry_delay_seconds = 90

    self._failure_watcher.trim()

    celerybeat_name = "%s@%s" % (str(celery_name), platform.node())

    # this is not an event that gets sent anywhere. We process it
    # immediately.
    scheduler_event = {
        'timestamp': time.time(),
        'type': 'scheduler-event',
        'hostname': celerybeat_name,
    }
    worker_watcher.handle_worker_heartbeat(scheduler_event)

    # Establish connection with DB
    db = db_connection.get_database()
    collection = db['celerybeat_lock']
    collection.create_index('lock', unique=True)

    _logger.info('Updating the database celerybeat lock and celerybeat_name is %s ',
                 celerybeat_name)
    old_timestamp = datetime.utcnow() - timedelta(seconds=stale_lock_seconds)
    collection.remove({'lock_timestamp': {'$lte': old_timestamp}})
    _logger.info("Removed the document more than timestamp")

    # Acquire the lock. Any failure here (presumably a duplicate-key error
    # because another scheduler holds the lock) means we must NOT touch the
    # existing lock document, so bail out before the release logic below.
    try:
        lock_id = collection.insert({
            'lock_timestamp': datetime.utcnow(),
            'lock': "locked",
        })
    except Exception:
        _logger.info("Could not acquire celerybeat lock for %s ", celerybeat_name,
                     exc_info=True)
        return retry_delay_seconds
    _logger.info("Insert successful with %s ", celerybeat_name)

    ret = retry_delay_seconds
    try:
        ret = super(Scheduler, self).tick()
        _logger.info("Value of ret is %s and celerybeat_name is %s ", ret, celerybeat_name)
    except Exception:
        _logger.exception("Scheduler tick failed for %s ", celerybeat_name)
        ret = retry_delay_seconds
    finally:
        # Release only the lock this call created: if this tick ran longer
        # than the stale cutoff, another scheduler may already own a newer
        # lock document that must be left alone.
        try:
            collection.remove({'_id': lock_id})
            _logger.info("Delete successful")
        except Exception:
            _logger.exception("Failed to release celerybeat lock for %s ", celerybeat_name)
            ret = retry_delay_seconds
    return ret
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment