Skip to content

Instantly share code, notes, and snippets.

@kubkon
Created Nov 25, 2020
Embed
What would you like to do?
yapapi - schedule single task at a time
#!/usr/bin/env python3
import asyncio
import json
import marshal
import pathlib
import sys
import yapapi
from yapapi.log import enable_default_logger, log_summary, log_event_repr # noqa
from yapapi.runner import Engine, Task, vm
from yapapi.runner.ctx import WorkContext
from datetime import timedelta
class EngineWrapper:
def __init__(self, module, package):
self.module = module
init_overhead = timedelta(minutes = 3)
self.engine = Engine(
package = package,
max_workers = 3,
budget = 10,
timeout = init_overhead + timedelta(minutes = 15),
subnet_tag = "devnet-alpha.2",
event_emitter = log_summary(),
)
async def submit(self, task):
async def worker(ctx: WorkContext, tasks):
ctx.send_file(self.module, "/golem/input/func")
async for task in tasks:
data = task.data
ctx.send_file(data["input"], "/golem/input/in")
ctx.run("python", "/golem/runner.py", "/golem/input/func", "/golem/input/in")
ctx.download_file("/golem/output/out", data["output"])
yield ctx.commit()
task.accept_task(result=data["output"])
e = await self.engine.__aenter__()
it = e.map(worker, [Task(data=task)])
return await it.__anext__()
def chunks(xs, size):
for i in range(0, len(xs), size):
yield xs[i:i + size]
def sumit(xs):
return sum(xs)
async def main():
module = "func"
with open(module, "wb") as f:
marshal.dump(sumit.__code__, f)
package = await vm.repo(
image_hash="175984b21694138aa5db74c7ef5a29b5261cfe5803c02ef50309c183",
min_mem_gib=0.5,
min_storage_gib=2.0,
)
engine = EngineWrapper(module, package)
futures = []
for (i,chunk) in enumerate(chunks(list(range(100)), 10)):
input_ = f"in{i}"
output = f"out{i}"
with open(input_, "wt") as f:
json.dump(chunk, f)
futures += [engine.submit({"input": input_, "output": output})]
total_sum = 0
for res in asyncio.as_completed(futures):
res = await res
with open(res.data["output"], "rt") as f:
total_sum += json.load(f)
print(f"{total_sum}")
if __name__ == "__main__":
enable_default_logger()
loop = asyncio.get_event_loop()
task = loop.create_task(main())
try:
asyncio.get_event_loop().run_until_complete(task)
except (Exception, KeyboardInterrupt) as e:
print(e)
task.cancel()
asyncio.get_event_loop().run_until_complete(task)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment