Skip to content

Instantly share code, notes, and snippets.

@lmmx
Last active July 16, 2024 15:49
Show Gist options
  • Save lmmx/0adfda53855ffa5e0dba5df3ab2009af to your computer and use it in GitHub Desktop.
Save lmmx/0adfda53855ffa5e0dba5df3ab2009af to your computer and use it in GitHub Desktop.
Breaking a pipeline step into 'tasks', and using a TypeAdapter on a single-element list to check each task's file I/O preconditions at runtime (in keeping with the progressive nature of a pipeline step)
from pathlib import Path
from pydantic import BaseModel, FilePath, NewPath, OnErrorOmit, TypeAdapter
class AvailableTask(BaseModel):
    """A task whose file preconditions hold right now.

    Validation succeeds only if every ``src`` entry is an existing file
    (pydantic ``FilePath``) and every ``dst`` entry does not exist yet
    (pydantic ``NewPath``, which also requires the parent directory to exist).
    """

    # Inputs: each must already be present on disk as a file.
    src: list[FilePath]
    # Outputs: none may exist yet, i.e. the task has not produced them.
    dst: list[NewPath]
class Task(BaseModel):
    """A task declared by its input and output paths, with no filesystem checks."""

    # Paths the task reads from.
    src: list[Path]
    # Paths the task writes to.
    dst: list[Path]
class Step(BaseModel):
    """A named pipeline step made of an ordered list of tasks."""

    # Human-readable label used when reporting progress.
    name: str
    # Tasks in the order they are considered for running.
    tasks: list[Task]
# (src paths, dst paths) for each task, in the order the step considers them.
file_tasks = [
    (["a.in"], ["a.out"]),  # a.out is consumed by the next two tasks
    (["a.out"], ["b.out"]),
    (["a.out", "b.out"], ["c.out"]),  # needs both earlier outputs
    (["d.in"], ["d.out"]),
    (["e.in"], ["e.out"]),
]
# Validates a list of candidate tasks as AvailableTask. Thanks to OnErrorOmit,
# items that fail validation are dropped instead of raising, so validating a
# single-element list yields [] when that task is not runnable right now.
avail_ta = TypeAdapter(
    list[OnErrorOmit[AvailableTask]],
)
def run_step():
    """Build the demo step from ``file_tasks`` and report which tasks would run.

    Each ``(src, dst)`` pair in ``file_tasks`` becomes a ``Task`` in a ``Step``.
    The tasks are then walked in order. A task is "available" when
    ``avail_ta`` accepts it, meaning its inputs exist and its outputs do not
    yet exist. Available tasks are only reported as running; no task body is
    executed here.

    Unavailable tasks are handled as follows:

    * Before any task has been picked up, they are skipped. A skipped task is
      reported as completed only when all of its outputs already exist.
      Otherwise it is reported as missing a requisite.
    * Once a task has been picked up, the first unavailable task makes the
      step bail out, and every remaining task is skipped.

    Raises:
        ValueError: if the step ends up with no tasks.
    """
    tasks = [dict(src=s, dst=d) for s, d in file_tasks]
    step = Step(name="Demo Step", tasks=tasks)
    if not step.tasks:
        raise ValueError("No tasks were assigned")
    print(f"Running step {step.name!r} with {len(step.tasks)} tasks")

    task_picked_up = False
    bail = False
    for idx, task in enumerate(step.tasks):
        task_repr = " --> ".join(map(str, task.model_dump(mode="json").values()))
        print(
            f"\n--- Task {idx + 1} --- Prepared task\n{'':15}{task_repr}\n",
            end=f"{'':10}",
        )
        if bail:
            print(" (-) Bailing out of step, skipping task")
            continue
        # Validate a single-element list. OnErrorOmit drops a failing task
        # rather than raising, so an empty result means "not runnable now".
        available = avail_ta.validate_python([task.model_dump()])
        if available:
            print(" \033[92;1m>>>\033[0m Running available task")
            task_picked_up = True
        elif task_picked_up:
            print(" (!) Task requisite missing, bailing")
            bail = True
        elif all(path.exists() for path in task.dst):
            # Validation failed, but every output is already present.
            print(" (x) Task already completed, skipping")
        else:
            # Validation failed and the outputs are not all present. The
            # inputs are missing or an output cannot be created, so the task
            # was NOT completed; the old code misreported this case.
            print(" (?) Task requisite missing, skipping")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment