Skip to content

Instantly share code, notes, and snippets.

@pangyuteng
Last active June 17, 2022 17:14
Show Gist options
  • Save pangyuteng/d8f15f53b6f6bfd36d1bc4801735e155 to your computer and use it in GitHub Desktop.
Save pangyuteng/d8f15f53b6f6bfd36d1bc4801735e155 to your computer and use it in GitHub Desktop.
async demo of workers being paused and resumed to consume jobs from queue with Celery, Rabbitmq, os Windows.
  • these files are shared to faciliate troubleshooting an issue described in the below link: https://stackoverflow.com/questions/45784824/worker-does-not-consume-tasks-after-celery-add-consumer-is-called

  • start workers celery@high1woka and celery@default1woka

    celery -A foo worker -l info -Q default -n default1woka -b amqp://guest@localhost:5672// -Ofair -c 2 --pool=eventlet

    celery -A foo worker -l info -Q high -n high1woka -b amqp://guest@localhost:5672// -Ofair -c 1 --pool=solo

  • monitor celery and rabbitmq

    flower -A foo --port=5555 --broker=amqp://guest@localhost:5672//

    http://localhost:15672/#/

  • python dostuff.py - produces tasks to queues default and high

  • python cancel.py - cancels consumption of tasks from queue high for worker celery@high1woka

  • check that celery@high1woka logged that consumption from queue high is stopped.

  • python dostuff.py - produces tasks to queues default and high

  • python resume.py - request worker celery@high1woka to resume consumption of tasks from qeueue high.

  • ISSUE occurs here. celery@high1woka is not consuming tasks from queue high.

  • if worker celery@high1woka is restarted, worker starts to consume tasks

Note: for windows OS in \Lib\site-packages\billiard\py3\connection.py, update line 32 to "except AttributeError:".

--

ABOVE issue is resolved by the following steps

  • updating celery : pip install celery==4.1.0
  • updating billiard/spawn.py : encasulate line 338 to 339 with try: except: pass
  • (optional) install eventlet: pip install eventlet==0.22.1
  • adding --pool=eventlet or --pool=solo when starting workers per comment in celery/celery#4178
#cancel consuming from queue
import traceback
from foo import app
import celery
ds = []
activate =app.control.inspect().active()
while activate is None:
activate =app.control.inspect().active()
if activate is not None:
for k,v in activate.items():
try:
if '@high' in k:
ds.append(k)
except:
traceback.print_exc()
print(activate)
print(ds)
for x in range(1):
r = app.control.cancel_consumer('high',destination=ds)
from kombu import Queue
BROKER_URL = 'amqp://guest@localhost:5672//'
CELERYD_PREFETCH_MULTIPLIER = 1
TASK_SERIALIZER = 'json'
RESULT_SERIALIZER = 'json'
ACCEPT_CONTENT = ['json']
TIMEZONE = 'US/Pacific'
ENABLE_UTC = True
CELERY_DEFAULT_QUEUE = 'default'
CELERY_QUEUES = (
Queue('default', routing_key='aok'),
Queue('high', routing_key='nokay'),
)
from celery import Celery
import time
import random
from foo import add
#for a in range(1): #<-- works
for a in range(10):
result = add.apply_async(args=(3,7),queue='default',routing_key='aok')
print('default:',result)
result = add.apply_async(args=(1,1),queue='high', routing_key='nokay')
print('high:',result)
from celery import Celery
import time
app = Celery('foo')
app.config_from_object('celeryconfig')
@app.task
def add(x, y):
time.sleep(2)
return x + y
#resume consuming from queue
from foo import app
for x in range(1):
app.control.add_consumer('high',destination=['celery@high1woka'])
@pangyuteng
Copy link
Author

pangyuteng commented Dec 30, 2018

celery==4.2.1
billiard==3.5.0.5
kombu==4.2.1
eventlet==0.23.0
gevent==1.3.7

Scripts work with no change to installed libs using above set of lib/versions. However in production, while flower reflects actual state of workers, clicking worker name in flower does not return worker states (equivalent to celery ... inspect active) after some time.
similar issue stated here: mher/flower#395

@pangyuteng
Copy link
Author

pangyuteng commented Dec 30, 2018

current finding:
likely same issue discussed here:
celery/celery#4817

monkey patch? tried enabling moneky patch but then luigi tasks will not exit.
http://www.gevent.org/api/gevent.monkey.html

after some trial and error, workers seems to be running with the below set of libs.

amqp==2.2.2
billiard==3.5.0.5
celery==4.2.0
eventlet==0.23.0
gevent==1.3.7
greenlet==0.4.15
kombu==4.2.2

celery config

from kombu import Queue

CELERY_ACKS_LATE = False # in house system service requirement
CELERYD_PREFETCH_MULTIPLIER = 1 # in house system service requirement
BROKER_HEARTBEAT = None
TASK_SERIALIZER = 'json'
RESULT_SERIALIZER = 'json'
ACCEPT_CONTENT = ['json']
TIMEZONE = 'US/Pacific'
ENABLE_UTC = True
CELERY_DEFAULT_QUEUE = 'default'

old notes:

maybe useful?
https://stackoverflow.com/questions/37255548/how-to-run-celery-on-windows gevent is prefered than eventlet.
celery/celery#3453 - limit timeout -why?
https://medium.com/squad-engineering/two-years-with-celery-in-production-bug-fix-edition-22238669601d -downgrade celery
celery/celery#4849 -version below 4.2 dos not support 3.7
celery/celery#4980 -disable heart beat?
celery/celery#3649 -disable heart beat?

Seems like flower and celery inspect works, until tasks is being triggered by workers...

output from python -m celery -A *** -b amqp://****:5672 inspect active:

Error: No nodes replied within time constraint.

👍 getting good search results from above error message.
https://github.com/celery/celery/issues?utf8=%E2%9C%93&q=%22Error%3A+No+nodes+replied+within+time+constraint.%22

traceback from flower:

[W 181230 09:17:12 control:44] 'stats' inspect method failed
[W 181230 09:17:12 control:44] 'active_queues' inspect method failed
[W 181230 09:17:12 control:44] 'registered' inspect method failed
[W 181230 09:17:12 control:44] 'scheduled' inspect method failed
[W 181230 09:17:12 control:44] 'active' inspect method failed
[W 181230 09:17:12 control:44] 'reserved' inspect method failed
[W 181230 09:17:12 control:44] 'revoked' inspect method failed
[W 181230 09:17:12 control:44] 'conf' inspect method failed

traceback from a worker (unsure if this is the cause for the above unresponsiveness though...):

[2018-12-30 09:16:20,469: WARNING/MainProcess] Traceback (most recent call last):
[2018-12-30 09:16:20,469: WARNING/MainProcess] File "src/gevent/_waiter.py", line 119, in gevent.__waiter.Waiter.switch
[2018-12-30 09:16:20,469: WARNING/MainProcess] File "L:\apps\win\conda\py37\lib\site-packages\celery\worker\pidbox.py", line 120, in loop
    connection.drain_events(timeout=1.0)
[2018-12-30 09:16:20,469: WARNING/MainProcess] File "L:\apps\win\conda\py37\lib\site-packages\kombu\connection.py", line 315, in drain_events
    return self.transport.drain_events(self.connection, **kwargs)
[2018-12-30 09:16:20,469: WARNING/MainProcess] File "L:\apps\win\conda\py37\lib\site-packages\kombu\transport\pyamqp.py", line 103, in drain_events
    return connection.drain_events(**kwargs)
[2018-12-30 09:16:20,469: WARNING/MainProcess] File "L:\apps\win\conda\py37\lib\site-packages\amqp\connection.py", line 491, in drain_events
    while not self.blocking_read(timeout):
[2018-12-30 09:16:20,469: WARNING/MainProcess] File "L:\apps\win\conda\py37\lib\site-packages\amqp\connection.py", line 496, in blocking_read
    frame = self.transport.read_frame()
[2018-12-30 09:16:20,469: WARNING/MainProcess] File "L:\apps\win\conda\py37\lib\site-packages\amqp\transport.py", line 243, in read_frame
    frame_header = read(7, True)
[2018-12-30 09:16:20,469: WARNING/MainProcess] File "L:\apps\win\conda\py37\lib\site-packages\amqp\transport.py", line 418, in _read
    s = recv(n - len(rbuf))
[2018-12-30 09:16:20,469: WARNING/MainProcess] File "L:\apps\win\conda\py37\lib\site-packages\gevent\_socket3.py", line 378, in recv
    return _socket.socket.recv(self._sock, *args)
[2018-12-30 09:16:20,469: WARNING/MainProcess] ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host
[2018-12-30 09:16:20,469: WARNING/MainProcess] 2018-12-30T17:16:20Z
[2018-12-30 09:16:20,469: WARNING/MainProcess] <built-in method switch of gevent.__greenlet_primitives.TrackedRawGreenlet object at 0x0000000F2C043828> failed with ConnectionResetError

[2019-01-03 10:17:23,437: WARNING/MainProcess] Traceback (most recent call last):
[2019-01-03 10:17:23,437: WARNING/MainProcess] File "src/gevent/_waiter.py", line 119, in gevent.__waiter.Waiter.switch
[2019-01-03 10:17:23,437: WARNING/MainProcess] File "L:\apps\win\conda\py37\lib\site-packages\celery\worker\pidbox.py", line 120, in loop
    connection.drain_events(timeout=1.0)
[2019-01-03 10:17:23,437: WARNING/MainProcess] File "L:\apps\win\conda\py37\lib\site-packages\kombu\connection.py", line 315, in drain_events
    return self.transport.drain_events(self.connection, **kwargs)
[2019-01-03 10:17:23,437: WARNING/MainProcess] File "L:\apps\win\conda\py37\lib\site-packages\kombu\transport\pyamqp.py", line 103, in drain_events
    return connection.drain_events(**kwargs)
[2019-01-03 10:17:23,437: WARNING/MainProcess] File "L:\apps\win\conda\py37\lib\site-packages\amqp\connection.py", line 491, in drain_events
    while not self.blocking_read(timeout):
[2019-01-03 10:17:23,437: WARNING/MainProcess] File "L:\apps\win\conda\py37\lib\site-packages\amqp\connection.py", line 496, in blocking_read
    frame = self.transport.read_frame()
[2019-01-03 10:17:23,437: WARNING/MainProcess] File "L:\apps\win\conda\py37\lib\site-packages\amqp\transport.py", line 243, in read_frame
    frame_header = read(7, True)
[2019-01-03 10:17:23,437: WARNING/MainProcess] File "L:\apps\win\conda\py37\lib\site-packages\amqp\transport.py", line 418, in _read
    s = recv(n - len(rbuf))
[2019-01-03 10:17:23,437: WARNING/MainProcess] File "L:\apps\win\conda\py37\lib\site-packages\gevent\_socket3.py", line 378, in recv
    return _socket.socket.recv(self._sock, *args)
[2019-01-03 10:17:23,437: WARNING/MainProcess] ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host
[2019-01-03 10:17:23,437: WARNING/MainProcess] 2019-01-03T18:17:23Z
[2019-01-03 10:17:23,437: WARNING/MainProcess] <built-in method switch of gevent.__greenlet_primitives.TrackedRawGreenlet object at 0x000000C2257778D0> failed with ConnectionResetError
[2019-01-03 10:17:24,014: INFO/MainProcess] ***************REDACTED********
[2019-01-03 10:19:23,480: WARNING/MainProcess] L:\apps\win\conda\py37\lib\site-packages\celery\concurrency\gevent.py:49: UserWarning: libuv only supports millisecond timer resolution; all times less will be set to 1 ms
  g = self._Greenlet.spawn_later(secs, entry)

@pangyuteng
Copy link
Author

pangyuteng commented Jun 17, 2022

specify workername using docker container id

https://stackoverflow.com/questions/20995351/how-can-i-get-docker-linux-container-information-from-within-the-container-itsel

export workername=$HOSTNAME

export workername=$(openssl rand -hex 6)

export docker_full_id=$(basename $(cat /proc/1/cpuset))

export workername=${docker_full_id:0:12}

echo -n $workername > /opt/workername.txt

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment