Skip to content

Instantly share code, notes, and snippets.

@Jaa-c
Last active January 12, 2020 20:55
Show Gist options
  • Save Jaa-c/23da51092e861ee0c003f3bb975095ce to your computer and use it in GitHub Desktop.
Save Jaa-c/23da51092e861ee0c003f3bb975095ce to your computer and use it in GitHub Desktop.
Event based async jobs with dependencies
import asyncio
from enum import Enum
import queue
import time
import json
from random import seed
from random import random
traced_events = []
script_start = time.time()
seed(time.time())
class Context:
def __init__(self):
self.repository = None
self.commits = None
class Status(Enum):
WAITING = 0,
PROGRESS = 1,
DONE = 2,
class Job:
def __init__(self, name, function):
self.name = name
self.function = function
self.status = Status.WAITING
self.result = False
self.dependencies = []
def depends(self, *jobs):
for job in jobs:
self.dependencies.append(job)
def can_run(self):
for dependency in self.dependencies:
if dependency.status != Status.DONE:
return False
return True
def trace(func):
async def inner(self):
start = time.time()
result = await func(self)
task_start = (start - script_start) * 10e5
duration = (time.time() - start) * 10e5
trace = {"pid": 1, "tid": len(traced_events) % 4, "ts": task_start, "dur": duration, "ph": "X", "name": self.name, "args": {"result": self.result}}
traced_events.append(trace)
return result
return inner
@trace
async def run(self):
print(f"start {self.name}")
self.status = Status.PROGRESS
await self.function
self.status = Status.DONE
print(f"end {self.name}")
async def loop(tasks: queue.Queue):
while not tasks.empty():
task = tasks.get()
if task.can_run():
asyncio.ensure_future(task.run())
else:
tasks.put(task)
await asyncio.sleep(0.1)
async def work(contxt: Context, sec):
rand = random() + 0.5
await asyncio.sleep(sec * rand)
async def work_blocking(contxt: Context, sec):
rand = random() + 0.5
time.sleep(sec * rand)
if __name__ == "__main__":
tasks = []
c = Context()
def make_job(name, function):
job = Job(name, function)
tasks.append(job)
return job
branch = make_job("create branch", work(c, 2))
build = make_job("build", work(c, 2))
build.depends(branch)
merge_build = make_job("merge_build", work(c, 20))
merge_build.depends(build)
unit_tests = make_job("unit_tests", work(c, 10))
unit_tests.depends(build)
build_all = make_job("build_all", work(c, 30))
build_all.depends(build)
at = make_job("AT", work(c, 30))
at.depends(build)
regex = make_job("regex check", work_blocking(c, 4))
regex.depends(build)
at_bisect = make_job("AT bisect", work(c, 6))
at_bisect.depends(at, build_all)
end = Job("end", work(c, 0))
end.depends(*tasks)
tasks.append(end)
queue = queue.Queue()
for task in tasks:
queue.put(task)
event_loop = asyncio.new_event_loop()
asyncio.set_event_loop(event_loop)
result = event_loop.run_until_complete(loop(queue))
with open('trace.json', 'w') as file:
file.write('{"traceEvents":\n')
json.dump(traced_events, file, indent=2)
file.write("\n}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment