Skip to content

Instantly share code, notes, and snippets.

@smurfix
Created November 4, 2019 16:44
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save smurfix/c8efac838e6b39bedc744a6ff8ca4405 to your computer and use it in GitHub Desktop.
Save smurfix/c8efac838e6b39bedc744a6ff8ca4405 to your computer and use it in GitHub Desktop.
import trio
import random
from collections import deque
# Number of worker tasks started by batch_job; also handed to WorkQueue as
# the consumer count it uses to detect that every worker has gone idle.
WORKER_COUNT = 10
# Initial work items fed into the queue: the integers 0 through 102.
tasks = range(103)
class WorkQueue:
    """Unbounded FIFO work queue shared by a fixed pool of trio consumers.

    Consumers pull items with ``async for item in queue``; anyone, including
    a consumer in the middle of processing an item, may add work with
    :meth:`send`.  When every one of the ``workers`` consumers is parked on
    an empty queue, the ``done`` event is set and :meth:`wait` returns.

    Iteration never raises ``StopAsyncIteration``: a consumer blocked on an
    empty queue stays blocked until an item arrives, so consumers have to be
    stopped from the outside (e.g. by cancelling them).
    """

    def __init__(self, workers):
        """Create an empty queue.

        Args:
            workers: Number of consumers that will iterate over this queue.
                ``done`` is set once this many consumers are parked at the
                same time.
        """
        self.q = deque()  # pending items, oldest on the left
        self.waiting = []  # one trio.Event per consumer parked on an empty queue
        self.workers = workers
        self.done = trio.Event()  # set once all consumers are parked

    def __aiter__(self):
        """Return the queue itself; it is its own async iterator."""
        return self

    async def __anext__(self):
        """Return the oldest pending item, parking until one is available."""
        # Loop rather than a single check: by the time a woken consumer runs,
        # another consumer may already have taken the item it was woken for.
        while not self.q:
            wakeup = trio.Event()
            self.waiting.append(wakeup)
            if len(self.waiting) == self.workers:
                # Every consumer is idle and the queue is empty, so no
                # consumer can produce further work.
                self.done.set()
            await wakeup.wait()
        return self.q.popleft()

    def send(self, data):
        """Append *data* to the queue and wake one parked consumer, if any.

        A consumer is woken for every item sent while consumers are parked,
        not only when the queue was empty beforehand.  Otherwise a burst of
        sends would wake a single consumer and leave the others asleep with
        work still pending.
        """
        self.q.append(data)
        if self.waiting:
            self.waiting.pop().set()

    async def wait(self):
        """Block until all ``workers`` consumers have been idle at once."""
        await self.done.wait()
# Shared output list; each worker appends (result, worker_id) tuples to it.
results = []
async def worker(worker_id, wq, results):
    """Consume items from *wq* until cancelled, recording each one.

    Every item is "processed" by sleeping for a random time of up to 0.1
    seconds.  With probability 0.12 the worker then queues a follow-up item
    (the value 42) on *wq*.  Finally ``(item, worker_id)`` is appended to
    *results*.

    Args:
        worker_id: Identifier stored next to each result.
        wq: Async-iterable queue that also offers ``send(item)``.
        results: List that collects ``(item, worker_id)`` tuples.
    """
    async for item in wq:
        # Simulated work.
        await trio.sleep(random.uniform(0, 0.1))
        # Occasionally spawn more work; the chance is below 1, so the
        # chain of follow-ups should stop eventually.
        if random.random() < 0.12:
            wq.send(42)
        results.append((item, worker_id))
async def batch_job(tasks, results):
    """Process all *tasks* with a pool of ``WORKER_COUNT`` concurrent workers.

    Args:
        tasks: Iterable of initial work items to queue.
        results: List handed to every worker; workers append
            ``(result, worker_id)`` tuples to it.
    """
    wq = WorkQueue(WORKER_COUNT)
    async with trio.open_nursery() as nursery:
        # Queue all initial work before any worker starts running.
        for data in tasks:
            wq.send(data)
        for worker_id in range(WORKER_COUNT):
            nursery.start_soon(worker, worker_id, wq, results)
        # Returns once every worker is idle on an empty queue.
        await wq.wait()
        # Workers never finish iterating on their own, so cancel them to
        # let the nursery exit.
        nursery.cancel_scope.cancel()
# Run the batch to completion under trio, then report how many items were
# processed (the initial tasks plus any follow-up items queued by workers).
trio.run(batch_job, tasks, results)
print(len(results),"Jobs.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment