Created
November 25, 2020 13:23
-
-
Save kubkon/4e1b5799e13249e377ac4dd7b6765c3a to your computer and use it in GitHub Desktop.
yapapi - schedule single task at a time
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import asyncio | |
import json | |
import marshal | |
import pathlib | |
import sys | |
import yapapi | |
from yapapi.log import enable_default_logger, log_summary, log_event_repr # noqa | |
from yapapi.runner import Engine, Task, vm | |
from yapapi.runner.ctx import WorkContext | |
from datetime import timedelta | |
class EngineWrapper:
    """Submit tasks to one shared yapapi ``Engine``, one task per call.

    Each :meth:`submit` call runs ``Engine.map`` over a single-element task
    list and returns the first item it yields. The engine's async context is
    entered lazily on the first submission and reused by all later ones; call
    :meth:`close` to exit it.
    """

    def __init__(self, module, package, *, max_workers=3, budget=10,
                 subnet_tag="devnet-alpha.2"):
        """Create the wrapped engine (its context is not entered yet).

        Args:
            module: local path of the file uploaded to every provider as
                ``/golem/input/func``.
            package: VM package to run tasks in (``main`` passes the result
                of ``vm.repo``).
            max_workers: forwarded to ``Engine``.
            budget: forwarded to ``Engine``.
            subnet_tag: forwarded to ``Engine``.
        """
        self.module = module
        # Extra allowance on top of the 15-minute work timeout
        # (presumably for provider negotiation / VM start-up).
        init_overhead = timedelta(minutes=3)
        self.engine = Engine(
            package=package,
            max_workers=max_workers,
            budget=budget,
            timeout=init_overhead + timedelta(minutes=15),
            subnet_tag=subnet_tag,
            event_emitter=log_summary(),
        )
        # State for entering the engine context exactly once (see _ensure_entered).
        self._enter_lock = None
        self._entered = False
        self._engine_ctx = None

    async def _ensure_entered(self):
        """Enter ``self.engine``'s async context once and return its result."""
        # Created lazily so the lock belongs to the running event loop.
        if self._enter_lock is None:
            self._enter_lock = asyncio.Lock()
        # Concurrent submit() calls must not each call __aenter__ again.
        async with self._enter_lock:
            if not self._entered:
                self._engine_ctx = await self.engine.__aenter__()
                self._entered = True
        return self._engine_ctx

    async def submit(self, task):
        """Run a single task on the shared engine and return the result.

        Args:
            task: dict with ``"input"`` (local file uploaded to
                ``/golem/input/in``) and ``"output"`` (local path that
                ``/golem/output/out`` is downloaded to).

        Returns:
            The first item yielded by ``Engine.map`` for ``Task(data=task)``;
            ``main`` reads ``.data["output"]`` from it.
        """
        async def worker(ctx: WorkContext, tasks):
            # The function file is uploaded once per worker, before any task.
            ctx.send_file(self.module, "/golem/input/func")
            async for current in tasks:
                data = current.data
                ctx.send_file(data["input"], "/golem/input/in")
                ctx.run("python", "/golem/runner.py", "/golem/input/func", "/golem/input/in")
                ctx.download_file("/golem/output/out", data["output"])
                yield ctx.commit()
                current.accept_task(result=data["output"])

        engine = await self._ensure_entered()
        results = engine.map(worker, [Task(data=task)])
        return await results.__anext__()

    async def close(self):
        """Exit the engine's async context if :meth:`submit` ever entered it."""
        if not self._entered:
            return
        self._entered = False
        self._engine_ctx = None
        await self.engine.__aexit__(None, None, None)
def chunks(xs, size):
    """Yield consecutive slices of *xs* of length *size*; the last may be shorter.

    Args:
        xs: a sliceable sequence (list, tuple, str, ...); each chunk is a
            slice of it, so it has the same type as *xs*.
        size: chunk length; must be at least 1.

    Raises:
        ValueError: if *size* < 1. Because this is a generator, the error is
            raised on first iteration rather than at call time.
    """
    if size < 1:
        # Previously a negative size silently produced no chunks at all.
        raise ValueError(f"chunk size must be positive, got {size!r}")
    for start in range(0, len(xs), size):
        yield xs[start:start + size]
def sumit(xs):
    """Return the total of the numbers in *xs*.

    main() marshals this function's code object and ships it to providers,
    so it must use builtins only (no module-level names).
    """
    total = sum(xs, 0)
    return total
async def main():
    """Sum 0..99 in ten remote chunks via EngineWrapper and print the total.

    Writes the marshalled ``sumit`` code object to ``func``, each chunk as
    JSON to ``in<i>``, and reads each chunk's JSON result back from
    ``out<i>``.
    """
    func_path = "func"
    # Only the bare code object is shipped; the provider side executes it.
    with open(func_path, "wb") as func_file:
        marshal.dump(sumit.__code__, func_file)

    package = await vm.repo(
        image_hash="175984b21694138aa5db74c7ef5a29b5261cfe5803c02ef50309c183",
        min_mem_gib=0.5,
        min_storage_gib=2.0,
    )
    wrapper = EngineWrapper(func_path, package)

    # Write every input file first, collecting one submit() coroutine each.
    pending = []
    for idx, batch in enumerate(chunks(list(range(100)), 10)):
        spec = {"input": f"in{idx}", "output": f"out{idx}"}
        with open(spec["input"], "wt") as in_file:
            json.dump(batch, in_file)
        pending.append(wrapper.submit(spec))

    # Accumulate partial sums in whatever order the tasks finish.
    grand_total = 0
    for next_done in asyncio.as_completed(pending):
        finished = await next_done
        with open(finished.data["output"], "rt") as out_file:
            grand_total += json.load(out_file)
    print(f"{grand_total}")
if __name__ == "__main__":
    enable_default_logger()
    # Create the loop explicitly: calling asyncio.get_event_loop() with no
    # running loop is deprecated in recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    task = loop.create_task(main())
    try:
        loop.run_until_complete(task)
    except KeyboardInterrupt:
        print("Interrupted, cancelling...", file=sys.stderr)
        # The interrupt may have been raised inside the task itself; then the
        # task is already finished and must not be run again.
        if not task.done():
            task.cancel()
            try:
                # Give main() a chance to process the cancellation.
                loop.run_until_complete(task)
            except asyncio.CancelledError:
                pass
        sys.exit(130)  # 128 + SIGINT, the conventional status for Ctrl-C
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment