Skip to content

Instantly share code, notes, and snippets.

@SirEdvin
Created December 14, 2021 16:32
Show Gist options
  • Save SirEdvin/4226d2673666b908736dc97924a93dd2 to your computer and use it in GitHub Desktop.
Save SirEdvin/4226d2673666b908736dc97924a93dd2 to your computer and use it in GitHub Desktop.
import aiochan as ac
import asyncio
import random
async def producer(c):
    """Emit an endless stream of numbered products onto channel ``c``.

    Each iteration sleeps for ``random.random()`` seconds, then puts the
    string ``'product <i>'`` on the channel, with ``i`` counting up from 1.
    The coroutine never returns on its own; it ends only when it is
    cancelled or when ``c.put`` raises.

    Args:
        c: Object exposing an awaitable ``put(value)`` method (an
            ``aiochan`` channel in this file).
    """
    i = 0
    while True:
        await asyncio.sleep(random.random())  # producing stuff takes time
        i += 1
        await c.put(f'product {i}')
class Buffer:
    """Group items from an input channel into batches for a consumer.

    ``buffer_collector`` reads single items from ``in_c`` and forwards a
    list to ``buf_c`` every time ``batch_size`` items have accumulated.
    ``consumer`` prints each forwarded batch and, whenever no batch arrives
    within ``flush_timeout`` seconds, prints and clears whatever has been
    collected so far.

    Args:
        in_c: Channel the individual items are read from (awaitable ``get``).
        buf_c: Channel full batches are written to (awaitable ``put``).
        batch_size: Number of items that triggers forwarding a batch.
        flush_timeout: Seconds the consumer waits for a full batch before
            flushing the partial buffer.
    """

    def __init__(self, in_c, buf_c, *, batch_size=10, flush_timeout=1.5):
        self.buffer = []
        self.in_c = in_c
        self.buf_c = buf_c
        self.batch_size = batch_size
        self.flush_timeout = flush_timeout
        # Not read by any method of this class; kept so the attribute
        # remains available to existing callers.
        self.timeout_c = ac.timeout(10)

    async def buffer_collector(self):
        """Collect items from ``in_c`` and forward them in batches.

        Runs until ``in_c.get()`` yields ``None``. Items of an unfinished
        batch stay in ``self.buffer`` for the consumer's timeout flush.
        """
        self.buffer = []
        while True:
            el = await self.in_c.get()
            if el is None:
                # aiochan documents None from get() as "channel closed and
                # drained"; without this check the loop would spin forever.
                return
            self.buffer.append(el)
            if len(self.buffer) >= self.batch_size:
                # Detach the batch *before* awaiting, so a timeout flush in
                # consumer() cannot print the same items a second time
                # while the put is pending.
                batch, self.buffer = self.buffer, []
                # put() is a coroutine: it must be awaited, otherwise the
                # batch is never actually delivered.
                await self.buf_c.put(batch)

    async def consumer(self):
        """Print full batches, or the partial buffer after a quiet period.

        Runs until ``buf_c`` reports that it is closed (yields ``None``).
        """
        while True:
            product, chan = await ac.select(
                self.buf_c, ac.timeout(self.flush_timeout)
            )
            if chan is self.buf_c:
                if product is None:
                    # buf_c was closed: nothing more will ever arrive.
                    return
                print('obtained:', product)
            else:
                # The timeout channel fired first: flush whatever has been
                # collected so far (possibly an empty list).
                current_buffer, self.buffer = self.buffer, []
                print('obtained:', current_buffer)

    def run(self):
        """Schedule the consumer and the collector via ``ac.go``."""
        ac.go(self.consumer())
        ac.go(self.buffer_collector())
async def main():
    """Wire a producer to a ``Buffer``, let them run briefly, then return.

    Creates the item channel and the batch channel, starts ``producer`` and
    the buffer's tasks with ``ac.go``, sleeps for 0.6 seconds and prints a
    closing message. The started tasks are neither awaited nor cancelled
    here; what happens to them afterwards depends on the event loop that
    is running this coroutine.
    """
    el_chan = ac.Chan()
    buffer_chan = ac.Chan()
    buffer = Buffer(el_chan, buffer_chan)
    ac.go(producer(el_chan))
    buffer.run()
    await asyncio.sleep(0.6)
    print('It is late, let us call it a day.')
# NOTE: top-level ``await`` only works where an event loop is already
# running (e.g. an IPython/Jupyter cell or ``python -m asyncio``); a plain
# ``python file.py`` run rejects this line with a SyntaxError.
await main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment