@njsmith
Last active February 2, 2019 11:20
from contextlib import asynccontextmanager
from typing import Dict

import trio

# Note: trio.lowlevel was named trio.hazmat before Trio 0.15.


class Resources:
    def __init__(self, quantities: Dict[str, int]):
        # Copy, so that separate Resources objects never share one dict.
        self.quantities = dict(quantities)

    def __contains__(self, other):
        # "other in self": True if self has at least as much of every resource.
        for k, v in other.quantities.items():
            if self.quantities.get(k, 0) < v:
                return False
        return True

    def __isub__(self, other):
        assert other in self
        for k, v in other.quantities.items():
            self.quantities[k] -= v
        # In-place operators must return the result, or the target becomes None.
        return self

    def __iadd__(self, other):
        for k, v in other.quantities.items():
            self.quantities[k] += v
        return self


class ResourcePool:
    def __init__(self, **available):
        self._total = Resources(available)
        self._available = Resources(available)
        self._waiters: Dict[trio.lowlevel.Task, Resources] = {}

    @asynccontextmanager
    async def claim(self, **wanted):
        wanted = Resources(wanted)
        if wanted not in self._total:
            raise ValueError("job too large to possibly be satisfied")
        # -- Acquire --
        if wanted in self._available:
            # Claim it immediately
            self._available -= wanted
        else:
            # Need to wait for someone else to hand them off to us
            task = trio.lowlevel.current_task()
            self._waiters[task] = wanted

            def abort_fn(_):
                # Cancelled while waiting: leave the queue and let the
                # cancellation proceed.
                del self._waiters[task]
                return trio.lowlevel.Abort.SUCCEEDED

            await trio.lowlevel.wait_task_rescheduled(abort_fn)
        # -- We have acquired the resources! --
        try:
            yield
        finally:
            # -- Release --
            self._available += wanted
            # Since we're greedy, a single pass suffices. If we were using a
            # more sophisticated heuristic, we might want to put an outer loop
            # around this.
            # Iterate over a snapshot, because we remove entries as we go.
            for other_task, other_wanted in list(self._waiters.items()):
                if other_wanted in self._available:
                    self._available -= other_wanted
                    del self._waiters[other_task]
                    trio.lowlevel.reschedule(other_task)


# Usage:
#
#   pool = ResourcePool(cpus=20, gpus=4)
#
#   async with pool.claim(cpus=2, gpus=1):
#       ...

njsmith commented Jan 30, 2019

See discussion starting here for context.

Note: this example uses a very unsophisticated, greedy algorithm; in particular, if you have a large quantity of small jobs arriving continuously, they can starve out large jobs. This was fine for the person who sparked the discussion, but may not be fine for you :-). Alternative scheduling policies are left as an exercise for the reader.
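As one illustration of an alternative policy, here is a minimal sketch of a strict FIFO release pass. It is not from the gist: the function name `fifo_release_pass` and the use of plain dicts for waiters and resource counts are assumptions made to keep the example self-contained and free of Trio. The idea is that the pass stops at the first waiter whose request does not fit, so small jobs that arrived later cannot jump ahead of a large one.

```python
from typing import Dict, Hashable, List


def fifo_release_pass(
    waiters: Dict[Hashable, Dict[str, int]],
    available: Dict[str, int],
) -> List[Hashable]:
    """Hypothetical helper: wake waiters strictly in arrival order.

    `waiters` maps a task handle to the resources it wants; dicts keep
    insertion order, so iteration order is arrival order. Both arguments
    are modified in place. Returns the handles that should be woken.
    """
    woken = []
    for task, wanted in list(waiters.items()):  # snapshot: we delete below
        fits = all(available.get(k, 0) >= v for k, v in wanted.items())
        if not fits:
            # The head of the line blocks everyone behind it. This is what
            # prevents starvation of large jobs.
            break
        for k, v in wanted.items():
            available[k] -= v
        del waiters[task]
        woken.append(task)
    return woken


# A large job queued ahead of a small one.
waiters = {"big": {"cpus": 8}, "small": {"cpus": 1}}
available = {"cpus": 4}

first = fifo_release_pass(waiters, available)   # "big" does not fit yet
available["cpus"] = 8                           # more CPUs are released
second = fifo_release_pass(waiters, available)  # now "big" goes first
```

The trade-off is that strict FIFO can leave resources idle while the head of the queue waits (here, "small" could have run on the first pass), which is exactly what the greedy version avoids.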

oremanj commented Jan 30, 2019

Neat!

I think you might need @asynccontextmanager on claim(), and a bit of logic to remove tasks from _waiters when you reschedule them.
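To make the first point concrete, here is a small sketch of why the decorator matters: an `async def` containing `yield` is only an async generator function, and `contextlib.asynccontextmanager` (Python 3.7+) is what turns it into something usable with `async with`. The `TinyPool` class is hypothetical and much simpler than the gist's `ResourcePool`, and it runs under `asyncio` only so that it needs nothing beyond the standard library; the decorator is applied the same way to a Trio-based `claim()`.

```python
import asyncio
from contextlib import asynccontextmanager


class TinyPool:
    """Hypothetical stand-in for ResourcePool with one kind of resource."""

    def __init__(self, slots: int):
        self.free = slots

    @asynccontextmanager
    async def claim(self, n: int):
        if n > self.free:
            raise ValueError("not enough free slots")
        self.free -= n  # acquire: runs on entering the `async with` block
        try:
            yield
        finally:
            self.free += n  # release: runs on exit, even after an exception


async def main():
    pool = TinyPool(4)
    async with pool.claim(3):
        free_inside = pool.free
    return free_inside, pool.free


result = asyncio.run(main())
```

Without the decorator, `pool.claim(3)` would return a bare async generator object, which does not implement the async context manager protocol, so the `async with` statement would fail.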

njsmith commented Feb 2, 2019

@oremanj oh hah, I didn't see this comment. (GitHub doesn't do notifications for gist comments, for some reason. Maybe you won't see this comment, I dunno!)

There's more discussion, and links to fixed-up versions, in python-trio/trio#896.
