Skip to content

Instantly share code, notes, and snippets.

@jonascheng
Created July 2, 2018 07:39
Show Gist options
  • Save jonascheng/f240310f23388cc5ada3d906da67cdbf to your computer and use it in GitHub Desktop.
Save jonascheng/f240310f23388cc5ada3d906da67cdbf to your computer and use it in GitHub Desktop.
Celery beat health check script, you may specify this inside Dockerfile
import sys
import arrow
import shelve
import os.path
from datetime import datetime, timedelta
# Name of the file used by PersistentScheduler to store the last run times of periodic tasks.
FN_CELERYBEAT = 'celerybeat-schedule'
def is_tolerable_due(task):
now = arrow.utcnow().naive
tdelta = timedelta(minutes=-10)
print('now: {}'.format(now))
print('task: {}'.format(task))
print('task.last_run_at: {}'.format(task.last_run_at))
try:
print('task.schedule.run_every: {}'.format(task.schedule.run_every))
next_run_at = task.last_run_at + task.schedule.run_every
print('next_run_at: {}'.format(next_run_at))
remaining = next_run_at - now
print('remaining: {}'.format(remaining))
if remaining > tdelta:
# still tolerable
return True
else:
# out of tolerance
return False
except AttributeError:
remaining = task.schedule.remaining_estimate(task.last_run_at)
print('timedelta(): {}'.format(tdelta))
print('task.schedule.remaining_estimate: {}'.format(remaining))
if remaining > tdelta:
# still tolerable in 15 mins
return True
else:
return False
if __name__ == '__main__':
if os.path.isfile(FN_CELERYBEAT) is False:
# file does not exist, assume the container is not celery beat worker
print('file does not exist, assume the container is not celery beat worker')
sys.exit(0)
file_data = shelve.open(FN_CELERYBEAT)
for task_name, task in file_data['entries'].items():
is_due, next_time_to_check = task.schedule.is_due(task.last_run_at)
print('is_due: {} next_time_to_check: {}'.format(is_due, next_time_to_check))
if is_due is True:
if is_tolerable_due(task) is True:
print('the task {} may be over due, but is tolerable'.format(task_name))
else:
print('the task {} is over due'.format(task_name))
sys.exit(1)
else:
print('the task {} is not due yet'.format(task_name))
sys.exit(0)
@vsemionov
Copy link

Just what I was looking for. Thank you for sharing

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment