
@pangyuteng
Last active June 17, 2022 17:14
Async demo of workers being paused and resumed to consume jobs from a queue, using Celery and RabbitMQ on Windows.
  • these files are shared to facilitate troubleshooting of the issue described at the link below: https://stackoverflow.com/questions/45784824/worker-does-not-consume-tasks-after-celery-add-consumer-is-called

  • start workers celery@high1woka and celery@default1woka

    celery -A foo worker -l info -Q default -n default1woka -b amqp://guest@localhost:5672// -Ofair -c 2 --pool=eventlet

    celery -A foo worker -l info -Q high -n high1woka -b amqp://guest@localhost:5672// -Ofair -c 1 --pool=solo

  • monitor celery and rabbitmq

    flower -A foo --port=5555 --broker=amqp://guest@localhost:5672//

    http://localhost:15672/#/

  • python dostuff.py - produces tasks to queues default and high

  • python cancel.py - cancels consumption of tasks from queue high for worker celery@high1woka

  • check that celery@high1woka logged that consumption from queue high is stopped.

  • python dostuff.py - produces tasks to queues default and high

  • python resume.py - requests worker celery@high1woka to resume consumption of tasks from queue high.

  • ISSUE occurs here: celery@high1woka is not consuming tasks from queue high.

  • if worker celery@high1woka is restarted, the worker starts consuming tasks again.

Note: for Windows OS, in \Lib\site-packages\billiard\py3\connection.py, change line 32 to "except AttributeError:".

--

The ABOVE issue is resolved by the following steps:

  • updating celery: pip install celery==4.1.0
  • updating billiard/spawn.py: wrap lines 338 to 339 in try: ... except: pass
  • (optional) installing eventlet: pip install eventlet==0.22.1
  • adding --pool=eventlet or --pool=solo when starting workers, per the comment in celery/celery#4178
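The billiard/spawn.py edit described above follows a simple "wrap and swallow" pattern. A generic sketch of that pattern is below; it does not reproduce the actual billiard source (the contents of lines 338-339 vary by version), it only illustrates the shape of the edit:

```python
# Generic sketch of the "wrap in try/except pass" workaround.
# risky_platform_call stands in for the two wrapped billiard lines;
# it is a hypothetical placeholder, not billiard code.
def risky_platform_call():
    raise AttributeError("platform-specific attribute missing on Windows")

def patched_section():
    try:
        risky_platform_call()  # the wrapped lines go here
    except Exception:
        pass  # swallow the platform-specific failure, as the workaround does
    return "continued"

print(patched_section())  # → continued
```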
# cancel.py - cancel consumption of tasks from queue high
import traceback
from foo import app

ds = []
activate = app.control.inspect().active()
while activate is None:
    activate = app.control.inspect().active()
for k, v in activate.items():
    try:
        if '@high' in k:
            ds.append(k)
    except Exception:
        traceback.print_exc()
print(activate)
print(ds)
r = app.control.cancel_consumer('high', destination=ds)
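The worker-selection logic in cancel.py (collecting destinations whose name contains '@high') can be exercised without a broker. A minimal sketch against a fabricated inspect() payload; the helper name and sample worker names are illustrative:

```python
# Sketch: the same '@high' substring filter cancel.py applies to
# app.control.inspect().active(), run against a fabricated payload.
def pick_high_workers(active):
    """Return worker names to pass as cancel_consumer destinations."""
    return [name for name in active if '@high' in name]

fake_active = {
    'celery@high1woka': [],     # matches
    'celery@default1woka': [],  # does not match
}
print(pick_high_workers(fake_active))  # → ['celery@high1woka']
```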
# celeryconfig.py
from kombu import Queue

BROKER_URL = 'amqp://guest@localhost:5672//'
CELERYD_PREFETCH_MULTIPLIER = 1
TASK_SERIALIZER = 'json'
RESULT_SERIALIZER = 'json'
ACCEPT_CONTENT = ['json']
TIMEZONE = 'US/Pacific'
ENABLE_UTC = True
CELERY_DEFAULT_QUEUE = 'default'
CELERY_QUEUES = (
    Queue('default', routing_key='aok'),
    Queue('high', routing_key='nokay'),
)
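Celery 4 also accepts the newer lowercase setting names; an equivalent sketch of the config above using that style (assuming Celery >= 4.0 — either style works, but they should not be mixed in one config):

```python
# celeryconfig.py, lowercase-style equivalent (sketch)
from kombu import Queue

broker_url = 'amqp://guest@localhost:5672//'
worker_prefetch_multiplier = 1
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'US/Pacific'
enable_utc = True
task_default_queue = 'default'
task_queues = (
    Queue('default', routing_key='aok'),
    Queue('high', routing_key='nokay'),
)
```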
# dostuff.py - produce tasks to queues default and high
from foo import add

#for a in range(1): #<-- works
for a in range(10):
    result = add.apply_async(args=(3, 7), queue='default', routing_key='aok')
    print('default:', result)
    result = add.apply_async(args=(1, 1), queue='high', routing_key='nokay')
    print('high:', result)
# foo.py - Celery app and task definition
from celery import Celery
import time

app = Celery('foo')
app.config_from_object('celeryconfig')

@app.task
def add(x, y):
    time.sleep(2)
    return x + y
# resume.py - resume consuming from queue high
from foo import app

app.control.add_consumer('high', destination=['celery@high1woka'])

pangyuteng commented Jun 17, 2022

specify the worker name using the Docker container id

https://stackoverflow.com/questions/20995351/how-can-i-get-docker-linux-container-information-from-within-the-container-itsel

# option 1: use the container hostname
export workername=$HOSTNAME

# option 2: use a random hex string
export workername=$(openssl rand -hex 6)

# option 3: use the (truncated) docker container id
export docker_full_id=$(basename $(cat /proc/1/cpuset))
export workername=${docker_full_id:0:12}

echo -n $workername > /opt/workername.txt
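The 12-character truncation in option 3 can be checked in isolation; a sketch using a sample container id in place of /proc/1/cpuset:

```shell
# Sketch: derive the worker name the way the steps above do,
# from a fabricated sample container id.
docker_full_id="0123456789abcdef0123456789abcdef"
workername=${docker_full_id:0:12}
echo "$workername"  # prints the first 12 characters: 0123456789ab
```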
