@pangyuteng
Last active June 17, 2022 17:14
Async demo of workers being paused and resumed to consume jobs from a queue with Celery and RabbitMQ on Windows.
  • these files are shared to facilitate troubleshooting of the issue described at the link below: https://stackoverflow.com/questions/45784824/worker-does-not-consume-tasks-after-celery-add-consumer-is-called

  • start workers celery@high1woka and celery@default1woka

    celery -A foo worker -l info -Q default -n default1woka -b amqp://guest@localhost:5672// -Ofair -c 2 --pool=eventlet

    celery -A foo worker -l info -Q high -n high1woka -b amqp://guest@localhost:5672// -Ofair -c 1 --pool=solo

  • monitor celery and rabbitmq

    flower -A foo --port=5555 --broker=amqp://guest@localhost:5672//

    http://localhost:15672/#/

  • python dostuff.py - produces tasks to queues default and high

  • python cancel.py - cancels consumption of tasks from queue high for worker celery@high1woka

  • check that celery@high1woka logged that consumption from queue high is stopped.

  • python dostuff.py - produces tasks to queues default and high

  • python resume.py - requests worker celery@high1woka to resume consumption of tasks from queue high.

  • ISSUE occurs here. celery@high1woka is not consuming tasks from queue high.

  • if worker celery@high1woka is restarted, the worker starts consuming tasks again

Note: on Windows, in \Lib\site-packages\billiard\py3\connection.py, update line 32 to "except AttributeError:".

--

The ABOVE issue is resolved by the following steps:

  • update celery: pip install celery==4.1.0
  • update billiard/spawn.py: encapsulate lines 338 to 339 with try: ... except: pass
  • (optional) install eventlet: pip install eventlet==0.22.1
  • add --pool=eventlet or --pool=solo when starting workers, per the comment in celery/celery#4178 (a verification sketch follows this list)
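
After applying the steps above, resumption can be verified without restarting the worker by asking each node which queues it currently consumes (a minimal sketch using the inspect API; foo is the module shared below, and check_queues.py is a hypothetical file name):

# check_queues.py - print which queues each worker is consuming from (sketch)
from foo import app

replies = app.control.inspect().active_queues() or {}
for worker, queues in replies.items():
    print(worker, [q['name'] for q in queues])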
# cancel.py - cancel consumption from queue 'high' for workers named '...@high...'
import traceback
from foo import app

# poll until at least one worker replies to the inspect broadcast
active = app.control.inspect().active()
while active is None:
    active = app.control.inspect().active()

# collect the names of the workers serving queue 'high'
ds = []
for worker_name in active:
    try:
        if '@high' in worker_name:
            ds.append(worker_name)
    except Exception:
        traceback.print_exc()

print(active)
print(ds)
app.control.cancel_consumer('high', destination=ds)
# celeryconfig.py - two queues, 'default' and 'high'
from kombu import Queue

BROKER_URL = 'amqp://guest@localhost:5672//'
CELERYD_PREFETCH_MULTIPLIER = 1
TASK_SERIALIZER = 'json'
RESULT_SERIALIZER = 'json'
ACCEPT_CONTENT = ['json']
TIMEZONE = 'US/Pacific'
ENABLE_UTC = True
CELERY_DEFAULT_QUEUE = 'default'
CELERY_QUEUES = (
    Queue('default', routing_key='aok'),
    Queue('high', routing_key='nokay'),
)
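
For reference, the per-call queue/routing_key arguments used in dostuff.py below could instead be declared once here (a sketch using the same old-style setting names as this file; it assumes the task path foo.add defined below):

# sketch: route foo.add to 'default' by default, so apply_async needs no routing args
CELERY_ROUTES = {
    'foo.add': {'queue': 'default', 'routing_key': 'aok'},
}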
# dostuff.py - produce tasks to queues 'default' and 'high'
from foo import add

#for a in range(1): #<-- works
for a in range(10):
    result = add.apply_async(args=(3, 7), queue='default', routing_key='aok')
    print('default:', result)
    result = add.apply_async(args=(1, 1), queue='high', routing_key='nokay')
    print('high:', result)
# foo.py - Celery app and a sample task
from celery import Celery
import time

app = Celery('foo')
app.config_from_object('celeryconfig')

@app.task
def add(x, y):
    time.sleep(2)
    return x + y
# resume.py - request worker celery@high1woka to resume consuming from queue 'high'
from foo import app

app.control.add_consumer('high', destination=['celery@high1woka'])
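
If resumption should be confirmed rather than fired and forgotten, the broadcast can wait for acknowledgements (a sketch; reply=True blocks until the reply timeout expires):

# sketch: ask the worker to acknowledge the add_consumer control command
from foo import app

replies = app.control.add_consumer('high', destination=['celery@high1woka'], reply=True)
print(replies)  # an empty list means the worker never answered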
pangyuteng commented Dec 30, 2018

Current finding:
likely the same issue discussed here:
celery/celery#4817

Monkey patch? Tried enabling monkey patching, but then luigi tasks will not exit.
http://www.gevent.org/api/gevent.monkey.html
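
For reference, enabling it means patching before anything else imports the standard library (a minimal sketch; whether to patch at all is exactly what interfered with the luigi tasks above):

# sketch: gevent monkey patching must run first, before celery/kombu are imported
from gevent import monkey
monkey.patch_all()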

After some trial and error, workers seem to be running with the below set of libs.

amqp==2.2.2
billiard==3.5.0.5
celery==4.2.0
eventlet==0.23.0
gevent==1.3.7
greenlet==0.4.15
kombu==4.2.2

celery config:

from kombu import Queue

CELERY_ACKS_LATE = False  # in-house system service requirement
CELERYD_PREFETCH_MULTIPLIER = 1  # in-house system service requirement
BROKER_HEARTBEAT = None  # disable broker heartbeats (see the heartbeat issues linked below)
TASK_SERIALIZER = 'json'
RESULT_SERIALIZER = 'json'
ACCEPT_CONTENT = ['json']
TIMEZONE = 'US/Pacific'
ENABLE_UTC = True
CELERY_DEFAULT_QUEUE = 'default'

old notes:

maybe useful?
https://stackoverflow.com/questions/37255548/how-to-run-celery-on-windows - gevent is preferred over eventlet.
celery/celery#3453 - limit timeout - why?
https://medium.com/squad-engineering/two-years-with-celery-in-production-bug-fix-edition-22238669601d - downgrade celery
celery/celery#4849 - versions below 4.2 do not support Python 3.7
celery/celery#4980 - disable heartbeat?
celery/celery#3649 - disable heartbeat?

Seems like flower and celery inspect work, until tasks are triggered by workers...

output from python -m celery -A *** -b amqp://****:5672 inspect active:

Error: No nodes replied within time constraint.

👍 getting good search results from the above error message:
https://github.com/celery/celery/issues?utf8=%E2%9C%93&q=%22Error%3A+No+nodes+replied+within+time+constraint.%22
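
One knob worth trying when that error shows up is the reply timeout, since inspect only waits one second for workers to answer by default (a sketch):

# sketch: give slow or blocked workers more time to answer the pidbox broadcast
from foo import app

replies = app.control.inspect(timeout=5.0).active()
print(replies)  # None still means no worker replied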

traceback from flower:

[W 181230 09:17:12 control:44] 'stats' inspect method failed
[W 181230 09:17:12 control:44] 'active_queues' inspect method failed
[W 181230 09:17:12 control:44] 'registered' inspect method failed
[W 181230 09:17:12 control:44] 'scheduled' inspect method failed
[W 181230 09:17:12 control:44] 'active' inspect method failed
[W 181230 09:17:12 control:44] 'reserved' inspect method failed
[W 181230 09:17:12 control:44] 'revoked' inspect method failed
[W 181230 09:17:12 control:44] 'conf' inspect method failed

traceback from a worker (unsure if this is the cause for the above unresponsiveness though...):

[2018-12-30 09:16:20,469: WARNING/MainProcess] Traceback (most recent call last):
[2018-12-30 09:16:20,469: WARNING/MainProcess] File "src/gevent/_waiter.py", line 119, in gevent.__waiter.Waiter.switch
[2018-12-30 09:16:20,469: WARNING/MainProcess] File "L:\apps\win\conda\py37\lib\site-packages\celery\worker\pidbox.py", line 120, in loop
    connection.drain_events(timeout=1.0)
[2018-12-30 09:16:20,469: WARNING/MainProcess] File "L:\apps\win\conda\py37\lib\site-packages\kombu\connection.py", line 315, in drain_events
    return self.transport.drain_events(self.connection, **kwargs)
[2018-12-30 09:16:20,469: WARNING/MainProcess] File "L:\apps\win\conda\py37\lib\site-packages\kombu\transport\pyamqp.py", line 103, in drain_events
    return connection.drain_events(**kwargs)
[2018-12-30 09:16:20,469: WARNING/MainProcess] File "L:\apps\win\conda\py37\lib\site-packages\amqp\connection.py", line 491, in drain_events
    while not self.blocking_read(timeout):
[2018-12-30 09:16:20,469: WARNING/MainProcess] File "L:\apps\win\conda\py37\lib\site-packages\amqp\connection.py", line 496, in blocking_read
    frame = self.transport.read_frame()
[2018-12-30 09:16:20,469: WARNING/MainProcess] File "L:\apps\win\conda\py37\lib\site-packages\amqp\transport.py", line 243, in read_frame
    frame_header = read(7, True)
[2018-12-30 09:16:20,469: WARNING/MainProcess] File "L:\apps\win\conda\py37\lib\site-packages\amqp\transport.py", line 418, in _read
    s = recv(n - len(rbuf))
[2018-12-30 09:16:20,469: WARNING/MainProcess] File "L:\apps\win\conda\py37\lib\site-packages\gevent\_socket3.py", line 378, in recv
    return _socket.socket.recv(self._sock, *args)
[2018-12-30 09:16:20,469: WARNING/MainProcess] ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host
[2018-12-30 09:16:20,469: WARNING/MainProcess] 2018-12-30T17:16:20Z
[2018-12-30 09:16:20,469: WARNING/MainProcess] <built-in method switch of gevent.__greenlet_primitives.TrackedRawGreenlet object at 0x0000000F2C043828> failed with ConnectionResetError

[2019-01-03 10:17:23,437: WARNING/MainProcess] Traceback (most recent call last):
[2019-01-03 10:17:23,437: WARNING/MainProcess] File "src/gevent/_waiter.py", line 119, in gevent.__waiter.Waiter.switch
[2019-01-03 10:17:23,437: WARNING/MainProcess] File "L:\apps\win\conda\py37\lib\site-packages\celery\worker\pidbox.py", line 120, in loop
    connection.drain_events(timeout=1.0)
[2019-01-03 10:17:23,437: WARNING/MainProcess] File "L:\apps\win\conda\py37\lib\site-packages\kombu\connection.py", line 315, in drain_events
    return self.transport.drain_events(self.connection, **kwargs)
[2019-01-03 10:17:23,437: WARNING/MainProcess] File "L:\apps\win\conda\py37\lib\site-packages\kombu\transport\pyamqp.py", line 103, in drain_events
    return connection.drain_events(**kwargs)
[2019-01-03 10:17:23,437: WARNING/MainProcess] File "L:\apps\win\conda\py37\lib\site-packages\amqp\connection.py", line 491, in drain_events
    while not self.blocking_read(timeout):
[2019-01-03 10:17:23,437: WARNING/MainProcess] File "L:\apps\win\conda\py37\lib\site-packages\amqp\connection.py", line 496, in blocking_read
    frame = self.transport.read_frame()
[2019-01-03 10:17:23,437: WARNING/MainProcess] File "L:\apps\win\conda\py37\lib\site-packages\amqp\transport.py", line 243, in read_frame
    frame_header = read(7, True)
[2019-01-03 10:17:23,437: WARNING/MainProcess] File "L:\apps\win\conda\py37\lib\site-packages\amqp\transport.py", line 418, in _read
    s = recv(n - len(rbuf))
[2019-01-03 10:17:23,437: WARNING/MainProcess] File "L:\apps\win\conda\py37\lib\site-packages\gevent\_socket3.py", line 378, in recv
    return _socket.socket.recv(self._sock, *args)
[2019-01-03 10:17:23,437: WARNING/MainProcess] ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host
[2019-01-03 10:17:23,437: WARNING/MainProcess] 2019-01-03T18:17:23Z
[2019-01-03 10:17:23,437: WARNING/MainProcess] <built-in method switch of gevent.__greenlet_primitives.TrackedRawGreenlet object at 0x000000C2257778D0> failed with ConnectionResetError
[2019-01-03 10:17:24,014: INFO/MainProcess] ***************REDACTED********
[2019-01-03 10:19:23,480: WARNING/MainProcess] L:\apps\win\conda\py37\lib\site-packages\celery\concurrency\gevent.py:49: UserWarning: libuv only supports millisecond timer resolution; all times less will be set to 1 ms
  g = self._Greenlet.spawn_later(secs, entry)

pangyuteng commented Jun 17, 2022

Specify the worker name using the docker container id:

https://stackoverflow.com/questions/20995351/how-can-i-get-docker-linux-container-information-from-within-the-container-itsel

# option 1: use the container hostname
export workername=$HOSTNAME

# option 2: use a random hex string
export workername=$(openssl rand -hex 6)

# option 3: use the first 12 characters of the docker container id
export docker_full_id=$(basename $(cat /proc/1/cpuset))
export workername=${docker_full_id:0:12}

echo -n $workername > /opt/workername.txt
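
A Python equivalent, in case the name is needed inside the worker process (a sketch; it assumes cgroup v1, where /proc/1/cpuset ends with the full container id, and workername.py is a hypothetical file name):

# workername.py - derive a short worker name from the docker container id (sketch)
import os

def container_worker_name(length=12):
    try:
        with open('/proc/1/cpuset') as f:
            full_id = os.path.basename(f.read().strip())
        return full_id[:length] or os.uname().nodename
    except OSError:
        # not in a container (or cgroup v2 layout): fall back to the hostname
        return os.uname().nodename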
