Last active
April 11, 2020 20:30
rate-limited top level loop
async def main():
    MAXRANK = ...  # MAXRANK
    XYZF = ...  # XYZ FILE
    with open(XYZF) as f:
        # See `ratelimit_nursery.py` below
        # <<< NOTE: adjust the number of pending frames here.
        # The number should be as low as possible while being
        # high enough to saturate your workers' load.
        async with open_nursery_with_capacity(150) as _n:
            frange = ...  # THE SET OF FRAME NUMBERS TO BE CALCULATED
            for i in range(1, max(frange)+1):
                if i not in frange:
                    # Skip over this frame in the file without parsing it
                    f = input_water(f, skip=True)
                    continue
                acoords, atmi, nodes, ni2ai, f = input_water(f)
                G = gen_graph(nodes)
                cliques = list(nxx.enumerate_cliques(G, MAXRANK))
                # Note that you don't have to change the `one_frame` function at all.
                # This call blocks until the nursery has a free spot.
                await _n.start_once_acquired(one_frame, i, cliques, acoords, atmi, ni2ai)

run_combine(main, ...)  # FILL OUT REST OF YOUR PARAMS
ratelimit_nursery.py

"""This is a utility context manager in the unreleased
version of Grain. When the number of child tasks under
it reaches `conc`, its `start_once_acquired` will block
asynchronously until there are free spots.
"""
import trio
from async_generator import asynccontextmanager
import types

@asynccontextmanager
async def open_nursery_with_capacity(conc):
    sema = trio.Semaphore(conc)

    async def _rl_task(fn, *args, task_status=trio.TASK_STATUS_IGNORED):
        # The child holds one semaphore slot for its whole lifetime.
        async with sema:
            # Unblock the caller of `start` only after a slot is acquired.
            task_status.started()
            await fn(*args)

    async def start_once_acquired(self, fn, *args):
        await self.start(_rl_task, fn, *args)

    async with trio.open_nursery() as _n:
        # Attach the rate-limited starter to this nursery instance.
        _n.start_once_acquired = types.MethodType(start_once_acquired, _n)
        yield _n
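To see the backpressure in isolation, here is a minimal sketch of the same idea using only the standard library's `asyncio` instead of trio. This is a swapped-in illustration, not Grain's code. The names `run_with_capacity`, `demo`, and `fake_frame` are made up for the example, and the semaphore is acquired by the producer loop rather than inside the child task.

```python
import asyncio


async def run_with_capacity(jobs, conc):
    """Start each job as a task, never holding more than `conc` tasks at once.

    `jobs` is an iterable of zero-argument callables that return coroutines.
    """
    sema = asyncio.Semaphore(conc)
    tasks = []

    async def _rl_task(job):
        try:
            return await job()
        finally:
            sema.release()  # free the slot once the job finishes

    for job in jobs:
        # Blocks the producer loop while all `conc` slots are taken.
        await sema.acquire()
        tasks.append(asyncio.create_task(_rl_task(job)))
    return await asyncio.gather(*tasks)


async def demo(n_jobs=10, conc=3):
    running = 0
    peak = 0

    async def fake_frame(i):
        # Hypothetical stand-in for the per-frame work (`one_frame` in the gist).
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.01)
        running -= 1
        return i

    jobs = [lambda i=i: fake_frame(i) for i in range(n_jobs)]
    results = await run_with_capacity(jobs, conc)
    return results, peak
```

Calling `asyncio.run(demo())` returns the results in submission order, together with the highest number of jobs that were in flight at once, which stays at or below `conc`.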
@kumaranu
By saying `FILL OUT REST OF YOUR PARAMS`, I mean Grain's parameters (e.g. `temporary_err=(GausBadOutput, GausRuntimeError, trio.TooSlowError), nolocal=True`).
The top-level task passed to `run_combine` could either be an async function (in this case) or a list of async functions, so you need to bind the arguments to `main` beforehand with `functools.partial`:
- async def main(args):
-     sampledCoords, reducedCliques, reducedWeights, IAtNum, allNi2ai, logDir = args[0], args[1], args[2], args[3], args[4], args[5]
+ async def main(sampledCoords, reducedCliques, reducedWeights, IAtNum, allNi2ai, logDir):

- inputArgs = [sampledCoords, reducedCliques, reducedWeights, IAtNum, allNi2ai, logDir]
- run_combine(main, inputArgs)
+ run_combine(partial(main, sampledCoords, reducedCliques, reducedWeights, IAtNum, allNi2ai, logDir),
+     temporary_err=(GausBadOutput, GausRuntimeError, trio.TooSlowError),
+     nolocal=True,
+ )
The other parts look fine.
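As a small illustration of why `partial` is needed, the sketch below assumes the runner calls the top-level task with no arguments, which the advice above implies but this thread does not show. `run_top_level` is a hypothetical stand-in for `run_combine`, and the two-argument `main` is shortened for the example.

```python
import asyncio
from functools import partial


async def main(sampled_coords, log_dir):
    # Stand-in for the real top-level task; it just echoes its arguments.
    return f"{len(sampled_coords)} frames -> {log_dir}"


def run_top_level(task, **options):
    """Hypothetical stand-in for `run_combine`: calls `task()` with no arguments."""
    return asyncio.run(task())


# Bind the arguments up front so the runner can call the task bare.
bound_main = partial(main, [[0.0, 0.0, 0.0]], "logs")
```

`run_top_level(bound_main, nolocal=True)` then runs `main` with both arguments already filled in, whereas passing `main` itself would fail because the runner supplies none.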
I have the following situation, where my main function takes in arguments.
I have copied the ratelimit_nursery code into a file called ratelimit_nursery.py and I am importing it.
I am calling the main function in the following way: