Last active
February 2, 2019 11:20
-
-
Save njsmith/c7aa013ca3d15a0c98df06ce8b984c3c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from contextlib import asynccontextmanager
from typing import Dict

import trio
class Resources:
    """A bundle of named resource quantities, e.g. ``{"cpus": 2, "gpus": 1}``.

    Supports three operations:

    * ``other in self`` -- True if this bundle can cover everything in *other*.
    * ``self -= other`` -- take *other*'s quantities out of this bundle.
    * ``self += other`` -- put *other*'s quantities back into this bundle.

    A resource name that is absent from a bundle is treated as quantity 0.
    """

    def __init__(self, quantities: Dict[str, int]):
        # Copy the mapping: the in-place operators below mutate it, and two
        # bundles built from the same dict must not share state.
        self.quantities = dict(quantities)

    def __repr__(self):
        return f"{type(self).__name__}({self.quantities!r})"

    def __contains__(self, other):
        """Return True if every quantity in *other* fits within this bundle."""
        # .items() is required here: iterating the dict directly yields only
        # the keys. Unknown names count as 0 available instead of raising
        # KeyError, so an unsatisfiable request simply reports False.
        return all(
            self.quantities.get(name, 0) >= amount
            for name, amount in other.quantities.items()
        )

    def __isub__(self, other):
        """Subtract *other* from this bundle in place.

        Raises:
            ValueError: if *other* does not fit in this bundle. Nothing is
                modified in that case.
        """
        # Explicit raise rather than ``assert`` so the guard survives ``-O``.
        if other not in self:
            raise ValueError("cannot subtract more resources than are held")
        for name, amount in other.quantities.items():
            self.quantities[name] = self.quantities.get(name, 0) - amount
        # In-place operators must return the result; returning nothing would
        # rebind the left-hand name to None.
        return self

    def __iadd__(self, other):
        """Add *other*'s quantities to this bundle in place."""
        for name, amount in other.quantities.items():
            self.quantities[name] = self.quantities.get(name, 0) + amount
        return self
class ResourcePool:
    """A pool of named, countable resources shared between Trio tasks.

    Construct it with the total quantity of each resource as keyword
    arguments, then use ``async with pool.claim(...)`` to hold some of them
    for the duration of a block.

    Waiters are served greedily: on every release, each waiting task whose
    request now fits is woken, in the order the tasks started waiting.
    """

    def __init__(self, **available):
        # _total and _available get separate dict copies: _available is
        # mutated on every claim/release, while _total must stay fixed so the
        # "can this ever be satisfied?" check in claim() remains meaningful.
        self._total = Resources(dict(available))
        self._available = Resources(dict(available))
        # Maps each blocked task to the resources it is waiting for.
        self._waiters: Dict[trio.hazmat.Task, Resources] = {}

    @asynccontextmanager
    async def claim(self, **wanted):
        """Hold the requested resources for the body of an ``async with``.

        Keyword arguments name the resources and the quantity wanted of each.
        If they are not all available right now, the calling task sleeps
        until a release hands them over.

        Raises:
            ValueError: if the request exceeds the pool's total capacity and
                so could never be satisfied.
        """
        wanted = Resources(wanted)
        if wanted not in self._total:
            raise ValueError("job too large to possibly be satisfied")
        # -- Acquire --
        if wanted in self._available:
            # Claim it immediately
            self._available -= wanted
        else:
            # Need to wait for someone else to hand them off to us
            task = trio.hazmat.current_task()
            self._waiters[task] = wanted

            def abort_fn(_):
                # Cancelled while waiting: we hold nothing yet, so dropping
                # our entry is the only cleanup needed. Trio requires the
                # abort callback to report whether the abort succeeded.
                del self._waiters[task]
                return trio.hazmat.Abort.SUCCEEDED

            await trio.hazmat.wait_task_rescheduled(abort_fn)
            # The releasing task already deducted `wanted` from _available
            # on our behalf before rescheduling us.
        # -- We have acquired the resources! --
        try:
            yield
        finally:
            # -- Release --
            self._available += wanted
            # Since we're greedy, a single pass suffices. If we were using a
            # more sophisticated heuristic, we might want to put an outer loop
            # around this.
            # Iterate over a snapshot because entries are deleted as we go.
            for other_task, other_wanted in list(self._waiters.items()):
                if other_wanted in self._available:
                    # Remove the waiter before waking it; otherwise a later
                    # release would hand it resources a second time.
                    del self._waiters[other_task]
                    self._available -= other_wanted
                    trio.hazmat.reschedule(other_task)
# Usage: | |
# | |
# pool = ResourcePool(cpus=20, gpus=4) | |
# | |
# async with pool.claim(cpus=2, gpus=1): | |
# ... |
Neat!
I think you might need `@asynccontextmanager` on `claim()`, and a bit of logic to remove tasks from `_waiters` when you reschedule them.
@oremanj oh hah, I didn't see this comment. (Github doesn't do notifications for gist comments, for some reason. Maybe you won't see this comment, I dunno!)
There's more discussion, and links to fixed up versions in python-trio/trio#896
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
See discussion starting here for context.
Note: this example uses a very unsophisticated, greedy algorithm; in particular, if you have a large quantity of small jobs arriving continuously, they can starve out large jobs. This was fine for the person who sparked the discussion, but may not be fine for you :-). Alternative scheduling policies are left as an exercise for the reader.