Created
October 18, 2020 23:28
-
-
Save Tinche/5c399d4d0c5d52e3f733443f4b05a010 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from queue import Empty | |
from time import sleep | |
import janus | |
# Cooperative shutdown flag shared with the worker thread: main() sets it to
# True, and process_queue_items() re-checks it before each queue poll.
should_finish = False
def process_queue_items(queue, *, poll_timeout=1.0, work_seconds=2.0):
    """Consume items from a blocking queue until asked to stop.

    The loop ends in one of two ways:

    * the module-level ``should_finish`` flag becomes true, in which case the
      loop exits and "Finished gracefully" is printed, or
    * a ``None`` sentinel is received, in which case the function returns
      immediately.

    Args:
        queue: An object with a blocking ``get(timeout=...)`` method that
            raises ``queue.Empty`` on timeout (e.g. ``queue.Queue`` or the
            ``sync_q`` side of a janus queue).
        poll_timeout: Seconds to block on ``get`` before re-checking
            ``should_finish``. This bounds how long a shutdown request can
            go unnoticed while the queue is idle.
        work_seconds: Seconds to sleep per item, standing in for real work.
    """
    print("Worker started")
    while not should_finish:
        try:
            item = queue.get(timeout=poll_timeout)
        except Empty:
            # Nothing arrived in time; loop again so the flag is re-checked.
            continue
        if item is None:
            # Sentinel from the producer: no more work will arrive.
            return
        print(f"Processing {item}")
        sleep(work_seconds)
    print("Finished gracefully")
async def main():
    """Feed ten items to a worker thread, then shut the worker down.

    The worker (``process_queue_items``) runs in the default executor and
    reads from the synchronous side of a janus queue, while this coroutine
    writes to the asynchronous side. If this coroutine is cancelled while
    producing, the cancellation is absorbed so the worker can be asked to
    stop via ``should_finish`` and awaited before returning.
    """
    global should_finish
    queue = janus.Queue()
    # We are inside a coroutine, so the running loop is the one we want.
    worker = asyncio.get_running_loop().run_in_executor(
        None, process_queue_items, queue.sync_q
    )
    try:
        for i in range(10):
            await queue.async_q.put(i)
            await asyncio.sleep(1)
        # Sentinel: tells the worker that no more items will arrive.
        await queue.async_q.put(None)
        # Shield the wait: cancelling this coroutine here would otherwise
        # cancel the executor future as well, and the final ``await worker``
        # below would raise immediately instead of waiting for the thread.
        await asyncio.shield(worker)
    except asyncio.CancelledError:
        # Deliberately absorbed so the shutdown sequence below still runs.
        pass
    print("Shutting down the worker")
    should_finish = True
    await worker
    # Close the queue only after the worker has returned, so the thread
    # never operates on a closed queue.
    queue.close()
    await queue.wait_closed()
if __name__ == "__main__":
    # Start the demo only when executed as a script, not when imported.
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment