Skip to content

Instantly share code, notes, and snippets.

@graingert
Created Dec 24, 2020
Embed
What would you like to do?
# /opt/libreofficedev7.1/program/python test2.py
2004 177
2003 176
2002 175
2005 178
2002 181
2003 180
2004 179
2005 182
^Cunhandled exception during asyncio.run() shutdown
task: <Task finished name='__main__.amain' coro=<amain() done, defined at /opt/libreofficedev7.1/program/python-core-3.8.4/lib/python3.8/site-packages/anyio/_backends/_asyncio.py:146> exception=UnboundLocalError("local variable 'retval' referenced before assignment")>
Traceback (most recent call last):
File "/opt/libreofficedev7.1/program/python-core-3.8.4/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 155, in wrapper
return await func(*args)
File "test2.py", line 58, in amain
await tx.send(await bstalk.reserve())
File "/opt/libreofficedev7.1/program/python-core-3.8.4/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 438, in __aexit__
raise exceptions[0]
File "/opt/libreofficedev7.1/program/python-core-3.8.4/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 466, in _run_wrapped_task
await func(*args)
File "/opt/libreofficedev7.1/program/python-core-3.8.4/lib/python3.8/site-packages/anyio/_core/_threads.py", line 28, in run_sync_in_worker_thread
return await get_asynclib().run_sync_in_worker_thread(func, *args, cancellable=cancellable,
File "/opt/libreofficedev7.1/program/python-core-3.8.4/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 535, in run_sync_in_worker_thread
return cast(T_Retval, retval)
UnboundLocalError: local variable 'retval' referenced before assignment
Traceback (most recent call last):
File "test2.py", line 64, in <module>
sys.exit(main())
File "test2.py", line 61, in main
return anyio.run(amain)
File "/opt/libreofficedev7.1/program/python-core-3.8.4/lib/python3.8/site-packages/anyio/_core/_eventloop.py", line 52, in run
return asynclib.run(func, *args, **backend_options) # type: ignore
File "/opt/libreofficedev7.1/program/python-core-3.8.4/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 160, in run
return native_run(wrapper(), debug=debug)
File "/opt/libreofficedev7.1/program/python-core-3.8.4/lib/asyncio/runners.py", line 43, in run
return loop.run_until_complete(main)
File "/opt/libreofficedev7.1/program/python-core-3.8.4/lib/asyncio/base_events.py", line 603, in run_until_complete
self.run_forever()
File "/opt/libreofficedev7.1/program/python-core-3.8.4/lib/asyncio/base_events.py", line 570, in run_forever
self._run_once()
File "/opt/libreofficedev7.1/program/python-core-3.8.4/lib/asyncio/base_events.py", line 1823, in _run_once
event_list = self._selector.select(timeout)
File "/opt/libreofficedev7.1/program/python-core-3.8.4/lib/selectors.py", line 468, in select
fd_event_list = self._selector.poll(timeout, max_ev)
KeyboardInterrupt
# Pastebin MpoVlz16
import sys
import time
import aiostalk
import anyio
import attr
import contextlib
HOST = 'localhost'
PORT = 11300
ports = [ 2002, 2003, 2004, 2005 ]
@attr.s
class JobContext:
_rx = attr.ib()
_bstalk = attr.ib()
@contextlib.contextmanager
def job(self):
job = anyio.run_async_from_thread(self._rx.receive)
try:
yield job
finally:
anyio.run_async_from_thread(self._bstalk.delete, job)
@attr.s
class LockedBStalk:
_bstalk = attr.ib()
_lock = attr.ib(default=attr.Factory(anyio.create_lock))
async def reserve(self):
async with self._lock:
return await self._bstalk.reserve()
async def delete(self, job):
async with self._lock:
return await self._bstalk.delete(job)
def worker(port, jc):
while True:
with jc.job() as job:
time.sleep(1)
print(port, job.body)
async def amain():
async with aiostalk.Client((HOST, PORT), watch='unprocessed') as raw_bstalk:
bstalk = LockedBStalk(raw_bstalk)
tx, rx = anyio.create_memory_object_stream(1)
jc = JobContext(rx=rx, bstalk=bstalk)
async with anyio.create_task_group() as tg:
for port in ports:
await tg.spawn(anyio.run_sync_in_worker_thread, worker, port, jc)
while True:
await tx.send(await bstalk.reserve())
def main():
return anyio.run(amain)
if __name__ == "__main__":
sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment