Created
December 24, 2020 14:42
-
-
Save graingert/221b5181432dd96da919bfecb8a2e762 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# /opt/libreofficedev7.1/program/python test2.py | |
2004 177 | |
2003 176 | |
2002 175 | |
2005 178 | |
2002 181 | |
2003 180 | |
2004 179 | |
2005 182 | |
^Cunhandled exception during asyncio.run() shutdown | |
task: <Task finished name='__main__.amain' coro=<amain() done, defined at /opt/libreofficedev7.1/program/python-core-3.8.4/lib/python3.8/site-packages/anyio/_backends/_asyncio.py:146> exception=UnboundLocalError("local variable 'retval' referenced before assignment")> | |
Traceback (most recent call last): | |
File "/opt/libreofficedev7.1/program/python-core-3.8.4/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 155, in wrapper | |
return await func(*args) | |
File "test2.py", line 58, in amain | |
await tx.send(await bstalk.reserve()) | |
File "/opt/libreofficedev7.1/program/python-core-3.8.4/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 438, in __aexit__ | |
raise exceptions[0] | |
File "/opt/libreofficedev7.1/program/python-core-3.8.4/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 466, in _run_wrapped_task | |
await func(*args) | |
File "/opt/libreofficedev7.1/program/python-core-3.8.4/lib/python3.8/site-packages/anyio/_core/_threads.py", line 28, in run_sync_in_worker_thread | |
return await get_asynclib().run_sync_in_worker_thread(func, *args, cancellable=cancellable, | |
File "/opt/libreofficedev7.1/program/python-core-3.8.4/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 535, in run_sync_in_worker_thread | |
return cast(T_Retval, retval) | |
UnboundLocalError: local variable 'retval' referenced before assignment | |
Traceback (most recent call last): | |
File "test2.py", line 64, in <module> | |
sys.exit(main()) | |
File "test2.py", line 61, in main | |
return anyio.run(amain) | |
File "/opt/libreofficedev7.1/program/python-core-3.8.4/lib/python3.8/site-packages/anyio/_core/_eventloop.py", line 52, in run | |
return asynclib.run(func, *args, **backend_options) # type: ignore | |
File "/opt/libreofficedev7.1/program/python-core-3.8.4/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 160, in run | |
return native_run(wrapper(), debug=debug) | |
File "/opt/libreofficedev7.1/program/python-core-3.8.4/lib/asyncio/runners.py", line 43, in run | |
return loop.run_until_complete(main) | |
File "/opt/libreofficedev7.1/program/python-core-3.8.4/lib/asyncio/base_events.py", line 603, in run_until_complete | |
self.run_forever() | |
File "/opt/libreofficedev7.1/program/python-core-3.8.4/lib/asyncio/base_events.py", line 570, in run_forever | |
self._run_once() | |
File "/opt/libreofficedev7.1/program/python-core-3.8.4/lib/asyncio/base_events.py", line 1823, in _run_once | |
event_list = self._selector.select(timeout) | |
File "/opt/libreofficedev7.1/program/python-core-3.8.4/lib/selectors.py", line 468, in select | |
fd_event_list = self._selector.poll(timeout, max_ev) | |
KeyboardInterrupt |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Pastebin MpoVlz16 | |
import sys | |
import time | |
import aiostalk | |
import anyio | |
import attr | |
import contextlib | |
HOST = 'localhost' | |
PORT = 11300 | |
ports = [ 2002, 2003, 2004, 2005 ] | |
@attr.s | |
class JobContext: | |
_rx = attr.ib() | |
_bstalk = attr.ib() | |
@contextlib.contextmanager | |
def job(self): | |
job = anyio.run_async_from_thread(self._rx.receive) | |
try: | |
yield job | |
finally: | |
anyio.run_async_from_thread(self._bstalk.delete, job) | |
@attr.s | |
class LockedBStalk: | |
_bstalk = attr.ib() | |
_lock = attr.ib(default=attr.Factory(anyio.create_lock)) | |
async def reserve(self): | |
async with self._lock: | |
return await self._bstalk.reserve() | |
async def delete(self, job): | |
async with self._lock: | |
return await self._bstalk.delete(job) | |
def worker(port, jc): | |
while True: | |
with jc.job() as job: | |
time.sleep(1) | |
print(port, job.body) | |
async def amain(): | |
async with aiostalk.Client((HOST, PORT), watch='unprocessed') as raw_bstalk: | |
bstalk = LockedBStalk(raw_bstalk) | |
tx, rx = anyio.create_memory_object_stream(1) | |
jc = JobContext(rx=rx, bstalk=bstalk) | |
async with anyio.create_task_group() as tg: | |
for port in ports: | |
await tg.spawn(anyio.run_sync_in_worker_thread, worker, port, jc) | |
while True: | |
await tx.send(await bstalk.reserve()) | |
def main(): | |
return anyio.run(amain) | |
if __name__ == "__main__": | |
sys.exit(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment