Skip to content

Instantly share code, notes, and snippets.

@henryiii
Last active March 3, 2022 05:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save henryiii/ddeda112ceeb3f61b0e7e332a2b34662 to your computer and use it in GitHub Desktop.
Save henryiii/ddeda112ceeb3f61b0e7e332a2b34662 to your computer and use it in GitHub Desktop.
Example of an asyncio-based package builder that schedules builds in dependency order (Pyodide)
from graphlib import TopologicalSorter
import asyncio
import dataclasses
import yaml
from pathlib import Path
import functools
from typing import NoReturn
@functools.total_ordering
@dataclasses.dataclass(kw_only=True, frozen=True)
class Package:
name: str
deps: tuple[str, ...] = ()
def __lt__(self, other) -> bool:
return len(self.deps) > len(other.deps)
def __eq__(self, other) -> bool:
return self.name == other.name
async def build(self) -> None:
print(f"Starting {self.name} ->", *self.deps)
proc = await asyncio.create_subprocess_shell(f"sleep {len(self.deps)}")
await proc.wait()
print(f"Finished {self.name}")
# Packages ready to be built; the priority queue hands out the smallest
# Package (per Package.__lt__) first.
WorkQueue = asyncio.PriorityQueue[Package]
# Packages whose build has completed, reported back by the workers.
FinishedQueue = asyncio.Queue[Package]
# Dependency graph: each package mapped to the set of packages it depends on.
Graph = dict[Package, set[Package]]
async def worker(tasks: WorkQueue, results: FinishedQueue) -> NoReturn:
    """Build packages taken from *tasks* forever, reporting each on *results*.

    The loop has no exit condition: the coroutine only stops when it is
    cancelled or when a build raises.
    """
    while True:
        pkg = await tasks.get()
        await pkg.build()
        # Tell the scheduler this package is done so its dependents can start.
        results.put_nowait(pkg)
async def add_nodes(
    graph: Graph, tasks: WorkQueue, finished_tasks: FinishedQueue
) -> None:
    """Feed packages from *graph* into *tasks* in dependency order.

    A package is queued only once everything it depends on has come back on
    *finished_tasks*. Returns when every package has been marked as done.
    """
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    while sorter.is_active():
        # Queue everything that is currently unblocked.
        for ready in sorter.get_ready():
            tasks.put_nowait(ready)
        # Wait for one build to finish; marking it done may unblock more.
        completed = await finished_tasks.get()
        sorter.done(completed)
async def build_packages() -> None:
    """Build every package described by ``packages/*/meta.yaml``.

    Each meta file contributes one package (``package.name``) whose
    dependencies are its ``requirements.run`` entries. Packages are built by
    four concurrent workers, in dependency order.

    Raises:
        KeyError: a meta file lacks ``package.name`` or names a dependency
            that is not a known package.
        Exception: whatever a failed worker or the scheduler raised.
    """
    # "distutils" is seeded by hand -- presumably it is required by some
    # packages but has no meta.yaml of its own; TODO confirm.
    packages = {"distutils": Package(name="distutils", deps=())}
    for meta_path in Path("packages").glob("*/meta.yaml"):
        with meta_path.open(encoding="utf-8") as f:
            full = yaml.safe_load(f)
        name = full["package"]["name"]
        # ``requirements:`` / ``run:`` may be present but empty (YAML null),
        # so fall back explicitly instead of relying on the .get() default.
        requirements = full.get("requirements") or {}
        deps = tuple(requirements.get("run") or ())
        packages[name] = Package(name=name, deps=deps)

    graph = {p: {packages[s] for s in p.deps} for p in packages.values()}

    tasks = WorkQueue()
    finished_tasks = FinishedQueue()
    workers = [asyncio.create_task(worker(tasks, finished_tasks)) for _ in range(4)]
    producer = asyncio.create_task(add_nodes(graph, tasks, finished_tasks))

    # Workers loop forever, so the first task to complete is either the
    # producer (everything built) or a task that raised.
    done, pending = await asyncio.wait(
        {producer, *workers}, return_when=asyncio.FIRST_COMPLETED
    )

    # Stop whatever is still running and wait for the cancellations to land.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)

    # Surface a failure instead of silently exiting as if the build succeeded.
    for task in done:
        task.result()


if __name__ == "__main__":
    asyncio.run(build_packages())
from graphlib import TopologicalSorter
import asyncio
from typing import NoReturn
# Names of packages that are ready to be built.
WorkQueue = asyncio.Queue[str]
# Names of packages whose build has completed.
FinishedQueue = asyncio.Queue[str]
# Dependency graph: package name -> names of the packages it depends on.
Graph = dict[str, set[str]]

# Sample dependency graph used by this demo.
GRAPH: Graph = {
    "numpy": set(),
    "matplotlib": {"numpy"},
    "pandas": {"matplotlib", "numpy"},
    "requests": set(),
    "uncertainties": {"numpy"},
    "scipy": {"numpy"},
}
async def build(package: str) -> None:
    """Simulate building *package* by running ``sleep`` in a subprocess.

    The sleep lasts one second plus one second per dependency listed for the
    package in ``GRAPH``.
    """
    dependencies = GRAPH[package]
    seconds = len(dependencies) + 1
    print(f"Starting {package} for {seconds} s ->", *dependencies)
    sleeper = await asyncio.create_subprocess_shell(f"sleep {seconds}")
    await sleeper.wait()
    print(f"Finished {package}")
async def worker(tasks: WorkQueue, results: FinishedQueue) -> NoReturn:
    """Consume package names from *tasks* forever, building each one.

    Every built name is pushed onto *results*. The loop never exits on its
    own; it ends only through cancellation or a build error.
    """
    while True:
        name = await tasks.get()
        await build(name)
        # Hand the finished name back so dependents can be scheduled.
        results.put_nowait(name)
async def add_nodes(tasks: WorkQueue, finished_tasks: FinishedQueue) -> None:
    """Schedule the packages in ``GRAPH`` onto *tasks* in dependency order.

    A name is queued once all of its dependencies have been reported on
    *finished_tasks*. Returns when every package has been marked as done.
    """
    sorter = TopologicalSorter(GRAPH)
    sorter.prepare()
    while sorter.is_active():
        # Queue every package that has no unfinished dependency left.
        for ready in sorter.get_ready():
            tasks.put_nowait(ready)
        # Block until one build completes, then let the sorter unblock more.
        completed = await finished_tasks.get()
        sorter.done(completed)
async def main() -> None:
    """Build every package in ``GRAPH`` using four concurrent workers.

    Raises:
        Exception: whatever a failed worker or the scheduler raised.
    """
    tasks = WorkQueue()
    finished_tasks = FinishedQueue()
    workers = [asyncio.create_task(worker(tasks, finished_tasks)) for _ in range(4)]
    producer = asyncio.create_task(add_nodes(tasks, finished_tasks))

    # Workers loop forever, so the first task to complete is either the
    # producer (everything built) or a task that raised.
    done, pending = await asyncio.wait(
        {producer, *workers}, return_when=asyncio.FIRST_COMPLETED
    )

    # Stop whatever is still running and wait for the cancellations to land.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)

    # Surface a failure instead of silently exiting as if the build succeeded.
    for task in done:
        task.result()


if __name__ == "__main__":
    asyncio.run(main())
from graphlib import TopologicalSorter
import asyncio
import dataclasses
from typing import NoReturn
@dataclasses.dataclass(kw_only=True, order=True, frozen=True)
class Package:
priority: int
name: str
async def build(self) -> None:
print(f"Starting {self.name} for {self.priority} s")
proc = await asyncio.create_subprocess_shell(f"sleep {self.priority}")
await proc.wait()
print(f"Finished {self.name}")
# Packages ready to be built; the lowest-ordered Package is handed out first.
WorkQueue = asyncio.PriorityQueue[Package]
# Packages whose build has completed.
FinishedQueue = asyncio.Queue[Package]
# Dependency graph: package -> the packages it depends on.
Graph = dict[Package, set[Package]]

# Sample packages used by this demo.
NUMPY = Package(priority=5, name="numpy")
MATPLOTLIB = Package(priority=3, name="matplotlib")
PANDAS = Package(priority=4, name="pandas")
REQUESTS = Package(priority=1, name="requests")
UNCERTAINTIES = Package(priority=1, name="uncertainties")
SCIPY = Package(priority=7, name="scipy")

# Sample dependency graph built from the packages above.
GRAPH: Graph = {
    NUMPY: set(),
    MATPLOTLIB: {NUMPY},
    PANDAS: {MATPLOTLIB, NUMPY},
    REQUESTS: set(),
    UNCERTAINTIES: {NUMPY},
    SCIPY: {NUMPY},
}
async def worker(tasks: WorkQueue, results: FinishedQueue) -> NoReturn:
    """Repeatedly take the next package from *tasks*, build it, and report it.

    Finished packages are placed on *results*. There is no exit condition;
    the coroutine stops only when cancelled or when a build raises.
    """
    while True:
        pkg = await tasks.get()
        await pkg.build()
        # Notify the scheduler so packages waiting on this one can proceed.
        results.put_nowait(pkg)
async def add_nodes(tasks: WorkQueue, finished_tasks: FinishedQueue) -> None:
    """Push the packages in ``GRAPH`` onto *tasks* as their dependencies finish.

    Completed packages are read back from *finished_tasks*. Returns when
    every package has been marked as done.
    """
    sorter = TopologicalSorter(GRAPH)
    sorter.prepare()
    while sorter.is_active():
        # Queue all packages whose dependencies have all completed.
        for ready in sorter.get_ready():
            tasks.put_nowait(ready)
        # Wait for a single completion before checking for new ready nodes.
        completed = await finished_tasks.get()
        sorter.done(completed)
async def main() -> None:
    """Build every package in ``GRAPH`` using four concurrent workers.

    Raises:
        Exception: whatever a failed worker or the scheduler raised.
    """
    tasks = WorkQueue()
    finished_tasks = FinishedQueue()
    workers = [asyncio.create_task(worker(tasks, finished_tasks)) for _ in range(4)]
    producer = asyncio.create_task(add_nodes(tasks, finished_tasks))

    # Workers loop forever, so the first task to complete is either the
    # producer (everything built) or a task that raised.
    done, pending = await asyncio.wait(
        {producer, *workers}, return_when=asyncio.FIRST_COMPLETED
    )

    # Stop whatever is still running and wait for the cancellations to land.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)

    # Surface a failure instead of silently exiting as if the build succeeded.
    for task in done:
        task.result()


if __name__ == "__main__":
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment