Skip to content

Instantly share code, notes, and snippets.

@certator
Last active August 10, 2021 17:17
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save certator/52feba8a87264f1b722fd1c327f95acb to your computer and use it in GitHub Desktop.
Save certator/52feba8a87264f1b722fd1c327f95acb to your computer and use it in GitHub Desktop.
celery ETA+gevent bug
from celery import Celery
from datetime import datetime, timedelta
import fire
app = Celery("app", broker="redis://debug-redis/")
@app.task
def dummy():
print("dummy executed")
class Cli(object):
def submit_eta_task(self):
async_result = dummy.apply_async(eta=datetime.utcnow() + timedelta(minutes=2))
print(async_result.id)
if __name__ == "__main__":
fire.Fire(Cli)
FROM python:3.7.8-alpine3.12
RUN apk add --no-cache python3-dev libffi-dev gcc musl-dev make git
#RUN pip install redis gevent fire
#RUN pip install celery==4.4.6
#RUN pip install git+https://github.com/celery/celery.git@455e0a0e86679eaaba9f0da533066627b1d79296#egg=celery
WORKDIR /app
COPY requirements.txt /app/requirements.txt
RUN pip install -r requirements.txt
import docker, os, time
def run_worker(client, network, redis_cont, name):
worker = client.containers.create(
"celery-bug-img",
"celery -A app worker -P gevent -l info",
name=name,
volumes={os.getcwd(): {"bind": "/app", "mode": "rw"}},
working_dir="/app",
)
network.connect(worker, links={redis_cont.id: "redis"})
worker.start()
return worker
def run():
print("creating docker containers")
client = docker.from_env()
client.images.build(path=".", tag="celery-bug-img")
network = client.networks.create("network1", driver="bridge",)
redis_cont = client.containers.run(
"redis", name="debug-redis", detach=True, remove=True
)
network.connect(redis_cont)
worker_1 = run_worker(client, network, redis_cont, name="debug-worker-1")
task_id = worker_1.exec_run("python -m app submit_eta_task")
print(f"submitted ETA task with id {str(task_id.output)}")
print("sleeping 10s")
time.sleep(10.0)
print(" --- debug-worker-1 log (ETA task is received) --- ")
print(worker_1.logs().decode("ascii"))
print("starting second worker")
worker_2 = run_worker(client, network, redis_cont, name="debug-worker-2")
print("sleeping 10s")
time.sleep(10.0)
print(" --- workers logs (ETA task and its duplicate are received) --- ")
print(worker_1.logs().decode("ascii"))
print(worker_2.logs().decode("ascii"))
# however two tasks received
print("sleeping 120s")
time.sleep(120.0)
print(" --- workers logs (task and its duplicate executed) --- ")
print(worker_1.logs().decode("ascii"))
print(worker_2.logs().decode("ascii"))
# in the end, app.dummy executed twice
worker_1.kill()
worker_2.kill()
redis_cont.kill()
worker_1.remove()
worker_2.remove()
network.remove()
if __name__ == "__main__":
run()
amqp==2.6.0
billiard==3.6.3.0
celery @ git+https://github.com/celery/celery.git@455e0a0e86679eaaba9f0da533066627b1d79296
fire==0.3.1
future==0.18.2
gevent==20.6.2
greenlet==0.4.16
importlib-metadata==1.7.0
kombu==4.6.11
pytz==2020.1
redis==3.5.3
six==1.15.0
termcolor==1.1.0
vine==1.3.0
zipp==3.1.0
zope.event==4.4
zope.interface==5.1.0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment