Skip to content

Instantly share code, notes, and snippets.

@jedie
Created September 5, 2018 07:32
Show Gist options
  • Save jedie/22e82115239f6ddd1a6e50477e3d0657 to your computer and use it in GitHub Desktop.
Save jedie/22e82115239f6ddd1a6e50477e3d0657 to your computer and use it in GitHub Desktop.
============================= test session starts ==============================
platform linux -- Python 3.6.3, pytest-3.7.4, py-1.5.3, pluggy-0.7.1 -- /home/jens/repos/PyLucid-env/bin/python
cachedir: .pytest_cache
Django settings: django_tools_test_project.test_settings (from ini file)
rootdir: /home/jens/repos/PyLucid-env/src/django-tools, inifile: pytest.ini
plugins: django-3.4.2, cov-2.6.0, celery-4.2.1
collecting ... collected 1 item
run-last-failure: rerun previous 1 failure first
django_tools_tests/test_unittest_celery.py::test_rpc_on_message
*** on_message: {'task_id': '395435dd-81e1-4e81-9134-e1cd8e07f036', 'status': 'FOO', 'result': {'bar': 1}, 'traceback': None, 'children': []}
FAILED
=================================== FAILURES ===================================
_____________________________ test_rpc_on_message ______________________________
self = <celery.backends.rpc.ResultConsumer object at 0x7fb2b7fd2cc0>
result = <AsyncResult: 395435dd-81e1-4e81-9134-e1cd8e07f036>, timeout = 1
on_interval = <promise@0x7fb2b7fe4558>
on_message = <function test_rpc_on_message.<locals>.on_raw_message at 0x7fb2d2aa0b70>
kwargs = {}, prev_on_m = None, _ = None
def _wait_for_pending(self, result,
timeout=None, on_interval=None, on_message=None,
**kwargs):
self.on_wait_for_pending(result, timeout=timeout, **kwargs)
prev_on_m, self.on_message = self.on_message, on_message
try:
for _ in self.drain_events_until(
result.on_ready, timeout=timeout,
> on_interval=on_interval):
_ = None
kwargs = {}
on_interval = <promise@0x7fb2b7fe4558>
on_message = <function test_rpc_on_message.<locals>.on_raw_message at 0x7fb2d2aa0b70>
prev_on_m = None
result = <AsyncResult: 395435dd-81e1-4e81-9134-e1cd8e07f036>
self = <celery.backends.rpc.ResultConsumer object at 0x7fb2b7fd2cc0>
timeout = 1
../../lib/python3.6/site-packages/celery/backends/async.py:255:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <celery.backends.async.Drainer object at 0x7fb2b7fd2f98>
p = <promise@0x7fb2b7ff9e58 --> <bound method AsyncResult._on_fulfilled of <AsyncResult: 395435dd-81e1-4e81-9134-e1cd8e07f036>>>
timeout = 1, on_interval = <promise@0x7fb2b7fe4558>
wait = <bound method ResultConsumer.drain_events of <celery.backends.rpc.ResultConsumer object at 0x7fb2b7fd2cc0>>
def drain_events_until(self, p, timeout=None, on_interval=None, wait=None):
wait = wait or self.result_consumer.drain_events
time_start = monotonic()
while 1:
# Total time spent may exceed a single call to wait()
if timeout and monotonic() - time_start >= timeout:
> raise socket.timeout()
E socket.timeout
on_interval = <promise@0x7fb2b7fe4558>
p = <promise@0x7fb2b7ff9e58 --> <bound method AsyncResult._on_fulfilled of <AsyncResult: 395435dd-81e1-4e81-9134-e1cd8e07f036>>>
self = <celery.backends.async.Drainer object at 0x7fb2b7fd2f98>
time_start = 6028.351486148
timeout = 1
wait = <bound method ResultConsumer.drain_events of <celery.backends.rpc.ResultConsumer object at 0x7fb2b7fd2cc0>>
../../lib/python3.6/site-packages/celery/backends/async.py:54: timeout
During handling of the above exception, another exception occurred:
celery_worker = <Worker: gen17297@Intel4790K (running)>
@pytest.mark.celery(
result_backend="rpc",
broker_url="memory://",
)
def test_rpc_on_message(celery_worker):
def on_raw_message(msg_dict):
print("\n *** on_message:", msg_dict)
r = test_task.apply_async()
result = r.get(
timeout=1, # <<<--- Works if timeout is None!
on_message=on_raw_message,
> propagate=False
)
celery_worker = <Worker: gen17297@Intel4790K (running)>
on_raw_message = <function test_rpc_on_message.<locals>.on_raw_message at 0x7fb2d2aa0b70>
r = <AsyncResult: 395435dd-81e1-4e81-9134-e1cd8e07f036>
django_tools_tests/test_unittest_celery.py:71:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../lib/python3.6/site-packages/celery/result.py:224: in get
on_message=on_message,
../../lib/python3.6/site-packages/celery/backends/async.py:188: in wait_for_pending
for _ in self._wait_for_pending(result, **kwargs):
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <celery.backends.rpc.ResultConsumer object at 0x7fb2b7fd2cc0>
result = <AsyncResult: 395435dd-81e1-4e81-9134-e1cd8e07f036>, timeout = 1
on_interval = <promise@0x7fb2b7fe4558>
on_message = <function test_rpc_on_message.<locals>.on_raw_message at 0x7fb2d2aa0b70>
kwargs = {}, prev_on_m = None, _ = None
def _wait_for_pending(self, result,
timeout=None, on_interval=None, on_message=None,
**kwargs):
self.on_wait_for_pending(result, timeout=timeout, **kwargs)
prev_on_m, self.on_message = self.on_message, on_message
try:
for _ in self.drain_events_until(
result.on_ready, timeout=timeout,
on_interval=on_interval):
yield
sleep(0)
except socket.timeout:
> raise TimeoutError('The operation timed out.')
E celery.exceptions.TimeoutError: The operation timed out.
_ = None
kwargs = {}
on_interval = <promise@0x7fb2b7fe4558>
on_message = <function test_rpc_on_message.<locals>.on_raw_message at 0x7fb2d2aa0b70>
prev_on_m = None
result = <AsyncResult: 395435dd-81e1-4e81-9134-e1cd8e07f036>
self = <celery.backends.rpc.ResultConsumer object at 0x7fb2b7fd2cc0>
timeout = 1
../../lib/python3.6/site-packages/celery/backends/async.py:259: TimeoutError
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment