Last active
July 16, 2024 15:49
-
-
Save lmmx/0adfda53855ffa5e0dba5df3ab2009af to your computer and use it in GitHub Desktop.
Breaking a pipeline step out into 'tasks', and making trivial use of a pydantic TypeAdapter on a singleton list to lint each task's file I/O at runtime, i.e. check that its inputs exist and its outputs do not yet (in line with the progressive nature of a pipeline step).
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pathlib import Path

from pydantic import BaseModel, FilePath, NewPath, OnErrorOmit, TypeAdapter
class AvailableTask(BaseModel):
    """A task whose inputs and outputs are in a runnable state right now.

    Validation only succeeds when every ``src`` entry passes pydantic's
    ``FilePath`` check (an existing file) and every ``dst`` entry passes
    ``NewPath`` (a path that does not exist yet).
    """

    src: list[FilePath]
    dst: list[NewPath]
class Task(BaseModel):
    """A planned task: source paths to read and destination paths to write.

    Uses plain ``Path`` fields, so no filesystem checks happen when the task
    is constructed.
    """

    src: list[Path]
    dst: list[Path]
class Step(BaseModel):
    """A named pipeline step made up of an ordered list of tasks."""

    name: str
    tasks: list[Task]
# Demo fixture: one (source files, destination files) pair per task, in the
# order the step should attempt them. Some tasks list another task's
# destination as their source (e.g. "a.out" is written by the first task and
# read by the second and third).
file_tasks = [
    (["a.in"], ["a.out"]),
    (["a.out"], ["b.out"]),
    (["a.out", "b.out"], ["c.out"]),
    (["d.in"], ["d.out"]),
    (["e.in"], ["e.out"]),
]

# Validates a list of task dicts as AvailableTask; OnErrorOmit drops entries
# that fail validation instead of raising, so an empty result means "not
# available".
avail_ta = TypeAdapter(list[OnErrorOmit[AvailableTask]])
def run_step():
    """Walk the demo step's tasks in order and print each task's status.

    A ``Step`` is built from the module-level ``file_tasks`` pairs. Each task
    is then dumped to a dict and passed, as a one-element list, through
    ``avail_ta``; a non-empty result means the task validated as an
    ``AvailableTask``.

    Status reported per task:

    * available -> "Running available task";
    * not available, and nothing has been picked up yet -> reported as
      already completed and skipped;
    * not available after an earlier task was picked up -> reported as a
      missing requisite, and every remaining task is skipped.

    The function only prints; it does not execute tasks or create files.

    Raises:
        ValueError: If the step contains no tasks.
    """
    tasks = [{"src": src, "dst": dst} for src, dst in file_tasks]
    step = Step(name="Demo Step", tasks=tasks)
    if not step.tasks:
        raise ValueError("No tasks were assigned")
    print(f"Running step {step.name!r} with {len(step.tasks)} tasks")

    task_picked_up = False  # True once at least one task was found runnable
    bail = False  # True once a task after a picked-up one was not runnable
    for number, task in enumerate(step.tasks, start=1):
        task_repr = " --> ".join(map(str, task.model_dump(mode="json").values()))
        # The header ends with a 10-space pad (no newline) so the status
        # message printed below continues on the same output line.
        print(
            f"\n--- Task {number} --- Prepared task\n{'':15}{task_repr}\n",
            end=f"{'':10}",
        )
        if bail:
            print(" (-) Bailing out of step, skipping task")
            continue
        # Singleton list in, singleton or empty list out: OnErrorOmit turns a
        # validation failure into an omitted item rather than an exception.
        available = avail_ta.validate_python([task.model_dump()])
        if available:
            print(" \033[92;1m>>>\033[0m Running available task")
            task_picked_up = True
        elif task_picked_up:
            print(" (!) Task requisite missing, bailing")
            bail = True
        else:
            print(" (x) Task already completed, skipping")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment