Skip to content

Instantly share code, notes, and snippets.

@psrok1
Last active June 23, 2017 11:00
Show Gist options
  • Save psrok1/8dc27d3cdf367573183fc3f1e5524293 to your computer and use it in GitHub Desktop.
Save psrok1/8dc27d3cdf367573183fc3f1e5524293 to your computer and use it in GitHub Desktop.
Celery ResultConsumer race condition trace

Test code

from gevent import spawn
from gevent import monkey

monkey.patch_all()

from master import celery_app
from celery.result import AsyncResult

TASK_UUID = "50d870f2-9f91-41ee-8966-f7c34102b5f3"

def fetch(uuid):
    res = AsyncResult(uuid, app=celery_app).get()
    print res

spawn(fetch, TASK_UUID).join()

A bit filtered output from: python -m trace --trace test.py | grep 'redis.py\|test.py\|async.py\|greenlet.py'

redis.py(142):         self.result_consumer = self.ResultConsumer(
redis.py(143):             self, self.app, self.accept,
redis.py(144):             self._pending_results, self._pending_messages,
redis.py(48):         super(ResultConsumer, self).__init__(*args, **kwargs)
async.py(211):         self.backend = backend
async.py(212):         self.app = app
async.py(213):         self.accept = accept
async.py(214):         self._pending_results = pending_results
async.py(215):         self._pending_messages = pending_messages
async.py(216):         self.on_message = None
async.py(217):         self.buckets = WeakKeyDictionary()
async.py(218):         self.drainer = drainers[detect_environment()](self)
async.py(74):         super(greenletDrainer, self).__init__(*args, **kwargs)
async.py(40):         self.result_consumer = result_consumer
async.py(75):         self._started = threading.Event()
async.py(76):         self._stopped = threading.Event()
async.py(77):         self._shutdown = threading.Event()
redis.py(49):         self._get_key_for_task = self.backend.get_key_for_task
redis.py(50):         self._decode_result = self.backend.decode_result
redis.py(51):         self.subscribed_to = set()
async.py(152):         if start_drainer:
async.py(153):             self.result_consumer.drainer.start()
async.py(89):         if not self._started.is_set():
async.py(90):             self._g = self.spawn(self.run)
async.py(117):         from gevent import spawn
async.py(118):         return spawn

#### GREENLET SWITCH #####

async.py(91):             self._started.wait()

#### GREENLET SWITCH #####

async.py(80):         self._started.set()
async.py(81):         while not self._stopped.is_set():
async.py(82):             try:
async.py(83):                 self.result_consumer.drain_events(timeout=1)
redis.py(69):         m = self._pubsub.get_message(timeout=timeout)
async.py(84):             except socket.timeout:

##### CRASH! #####

async.py(154):         try:
async.py(155):             self._maybe_resolve_from_buffer(result)
async.py(161):         result._maybe_set_cache(self._pending_messages.take(result.id))
async.py(156):         except Empty:
async.py(157):             self._add_pending_result(result.id, result, weak=weak)
async.py(164):         concrete, weak_ = self._pending_results
async.py(165):         if task_id not in weak_ and result.id not in concrete:
async.py(166):             (weak_ if weak else concrete)[task_id] = result
async.py(167):             self.result_consumer.consume_from(task_id)
redis.py(74):         if self._pubsub is None:
redis.py(75):             return self.start(task_id)

# _pubsub is initialized later, next calls are working

redis.py(54):         self._pubsub = self.backend.client.pubsub(
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment