Skip to content

Instantly share code, notes, and snippets.

@sorcio
Created April 24, 2018 17:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sorcio/040a93eaa43e932ba35d8a24c0e19fdb to your computer and use it in GitHub Desktop.
Save sorcio/040a93eaa43e932ba35d8a24c0e19fdb to your computer and use it in GitHub Desktop.
# Unique sentinel that ObjectSender puts on the queue when its last active
# sender exits; ObjectReceiver compares against it by identity to detect the end.
END_OF_QUEUE = object()
class QueueTerminated(Exception):
    """Raised by ``ObjectReceiver.receive`` once the end-of-queue sentinel has been seen."""
class ObjectSender:
    """Sending side of a queue shared by one or more producers.

    Each ``async with`` entry registers one active sender. When the count
    of active senders drops back to zero, ``END_OF_QUEUE`` is put on the
    queue to mark the end of the stream.
    """

    def __init__(self, queue):
        """Store *queue*; items are delivered with ``await queue.put(item)``."""
        self.queue = queue
        # Number of ``async with`` blocks currently open on this sender.
        self.senders = 0

    async def __aenter__(self):
        """Register one more active sender and return this object."""
        self.senders += 1
        return self  # could be a different class

    async def __aexit__(self, *exc_info):
        """Unregister one sender; emit ``END_OF_QUEUE`` when none remain.

        ``async with`` calls this with three positional arguments
        (exception type, value, traceback), so any number of positional
        arguments is accepted. Nothing is returned, so an exception raised
        inside the ``async with`` body is not suppressed.
        """
        self.senders -= 1
        if self.senders == 0:
            await self.queue.put(END_OF_QUEUE)

    async def send(self, x):
        """Put the single item *x* on the queue."""
        await self.queue.put(x)

    async def feed(self, aiter):
        """Send every item produced by the async iterable *aiter*.

        The whole transfer is wrapped in one ``async with self`` block, so
        it counts as a single active sender for its duration.
        """
        async with self as sender:
            async for x in aiter:
                await sender.send(x)
class ObjectReceiver:
    """Receiving side of a queue terminated by the ``END_OF_QUEUE`` sentinel.

    Items can be pulled one at a time with ``receive()`` or consumed with
    ``async for``, which stops when the sentinel is reached.
    """

    def __init__(self, queue):
        """Store *queue*; items are fetched with ``await queue.get()``."""
        self.queue = queue
        # Set once the sentinel has been consumed, so later calls fail
        # immediately instead of waiting on the queue again.
        self.terminated = False

    async def receive(self):
        """Return the next item taken from the queue.

        Raises:
            QueueTerminated: if the sentinel is received by this call or
                was already received by an earlier one.
        """
        if self.terminated:
            raise QueueTerminated
        value = await self.queue.get()
        if value is END_OF_QUEUE:
            self.terminated = True
            raise QueueTerminated
        return value

    def __aiter__(self):
        """Return ``self``; the receiver is its own async iterator."""
        return self

    async def __anext__(self):
        """Return the next item, or raise ``StopAsyncIteration`` at the end."""
        try:
            return await self.receive()
        except QueueTerminated:
            # QueueTerminated is only the internal end-of-stream signal;
            # suppress it as context so callers see a clean stop.
            raise StopAsyncIteration from None
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment