Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@EpicWink
Created March 28, 2023 08:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save EpicWink/2eb68def04c1f3aaaeb828e3a5aeaebd to your computer and use it in GitHub Desktop.
Save EpicWink/2eb68def04c1f3aaaeb828e3a5aeaebd to your computer and use it in GitHub Desktop.
Python queue shutdown test
"""Testing Python queue shutdown.
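
Runs one producer and several consumers against a bounded queue, then calls
``shutdown()`` on the queue after a fixed delay.

See https://github.com/python/cpython/issues/96471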
"""
import time
import queue
import threading
import traceback
import asyncio.queues
import multiprocessing.queues
# Backend selection: keep exactly one (q_cls, thread_cls) pair uncommented.
# main() runs the asyncio variant when thread_cls is asyncio.run, and the
# thread/process variant otherwise.

# # Threading
# q_cls = queue.Queue
# thread_cls = threading.Thread

# Multiprocessing (currently selected)
q_cls = multiprocessing.Queue
thread_cls = multiprocessing.Process

# # Async
# q_cls = asyncio.queues.Queue
# thread_cls = asyncio.run

# Configuration

# Number of consumers started; also used as the queue's maxsize and as the
# number of None stop markers the producer enqueues at the end.
n_consumers = 3
# Number of integer items the producer enqueues.
n_items = 25
# Seconds the producer sleeps after enqueuing each item.
producer_delay = 0.1
# Seconds a consumer sleeps after printing each item.
consumer_delay = 0.4
# Seconds to wait after starting the workers before shutting the queue down.
# shutdown_timeout = 4.0 # completes
shutdown_timeout = 2.0 # ends early
# Value passed as the `immediate` argument of q.shutdown().
shutdown_immediate = True
def producer(q):
    """Enqueue ``n_items`` integers on *q*, then one ``None`` per consumer.

    Sleeps ``producer_delay`` seconds after each integer is enqueued. The
    trailing ``None`` values are the stop markers the consumers look for.
    """
    for value in range(n_items):
        q.put(value)
        time.sleep(producer_delay)

    # One stop marker for each consumer.
    for stop_marker in [None] * n_consumers:
        q.put(stop_marker)
def consumer(q):
    """Print items taken from *q* until a ``None`` stop marker is received.

    Sleeps ``consumer_delay`` seconds after printing each item.
    """
    while True:
        received = q.get()
        if received is None:
            # Stop marker: nothing more to consume.
            return
        print(received)
        time.sleep(consumer_delay)
async def aproducer(q):
    """Asynchronously enqueue ``n_items`` integers, then the stop markers.

    Awaits ``asyncio.sleep(producer_delay)`` after each integer, and finally
    enqueues one ``None`` per consumer.
    """
    for value in range(n_items):
        await q.put(value)
        await asyncio.sleep(producer_delay)

    # One stop marker for each consumer.
    for stop_marker in [None] * n_consumers:
        await q.put(stop_marker)
async def aconsumer(q):
    """Asynchronously print items from *q* until a ``None`` stop marker.

    Awaits ``asyncio.sleep(consumer_delay)`` after printing each item.
    """
    while True:
        received = await q.get()
        if received is None:
            # Stop marker: nothing more to consume.
            return
        print(received)
        await asyncio.sleep(consumer_delay)
def run_sync():
    """Run the producer/consumer scenario with ``thread_cls`` workers.

    Starts one producer and ``n_consumers`` consumers sharing a queue bounded
    to ``n_consumers`` entries, waits ``shutdown_timeout`` seconds, shuts the
    queue down, and then joins every worker.
    """
    shared_q = q_cls(maxsize=n_consumers)

    feeder = thread_cls(target=producer, args=(shared_q,))
    feeder.start()

    # Create every consumer first, then start them.
    workers = []
    for _ in range(n_consumers):
        workers.append(thread_cls(target=consumer, args=(shared_q,)))
    for worker in workers:
        worker.start()

    time.sleep(shutdown_timeout)
    shared_q.shutdown(immediate=shutdown_immediate)

    # Join the producer first, then the consumers in creation order.
    for finished in (feeder, *workers):
        finished.join()
async def run_async():
    """Run the producer/consumer scenario as asyncio tasks.

    Creates one ``aproducer`` task and ``n_consumers`` ``aconsumer`` tasks
    sharing a queue bounded to ``n_consumers`` entries, sleeps for
    ``shutdown_timeout`` seconds, shuts the queue down, and then awaits
    every task. A ``QueueShutDown`` raised by a task is reported with a
    printed traceback rather than propagated; other exceptions propagate.
    """
    q = q_cls(maxsize=n_consumers)
    producer_task = asyncio.create_task(aproducer(q))
    consumer_tasks = [
        asyncio.create_task(aconsumer(q))
        for _ in range(n_consumers)
    ]

    await asyncio.sleep(shutdown_timeout)
    q.shutdown(immediate=shutdown_immediate)

    # Await the producer first, then each consumer in creation order.
    for task in (producer_task, *consumer_tasks):
        try:
            await task
        except asyncio.queues.QueueShutDown:
            traceback.print_exc()
def main():
    """Run the scenario that matches the selected ``thread_cls``.

    The asyncio variant is used when ``thread_cls`` is ``asyncio.run``;
    any other value selects the thread/process variant.
    """
    if thread_cls is not asyncio.run:
        run_sync()
        return
    asyncio.run(run_async())


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment