Skip to content

Instantly share code, notes, and snippets.

@vodik
Last active July 30, 2018 18:48
Show Gist options
  • Save vodik/8c4ad19e835e2c282bca7ae3d0a29309 to your computer and use it in GitHub Desktop.
Save vodik/8c4ad19e835e2c282bca7ae3d0a29309 to your computer and use it in GitHub Desktop.
New observable
import asyncio
import collections
from collections.abc import AsyncGenerator
class Subject(AsyncGenerator):
    """Hand-off channel between producers and an ``async for`` consumer.

    Producers call ``asend``/``athrow``/``aclose`` (or use the subject as an
    async context manager); a consumer iterates the subject with
    ``async for``.  One item is in flight at a time: a producer call only
    returns after the consumer's iterator has moved past that item, and
    concurrent producers are queued until the in-flight item is done.
    """

    def __init__(self, *, loop=None):
        """Create the subject.

        Args:
            loop: Event loop used to create the internal futures.  Defaults
                to ``asyncio.get_event_loop()``.
        """
        self._loop = asyncio.get_event_loop() if loop is None else loop
        # Resolved by a producer with the next value (or exception).
        self._push = self._loop.create_future()
        # Resolved by the consumer side once the pushed item was handled.
        self._pull = self._loop.create_future()
        # Futures of producers waiting for their turn, in arrival order.
        self._awaiters = []
        # True while some producer owns the in-flight slot.
        self._busy = False

    async def asend(self, value):
        """Deliver ``value`` to the consumer and wait until it was pulled."""
        await self._serialize_access()
        self._push.set_result(value)
        await self._wait_for_pull()

    async def athrow(self, typ, val=None, tb=None):
        """Raise an exception inside the consumer's iteration.

        Accepts either an exception class (optionally with ``val``/``tb``)
        or an already constructed exception instance as ``typ``.
        """
        await self._serialize_access()
        self._push.set_exception(self._normalize_exception(typ, val, tb))
        await self._wait_for_pull()

    async def aclose(self):
        """End the consumer's iteration by sending ``StopAsyncIteration``."""
        await self.athrow(StopAsyncIteration)

    @staticmethod
    def _normalize_exception(typ, val, tb):
        """Return the exception instance described by ``(typ, val, tb)``."""
        if val is None:
            # ``typ`` may be a class to instantiate or a ready-made instance.
            val = typ() if isinstance(typ, type) else typ
        if tb is not None:
            val = val.with_traceback(tb)
        return val

    async def _wait_for_pull(self):
        """Wait for the consumer's acknowledgement, then free the slot."""
        await self._pull
        self._pull = self._loop.create_future()
        self._busy = False

    async def _serialize_access(self):
        """Wait until no other producer owns the slot, then claim it."""
        while self._busy:
            future = self._loop.create_future()
            self._awaiters.append(future)
            try:
                await future
            finally:
                # Also runs when this producer is cancelled while queued, so
                # no stale future is left behind in the wait list.
                self._awaiters.remove(future)
        self._busy = True

    def _wake_next_sender(self):
        """Wake the first queued producer whose future is still pending."""
        for awaiter in self._awaiters:
            # A cancelled producer's future may still be listed until that
            # task gets to run; calling set_result on it would raise.
            if not awaiter.done():
                awaiter.set_result(True)
                break

    async def __aiter__(self):
        """Yield pushed values until ``StopAsyncIteration`` is pushed.

        Any other pushed exception propagates to the consumer.
        """
        while True:
            try:
                yield await self._push
            except StopAsyncIteration:
                return
            finally:
                # Runs when the consumer comes back after the yield, or when
                # the pushed exception ends the iteration: re-arm the push
                # slot, acknowledge the producer, and let the next one in.
                self._push = self._loop.create_future()
                self._pull.set_result(True)
                self._wake_next_sender()

    async def __aenter__(self):
        """Return the subject itself for use inside ``async with``."""
        return self

    async def __aexit__(self, typ, val, tb):
        """Close the stream on clean exit, or forward the exception.

        Returns ``None``, so an exception raised in the ``async with`` body
        is not suppressed after being forwarded to the consumer.
        """
        if typ is None:
            await self.aclose()
        else:
            await self.athrow(typ, val, tb)
async def main():
    """Demo: run one consumer and one producer over a shared Subject.

    The producer sends 1, 2 and 3 and then raises ``RuntimeError`` inside
    the ``async with`` block; the consumer prints each message and reports
    the ``RuntimeError`` when it reaches its iteration.
    """
    channel = Subject()

    async def consume(subject):
        # Print every message, pausing two seconds after each one.
        try:
            async for msg in subject:
                print(f"Received: {msg}")
                await asyncio.sleep(2)
        except RuntimeError as err:
            print(f"Source has crashed: {err}")

    async def emit(subject, value):
        # Announce the value, then hand it to the subject.
        print(f"Sending value: {value}")
        await subject.asend(value)

    async def produce(subject):
        # Send three values, then fail while still inside the context.
        async with subject:
            for number in (1, 2, 3):
                await emit(subject, number)
            raise RuntimeError("Example athrow of exception")

    consumer = consume(channel)
    producer = produce(channel)
    await asyncio.gather(consumer, producer)
if __name__ == "__main__":
    # Run the demo only when executed as a script, not on import.
    # The loop is created explicitly rather than relying on
    # asyncio.get_event_loop() to create one implicitly (deprecated in
    # newer Python versions), and it is closed once the demo finishes.
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment