from gevent import spawn
from gevent import monkey
monkey.patch_all()
from master import celery_app
from celery.result import AsyncResult
# Id of the task whose result is requested by the spawn(fetch, ...) call.
TASK_UUID = "50d870f2-9f91-41ee-8966-f7c34102b5f3"
def fetch(uuid):
    """Fetch and print the result of the Celery task with id ``uuid``.

    Builds an ``AsyncResult`` bound to ``celery_app``, calls ``get()`` on
    it, and prints whatever ``get()`` returns.
    """
    res = AsyncResult(uuid, app=celery_app).get()
    # The parenthesised form prints the same single value under the
    # Python 2 print statement and is also valid as the Python 3 function.
    print(res)
# Run fetch in a gevent greenlet and wait for that greenlet to finish.
spawn(fetch, TASK_UUID).join()
Slightly filtered output of: python -m trace --trace test.py | grep 'redis.py\|test.py\|async.py\|greenlet.py'
redis.py(142): self.result_consumer = self.ResultConsumer(
redis.py(143): self, self.app, self.accept,
redis.py(144): self._pending_results, self._pending_messages,
redis.py(48): super(ResultConsumer, self).__init__(*args, **kwargs)
async.py(211): self.backend = backend
async.py(212): self.app = app
async.py(213): self.accept = accept
async.py(214): self._pending_results = pending_results
async.py(215): self._pending_messages = pending_messages
async.py(216): self.on_message = None
async.py(217): self.buckets = WeakKeyDictionary()
async.py(218): self.drainer = drainers[detect_environment()](self)
async.py(74): super(greenletDrainer, self).__init__(*args, **kwargs)
async.py(40): self.result_consumer = result_consumer
async.py(75): self._started = threading.Event()
async.py(76): self._stopped = threading.Event()
async.py(77): self._shutdown = threading.Event()
redis.py(49): self._get_key_for_task = self.backend.get_key_for_task
redis.py(50): self._decode_result = self.backend.decode_result
redis.py(51): self.subscribed_to = set()
async.py(152): if start_drainer:
async.py(153): self.result_consumer.drainer.start()
async.py(89): if not self._started.is_set():
async.py(90): self._g = self.spawn(self.run)
async.py(117): from gevent import spawn
async.py(118): return spawn
#### GREENLET SWITCH #####
async.py(91): self._started.wait()
#### GREENLET SWITCH #####
async.py(80): self._started.set()
async.py(81): while not self._stopped.is_set():
async.py(82): try:
async.py(83): self.result_consumer.drain_events(timeout=1)
redis.py(69): m = self._pubsub.get_message(timeout=timeout)
async.py(84): except socket.timeout:
##### CRASH! #####
async.py(154): try:
async.py(155): self._maybe_resolve_from_buffer(result)
async.py(161): result._maybe_set_cache(self._pending_messages.take(result.id))
async.py(156): except Empty:
async.py(157): self._add_pending_result(result.id, result, weak=weak)
async.py(164): concrete, weak_ = self._pending_results
async.py(165): if task_id not in weak_ and result.id not in concrete:
async.py(166): (weak_ if weak else concrete)[task_id] = result
async.py(167): self.result_consumer.consume_from(task_id)
redis.py(74): if self._pubsub is None:
redis.py(75): return self.start(task_id)
# _pubsub is only initialized later (below); subsequent calls work
redis.py(54): self._pubsub = self.backend.client.pubsub(