Skip to content

Instantly share code, notes, and snippets.

@zulrang
Last active December 28, 2023 21:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zulrang/0f2de61e045ce493d5dcdd58370ca86a to your computer and use it in GitHub Desktop.
Save zulrang/0f2de61e045ce493d5dcdd58370ca86a to your computer and use it in GitHub Desktop.
Simulating Goroutine and Channel functionality in Python
import asyncio
from typing import Any, AsyncGenerator, Coroutine, Generator, Never
class Channel:
def __init__(self, maxsize: int = 0):
self._queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
async def push(self, item):
return await self._queue.put(item)
async def _async_gen(self):
yield await self._queue.get()
self._queue.task_done()
def __aiter__(self) -> AsyncGenerator[Any, None]:
return self._async_gen()
def go(coroutine: Generator[Any, None, Never] | Coroutine[Any, Any, Never]):
loop = asyncio.get_event_loop()
loop.create_task(coroutine)
async def producer(c: Channel):
i = 0
while True:
await asyncio.sleep(0.5) # Simulating some work
await c.push(f'Item {i}')
print(f'Item {i} added to queue')
i += 1
async def consumer(c: Channel, id: int):
async for item in c:
print(f'Processing {item} in consumer {id}')
await asyncio.sleep(3)
async def main():
c = Channel(maxsize=10)
# Creating producer and consumer coroutines
go(producer(c))
go(consumer(c, 1))
go(consumer(c, 2))
go(consumer(c, 3))
# Wait for all tasks to be processed
await asyncio.Future()
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment