-
-
Save 1st1/f110d5e2ade94e679c4442e9b6d117e1 to your computer and use it in GitHub Desktop.
import asyncio | |
import random | |
import time | |
async def worker(name, queue): | |
while True: | |
# Get a "work item" out of the queue. | |
sleep_for = await queue.get() | |
# Sleep for the "sleep_for" seconds. | |
await asyncio.sleep(sleep_for) | |
# Notify the queue that the "work item" has been processed. | |
queue.task_done() | |
print(f'{name} has slept for {sleep_for:.2f} seconds') | |
async def main(): | |
# Create a queue that we will use to store our "workload". | |
queue = asyncio.Queue() | |
# Generate random timings and put them into the queue. | |
total_sleep_time = 0 | |
for _ in range(20): | |
sleep_for = random.uniform(0.05, 1.0) | |
total_sleep_time += sleep_for | |
queue.put_nowait(sleep_for) | |
# Create three worker tasks to process the queue concurrently. | |
tasks = [] | |
for i in range(3): | |
task = asyncio.create_task(worker(f'worker-{i}', queue)) | |
tasks.append(task) | |
# Wait until the queue is fully processed. | |
started_at = time.monotonic() | |
await queue.join() | |
total_slept_for = time.monotonic() - started_at | |
# Cancel our worker tasks. | |
for task in tasks: | |
task.cancel() | |
print('====') | |
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds') | |
print(f'total expected sleep time: {total_sleep_time:.2f} seconds') | |
asyncio.run(main()) |
For traditional multi thread + queue, it would works like:
import threading
import queue
import random
import time
q = queue.Queue()
lock = threading.Lock()
def worker(q, index):
cnt = 0
while True:
cnt = cnt + 1
if not q.empty():
with lock:
print(f'Worker {index} consume {q.get()}, count:{cnt}')
time.sleep(random.randint(1, 20)/10)
else:
time.sleep(1)
def inqueue():
cnt = 0
while True:
cnt = cnt + 1
i = random.randint(1, 10001)
print(f"Enqueue {i}, count:{cnt}")
with lock:
q.put(i)
time.sleep(0.1)
if cnt % 100 == 0:
time.sleep(100)
def main():
t_inqueue = threading.Thread(target=inqueue, args=())
t_inqueue.start()
time.sleep(3)
for i in range(5):
t = threading.Thread(target=worker, args=(q, i))
t.start()
if __name__ == '__main__':
main()
great examples guys. this was simple and clear to understand. sadly, i don't think this is going to help speed up my networked API calls as they are just limited by network speed and the GIL, global interpreter lock, of python.
edit: so this is literally just the example from the python documentation
great examples guys. this was simple and clear to understand. sadly, i don't think this is going to help speed up my networked API calls as they are just limited by network speed and the GIL, global interpreter lock, of python.
edit: so this is literally just the example from the python documentation
It's not just the example from the python docs. It's the example for the Python documentation. Jury is one of Python's core devs, and co-author of Asyncio, and author of the async/await syntax.
@1st1 Thanks for your help.
I changed your gist slightly to make it work with threading:
However, it seems that the task
task_dequeue()
doesn't work as expected, would you help to take a look at it ?inserting queue:0.95193975849139 inserting queue:0.4126914655983662 inserting queue:0.06469289732759037 ====> ENTERING DEQUEUE inserting queue:0.6371992807835489 inserting queue:0.5150791863419759 inserting queue:0.6435704016735537 inserting queue:0.7021951747038464 inserting queue:0.10352214837552644 inserting queue:0.9473580907504813 inserting queue:0.4135447172976028 inserting queue:0.329603710993434 inserting queue:0.07332900743730483 inserting queue:0.517951602458065 inserting queue:0.050595876540664504 inserting queue:0.1502889820422032 inserting queue:0.1674049554774545 inserting queue:0.5752381577004789 inserting queue:0.39471607398543573 inserting queue:0.2616939369576392 inserting queue:0.2534322236566886 inserting queue:0.2338269996901821 inserting queue:0.41332410609141934 inserting queue:0.8967170698534741 inserting queue:0.9536775061167974 inserting queue:0.41487047235012803 inserting queue:0.8282921670779227 inserting queue:0.4875614601997763 inserting queue:0.5328441641772085 ...