Instantly share code, notes, and snippets.

Embed
What would you like to do?
Part of the blog post - Timeouts and Retries which will walk through how to combine timeouts and retries in Celery - http://adikrishnan.in/2018/09/05/timeouts-and-retries/
from tasks import hello, timeout_test, retry_timeout_test, max_retries_test, max_retries_test_2
hello.delay()
timeout_test.delay()
retry_timeout_test.delay()
max_retries_test.delay()
max_retries_test_2.delay()
import time
import shlex
import random
import datetime
import subprocess
from celery import Celery, chord
from celery.exceptions import SoftTimeLimitExceeded, MaxRetriesExceededError
conf = {
"broker_url": "redis://localhost:6379/7",
"result_backend": "redis://localhost:6379/7",
"result_expires": "600",
"task_soft_time_limit": "30",
}
celery_app = Celery()
celery_app.config_from_object(conf)
@celery_app.task
def hello():
print("Bello Merld at {}".format(datetime.datetime.now().isoformat()))
@celery_app.task
def timeout_test():
try:
print("Gonna start a long task!")
subprocess.check_output(shlex.split("ping www.adikrishnan.in"))
except SoftTimeLimitExceeded:
print("Clean up now!")
# Default retry delay is 180 seconds
@celery_app.task(bind=True, default_retry_delay=30)
def retry_timeout_test(self):
try:
print("Gonna start a long task!")
n = random.randint(1, 10)
print("n is {}".format(n))
if n % 2 == 0:
print("Running subprocess")
subprocess.check_output(shlex.split("ping www.adikrishnan.in"))
else:
print("Raising an Error")
raise KeyError("Testing Key Errors")
except SoftTimeLimitExceeded:
print("Clean up now!")
except KeyError as ex:
print("Retrying now")
raise self.retry()
@celery_app.task(bind=True, default_retry_delay=30, retry_kwargs={'max_retries': 5})
def max_retries_test(self):
try:
print("Gonna start a long task!")
print("Raising an Error")
raise KeyError("Testing Key Errors")
except KeyError as ex:
print("Retrying now")
raise self.retry(exc=ex)
except MaxRetriesExceededError:
print("Max retries exceeded.")
@celery_app.task(bind=True, default_retry_delay=30, retry_kwargs={'max_retries': 5})
def max_retries_test_2(self):
try:
print("Gonna start a long task!")
print("Raising an Error")
raise KeyError("Testing Key Errors")
except KeyError:
print("Retrying now")
# Not sending the exception alongwith retry
raise self.retry()
except MaxRetriesExceededError:
print("Max retries exceeded.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment