Skip to content

Instantly share code, notes, and snippets.

@ymorgenstern
ymorgenstern / celery_testing.py
Last active July 11, 2022 08:07
Testing complex pipelines in Celery
from time import sleep
from typing import List
from celery import Celery, chain, group, shared_task
from celery.result import AsyncResult, GroupResult
class JobStatus(str, Enum):
PENDING = "pending"
STARTED = "started"