Skip to content

Instantly share code, notes, and snippets.

@Contextualist
Last active April 11, 2020 20:30
Show Gist options
  • Save Contextualist/35ff58868d00ed8943b41685d7afcd49 to your computer and use it in GitHub Desktop.
Save Contextualist/35ff58868d00ed8943b41685d7afcd49 to your computer and use it in GitHub Desktop.
rate-limited top level loop
async def main():
MAXRANK = # MAXRANK
XYZF = # XYZ FILE
with open(XYZF) as f:
# See `ratelimit_nursery.py` below
async with open_nursery_with_capacity(150) as _n: # <<< NOTE: adjust number of pending frame here.
# The number should be as low as possible while being
# high enough to saturate your workers' load.
frange = # THE SET OF FRAME NUMBER TO BE CALCULATED
for i in range(1, max(frange)+1):
if i not in frange:
f = input_water(f, skip=True)
continue
acoords, atmi, nodes, ni2ai, f = input_water(f)
G = gen_graph(nodes)
cliques = list(nxx.enumerate_cliques(G, MAXRANK))
# Note that you don't have to change the `one_frame` function at all.
await _n.start_once_acquired(one_frame, i, cliques, acoords, atmi, ni2ai)
run_combine(main, ...) # FILL OUT REST OF YOUR PARAMS
"""This is a utility context manager in the unreleased
version of Grain. When the number of child tasks under
it exceeds `conc`, its `start_once_acquired` will block
asynchronously until there are free spots.
"""
import trio
from async_generator import asynccontextmanager
import types
@asynccontextmanager
async def open_nursery_with_capacity(conc):
sema = trio.Semaphore(conc)
async def _rl_task(fn, *args, task_status=trio.TASK_STATUS_IGNORED):
async with sema:
task_status.started()
await fn(*args)
async def start_once_acquired(self, fn, *args):
await self.start(_rl_task, fn, *args)
async with trio.open_nursery() as _n:
_n.start_once_acquired = types.MethodType(start_once_acquired, _n)
yield _n
@kumaranu
Copy link

kumaranu commented Apr 11, 2020

I have the following situation with my main function takes in arguments.

I have copied ratelimit_nursery function in a file called ratelimit_nursery.py and I am importing it.

@asynccontextmanager
async def open_nursery_with_capacity(conc):
    sema = trio.Semaphore(conc)
    async def _rl_task(fn, *args, task_status=trio.TASK_STATUS_IGNORED):
        async with sema:
            task_status.started()
            await fn(*args)
    async def start_once_acquired(self, fn, *args):
        await self.start(_rl_task, fn, *args)

    async with trio.open_nursery() as _n:
        _n.start_once_acquired = types.MethodType(start_once_acquired, _n)
        yield _n
import ratelimit_nursery
async def main(args):
    sampledCoords, reducedCliques, reducedWeights, IAtNum, allNi2ai, logDir = args[0], args[1], args[2], args[3], args[4], args[5]
    frames = []
    async with ratelimit_nursery.open_nursery_with_capacity(150) as _n: # <<< NOTE: adjust number of pending frame here
        for i in range(len(sampledCoords)):
            for j in range(3, 4): #Here I am only looping over one of the topology
                await _n.start_once_acquired(one_frameFragOnly, i + 1, j, reducedCliques[j], reducedWeights[j], sampledCoords[i], IAtNum, allNi2ai[j], logDir)

I am calling the main function in the following way:

directPdtInts, sampledCoords = getSampledCoords(rsigma, dimInfo, R_Centre_RatioD, GrdSpr, nSampsList, Coords, RQMGrd, IQMPntr)
topoCentroids, allNi2ai, reducedCliques, reducedWeights = topologyCentroids.topoCentroidFunction1(IQMPntr, IAtNum, sampledCoords, inputsDir, directPdtInts)
layers, layerCentroids, layerConnections = hypergraph.getHyperGraph(topoCentroids, nLayers)
inputArgs = [sampledCoords, reducedCliques, reducedWeights, IAtNum, allNi2ai, logDir]
run_combine(main, inputArgs)

@Contextualist
Copy link
Author

Contextualist commented Apr 11, 2020

@kumaranu
By saying FILL OUT REST OF YOUR PARAMS I mean Grain's parameters (e.g. temporary_err=(GausBadOutput, GausRuntimeError, trio.TooSlowError), nolocal=True).

The top-level task passed to main could either be an async function (in this case) or a list of async functions, so you need currying:

- async def main(args):
-     sampledCoords, reducedCliques, reducedWeights, IAtNum, allNi2ai, logDir = args[0], args[1], args[2], args[3], args[4], args[5]
+ async def main(sampledCoords, reducedCliques, reducedWeights, IAtNum, allNi2ai, logDir):

- inputArgs = [sampledCoords, reducedCliques, reducedWeights, IAtNum, allNi2ai, logDir]
- run_combine(main, inputArgs)
+ run_combine(partial(main, sampledCoords, reducedCliques, reducedWeights, IAtNum, allNi2ai, logDir),
+     temporary_err=(GausBadOutput, GausRuntimeError, trio.TooSlowError),
+     nolocal=True,
+)

The other parts look fine.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment