Skip to content

Instantly share code, notes, and snippets.

@napsternxg
Last active August 28, 2023 21:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save napsternxg/fe29bb094acc90e62d99f6d393d647af to your computer and use it in GitHub Desktop.
Save napsternxg/fe29bb094acc90e62d99f6d393d647af to your computer and use it in GitHub Desktop.
asyncio_queue_event
import asyncio
import logging
import random
import time
from dataclasses import dataclass
from typing import Any
from tqdm.auto import tqdm
# Root logging is configured at DEBUG level; every record shows a timestamp
# with milliseconds, the level name and the emitting file/line.
logging.basicConfig(
    level=logging.DEBUG,
    datefmt="%Y-%m-%d:%H:%M:%S",
    format="%(asctime)s,%(msecs)03d %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s",
)
# Module-level logger used by everything below.
logger = logging.getLogger(__name__)
"""
Related video: https://www.youtube.com/watch?v=kMcwcJdIvHI
"""
@dataclass
class Item:
    """One unit of work travelling from the producer to the sink."""

    # Position of the value in the iterator handed to the producer.
    idx: int
    # The payload itself; passed to the consume step untouched.
    value: Any
    # How many more times the item may be re-queued after a failed consume.
    retries_left: int = 0
class AsyncQueueRunner(object):
    """Producer -> consumers -> sink pipeline built on ``asyncio`` queues.

    ``run`` feeds every value of an iterator into a bounded source queue.
    ``num_consumers`` worker tasks pass each item through ``consume`` and hand
    the successful ones to a single sink task, which ``flush``-es them. When
    ``consume`` raises, all consumers are paused through ``ready_event`` for
    ``retry_delay`` seconds and the item is re-queued until its retry budget
    is used up.

    Assign a writable text file to ``fp`` before calling ``run`` to choose
    where ``flush`` prints; while ``fp`` is ``None`` the output goes to
    standard output.

    NOTE: the queues and the event are created in ``__init__``. On Python
    versions before 3.10 asyncio primitives bind to the event loop that is
    current at construction time, so create the runner inside the coroutine
    that calls ``run``.

    Args:
        num_consumers: number of concurrent consumer tasks.
        source_queue_size: ``maxsize`` of the source queue (back-pressure on
            the producer).
        max_retries: retry budget given to every produced item.
        retry_delay: seconds all consumers stay paused after a failure.
    """

    def __init__(
        self, num_consumers=3, source_queue_size=10, max_retries=3, retry_delay=0.5
    ):
        self.pbar = None
        # Destination of ``flush``; ``print(file=None)`` means sys.stdout.
        self.fp = None
        self.source_queue = asyncio.Queue(maxsize=source_queue_size)
        self.sink_queue = asyncio.Queue()
        # Set => consumers may pull work; cleared => consumers are paused.
        self.ready_event = asyncio.Event()
        self.num_consumers = num_consumers
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        # Tasks of the current run; created in ``run``.
        self.sink_coroutine = None
        self.consumer_coroutines = []

    async def producer(self, iterator):
        """Wrap each value of ``iterator`` in an ``Item`` and queue it.

        No end-of-stream marker is put on the source queue: a marker there
        could be dequeued before re-queued retries. ``cleanup`` signals the
        sink once the source queue has been fully processed instead.
        """
        logger.debug("producer with iterator is ready")
        self.ready_event.set()
        for i, val in enumerate(iterator):
            item = Item(i, val, self.max_retries)
            await self.source_queue.put(item)
            logger.debug(f"Produced item {item}")
            self.pbar.total += 1

    async def consume(self, item):
        """Simulate work on ``item``; values divisible by 10 always fail.

        Returns:
            The item that was passed in.

        Raises:
            Exception: when ``item.value % 10 == 0``.
        """
        # asyncio.sleep, not time.sleep: a blocking sleep would freeze every
        # other task on the event loop for the duration.
        await asyncio.sleep(random.random() * 0.1)
        if item.value % 10 == 0:
            raise Exception(f"{item} is bad")
        return item

    async def flush(self, item):
        """Print ``item`` to ``self.fp`` and return it."""
        print(item, file=self.fp)
        return item

    async def _process_item(self, name, item):
        """Consume one item; on failure pause all consumers and maybe re-queue."""
        try:
            await self.consume(item)
        except Exception as e:
            logger.exception(f"Consumer {name} failed on item {item}. Exception: {e}")
            self.ready_event.clear()
            await asyncio.sleep(self.retry_delay)
            self.ready_event.set()
            if item.retries_left > 0:
                item = Item(item.idx, item.value, item.retries_left - 1)
                logger.debug(f"Retries remaining: readding {item}")
                # NOTE(review): this put can wait when the source queue is
                # full; confirm the queue size leaves room for retries.
                await self.source_queue.put(item)
        else:
            logger.debug(f"Consumer {name} consumed item {item}")
            await self.sink_queue.put(item)

    async def consumer(self, name):
        """Worker loop: pull items from the source queue until cancelled.

        The loop has no exit condition of its own; ``cleanup`` cancels the
        task once the source queue has been joined.
        """
        logger.debug(f"consumer {name} ready")
        try:
            while True:
                logger.debug(f"Consumer {name} waiting for event to clear")
                await self.ready_event.wait()
                logger.debug(f"Consumer {name} waiting for item")
                item = await self.source_queue.get()
                try:
                    # A stray ``None`` carries no work; just acknowledge it.
                    if item is not None:
                        await self._process_item(name, item)
                finally:
                    # Runs after any retry was re-queued, so ``join`` cannot
                    # return while a retry is still pending.
                    self.source_queue.task_done()
        finally:
            logger.debug(f"Consumer {name} exiting.")

    async def sink(self):
        """Flush items from the sink queue until a ``None`` sentinel arrives."""
        logger.debug("Sink is ready")
        while True:
            item = await self.sink_queue.get()
            try:
                if item is None:
                    logger.debug(f"Sink got {item=}. Exiting Loop.")
                    break
                await self.flush(item)
                logger.debug(f"Sink consumed item {item}")
                self.pbar.update(1)
            finally:
                self.sink_queue.task_done()
        logger.debug("Sink exiting.")

    async def cleanup(self):
        """Drain both queues, stop the sink and cancel the consumer tasks.

        Called by ``run`` after the producer has finished.
        """
        logger.debug(
            f"Finished run. {self.source_queue.qsize()=}, {self.sink_queue.qsize()=}"
        )
        try:
            # Returns only once every item, retries included, was handled.
            await self.source_queue.join()
            logger.debug("Finished source_queue join")
            # Nothing can reach the sink any more, so the sentinel is
            # guaranteed to be the last entry of the sink queue.
            await self.sink_queue.put(None)
            # Awaiting the task (rather than joining the queue) also
            # surfaces an exception raised inside ``flush``.
            await self.sink_coroutine
            logger.debug(
                f"Finished sink. {self.source_queue.qsize()=}, {self.sink_queue.qsize()=}"
            )
        finally:
            for c in self.consumer_coroutines:
                c.cancel()
            # Wait for the cancellations to be delivered so that no pending
            # task outlives the run.
            await asyncio.gather(*self.consumer_coroutines, return_exceptions=True)
            for i, _ in enumerate(self.consumer_coroutines):
                logger.debug(
                    f"Finished consumer {i}. {self.source_queue.qsize()=}, {self.sink_queue.qsize()=}"
                )

    async def run(self, iterator):
        """Process every value of ``iterator`` through the pipeline.

        May be called repeatedly on the same runner; each call starts fresh
        sink/consumer tasks and a fresh progress bar.
        """
        logger.debug(
            f"Starting run. {self.source_queue.qsize()=}, {self.sink_queue.qsize()=}"
        )
        with tqdm(total=0) as self.pbar:
            # Consumers stay parked until the producer sets the event.
            self.ready_event.clear()
            self.sink_coroutine = asyncio.create_task(self.sink())
            self.consumer_coroutines = [
                asyncio.create_task(self.consumer(i)) for i in range(self.num_consumers)
            ]
            await self.producer(iterator)
            await self.cleanup()
async def main():
    """Run two rounds on one runner, printing flushed items to log.txt."""
    runner = AsyncQueueRunner()
    with open("log.txt", "w+") as log_file:
        # The runner's flush step prints to whatever ``fp`` points at.
        runner.fp = log_file
        await runner.run(range(3))
        logger.info("Running another round")
        await runner.run(range(4))


if __name__ == "__main__":
    asyncio.run(main())
import asyncio
import random
"""
Related video: https://www.youtube.com/watch?v=kMcwcJdIvHI
"""
async def producer(queue, event):
    """Endlessly put random integers from 1 to 10 on ``queue``.

    ``queue.put`` waits while a bounded queue is full, which is the only
    point where this coroutine pauses. ``event`` is accepted but not used.
    """
    while True:
        value = random.randint(1, 10)
        await queue.put(value)
        print(f'Produced item {value}')
async def consumer(queue, event, name):
    """Poll ``queue`` once per second, pausing while ``event`` is set.

    An item equal to 5 counts as a failure: the consumer sets ``event`` for
    one second, so the other consumers polling the same event hold off, and
    then clears it again. Any other item is reported as consumed.
    """
    while True:
        await asyncio.sleep(1)
        if event.is_set():
            # Another consumer is handling a failure; check again later.
            print(f'Consumer {name} waiting for event to clear')
            await asyncio.sleep(1)
            continue

        print(f'Consumer {name} waiting for item')
        item = await queue.get()
        if item != 5:
            print(f'Consumer {name} consumed item {item}')
        else:
            print(f'Consumer {name} failed on item {item}')
            event.set()
            await asyncio.sleep(1)
            event.clear()
        queue.task_done()
async def main():
    """Run three consumers and one producer on a shared bounded queue.

    The producer loops forever, so this coroutine does not return on its
    own; stop the demo by interrupting the process.
    """
    queue = asyncio.Queue(maxsize=10)
    event = asyncio.Event()
    consumers = [consumer(queue, event, i) for i in range(3)]
    producer_coroutine = producer(queue, event)
    await asyncio.gather(*consumers, producer_coroutine)


if __name__ == "__main__":
    # Guarded so that importing this module does not start the endless loop.
    asyncio.run(main())
import asyncio
import random
import logging
# Root logging is configured at INFO level; every record shows a timestamp
# with milliseconds, the level name and the emitting file/line.
logging.basicConfig(
    level=logging.INFO,
    datefmt="%Y-%m-%d:%H:%M:%S",
    format="%(asctime)s,%(msecs)03d %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s",
)
# Module-level logger used by everything below.
logger = logging.getLogger(__name__)
"""
Related video: https://www.youtube.com/watch?v=kMcwcJdIvHI
"""
async def producer(queue, event, num_items=10):
    """Put ``num_items`` random integers on ``queue``, then a ``None`` sentinel.

    Args:
        queue: queue the consumers read from.
        event: accepted for signature parity with the consumers; not used.
        num_items: how many random integers (1 to 10) to produce before the
            sentinel. Defaults to 10.
    """
    for _ in range(num_items):
        item = random.randint(1, 10)
        await queue.put(item)
        logger.info(f'Produced item {item}')
    # ``None`` marks the end of the stream for the consumers.
    await queue.put(None)
async def consumer(queue, event, name, sink_queue, close_producer):
    """Move items from ``queue`` to ``sink_queue`` until the stream ends.

    The loop polls once per second. While ``event`` is set (a consumer is
    handling a failure) it only waits. An item equal to 5 counts as a
    failure: it is dropped and ``event`` is held for one second. Every other
    item is forwarded to ``sink_queue``.

    Shutdown: the first consumer that receives the ``None`` sentinel sets
    ``close_producer`` and forwards a single ``None`` to the sink. The
    sentinel is also put back on ``queue`` so that a sibling already waiting
    in ``queue.get()`` wakes up and exits instead of hanging forever.

    Args:
        queue: source queue filled by the producer.
        event: shared flag; set while some consumer handles a failure.
        name: label used in log messages.
        sink_queue: queue read by the sink.
        close_producer: shared flag; set once the sentinel has been seen.
    """
    while True:
        await asyncio.sleep(1)
        if event.is_set():
            logger.info(f'Consumer {name} waiting for event to clear')
            await asyncio.sleep(1)
            continue

        logger.info(f'Consumer {name} waiting for item')
        if close_producer.is_set():
            logger.info(f'Consumer {name}: close_producer is set. Exiting Loop.')
            break

        item = await queue.get()
        if item is None:
            logger.info(f'Consumer {name} got {item=}. Exiting Loop.')
            queue.task_done()
            # No await since the get above, so the slot it freed is still
            # available and put_nowait cannot fail.
            queue.put_nowait(None)
            if not close_producer.is_set():
                # Set the flag before the await below: the sink put may
                # suspend, and siblings must already see the shutdown.
                close_producer.set()
                logger.info(f'Consumer {name} setting close_producer.')
                await sink_queue.put(None)
            break

        if item == 5:
            logger.info(f'Consumer {name} failed on item {item}')
            event.set()
            await asyncio.sleep(1)
            event.clear()
        else:
            logger.info(f'Consumer {name} consumed item {item}')
            await sink_queue.put(item)
        queue.task_done()
async def sink(sink_queue):
    """Log items taken from ``sink_queue`` until a ``None`` sentinel arrives.

    One item is taken per iteration after a one-second pause; the sentinel
    itself is logged and acknowledged like any other item before the loop
    ends.
    """
    finished = False
    while not finished:
        await asyncio.sleep(1)
        item = await sink_queue.get()
        logger.info(f'Sink consumed item {item}')
        sink_queue.task_done()
        finished = item is None
        if finished:
            logger.info(f'Sink got {item=}. Exiting Loop.')
async def main():
    """Wire up producer, three consumers and a sink, and run them to the end.

    ``event`` pauses the consumers while one of them handles a failure;
    ``close_producer`` tells them that the end-of-stream sentinel was seen.
    """
    queue = asyncio.Queue(maxsize=10)
    sink_queue = asyncio.Queue(maxsize=10)
    event = asyncio.Event()
    close_producer = asyncio.Event()
    consumers = [consumer(queue, event, i, sink_queue, close_producer) for i in range(3)]
    producer_coroutine = producer(queue, event)
    sink_coroutine = sink(sink_queue)
    await asyncio.gather(*consumers, producer_coroutine, sink_coroutine)


if __name__ == "__main__":
    # Guarded so that importing this module does not start the pipeline.
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment