Created
April 24, 2018 17:28
-
-
Save sorcio/040a93eaa43e932ba35d8a24c0e19fdb to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Unique sentinel placed on the queue to mark the end of the stream. A fresh
# ``object()`` is compared by identity (``is``), so it can never be confused
# with a real payload value.
END_OF_QUEUE = object()
class QueueTerminated(Exception):
    """Raised when receiving from a queue whose end-of-stream marker was seen."""
class ObjectSender:
    """Sending side of a queue that may be shared by several producers.

    Every ``async with sender:`` scope counts as one active sender. When the
    last active scope exits, ``END_OF_QUEUE`` is put on the queue so the
    consumer knows no more items will arrive.

    ``queue`` must expose an awaitable ``put(item)`` method.
    """

    def __init__(self, queue):
        self.queue = queue
        # Number of currently open ``async with`` scopes on this sender.
        self.senders = 0

    async def __aenter__(self):
        """Register one more active sender and return the sending handle."""
        self.senders += 1
        return self  # could be a different class

    async def __aexit__(self, *exc_info):
        """Unregister one sender; emit the end marker when none remain.

        ``async with`` calls this with ``(exc_type, exc_value, traceback)``,
        so the signature must accept all three. The values are not inspected
        and ``None`` is returned, so an exception raised inside the scope
        keeps propagating.
        """
        self.senders -= 1
        if self.senders == 0:
            await self.queue.put(END_OF_QUEUE)

    async def send(self, x):
        """Put a single item ``x`` on the queue."""
        await self.queue.put(x)

    async def feed(self, aiter):
        """Send every item produced by the async iterable ``aiter``.

        The whole transfer runs inside one sender scope, so the end marker
        is emitted afterwards if this was the last active sender.
        """
        async with self as sender:
            async for x in aiter:
                await sender.send(x)
class ObjectReceiver:
    """Receiving side of a queue terminated by the ``END_OF_QUEUE`` marker.

    Items can be pulled one at a time with ``receive()`` or consumed with
    ``async for item in receiver``.

    ``queue`` must expose an awaitable ``get()`` method.
    """

    def __init__(self, queue):
        self.queue = queue
        # Set once the end marker has been read; later receives fail fast
        # instead of waiting on the queue again.
        self.terminated = False

    async def receive(self):
        """Return the next item from the queue.

        Raises:
            QueueTerminated: if the end marker is read now or was already
                read by an earlier call on this receiver.
        """
        if self.terminated:
            raise QueueTerminated
        value = await self.queue.get()
        if value is END_OF_QUEUE:
            self.terminated = True
            raise QueueTerminated
        return value

    def __aiter__(self):
        """Return ``self``; the receiver is its own async iterator."""
        return self

    async def __anext__(self):
        """Return the next item, ending iteration when the queue terminates."""
        try:
            return await self.receive()
        except QueueTerminated:
            # Translate to the iteration protocol's signal; the internal
            # exception is deliberately not chained as context.
            raise StopAsyncIteration from None
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment