Skip to content

Instantly share code, notes, and snippets.

@rizumu
Last active August 29, 2015 14:03
Show Gist options
  • Save rizumu/7f32d187fb2ef0d18a80 to your computer and use it in GitHub Desktop.
Save rizumu/7f32d187fb2ef0d18a80 to your computer and use it in GitHub Desktop.
from datetime import timedelta
from time import sleep
from celery.task import task
from django.core.management.base import BaseCommand, CommandError
from django.utils import timezone
@task
def add(x, y):
return x + y
class Command(BaseCommand):
help = "Check if celery is up and running."
def handle(self, *args, **options):
try:
result = add.apply_async(
args=[4, 4], expires=timezone.now() + timedelta(seconds=3), connect_timeout=3)
now = timezone.now()
while (now + timedelta(seconds=3)) > timezone.now():
if result.result == 8:
return "success"
sleep(0.5)
return "failure"
except Exception as e:
raise CommandError("check_celery_status: Celery Service Error", e)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment