Skip to content

Instantly share code, notes, and snippets.

@L3viathan
Created June 11, 2024 11:25
Show Gist options
  • Save L3viathan/723695ecffe395bdbd44316cccbd3934 to your computer and use it in GitHub Desktop.
Save L3viathan/723695ecffe395bdbd44316cccbd3934 to your computer and use it in GitHub Desktop.
Column-based parallelization

Requires the dont library

import textwrap
import concurrent.futures
import dont
class parallel(dont):
def __init__(self, sep="|", njobs=None):
self.sep = sep
self.njobs = njobs
def get_cols(self):
cols = []
for lines in (line.split(self.sep) for line in self.content):
for i, line in enumerate(lines):
if len(cols) <= i:
cols.append([])
cols[i].append(line)
return [
textwrap.dedent("\n".join(col))
for col in cols
]
def make_fn(self, col):
exec_ns = {}
exec(
f"""def fn(namespace):
locals().update(namespace)
{textwrap.indent(col, " ")}
return locals()""",
self.frame.f_globals,
exec_ns,
)
return exec_ns["fn"]
def hook(self):
# self.content
cols = self.get_cols()
namespace = self.frame.f_locals
with concurrent.futures.ThreadPoolExecutor(
max_workers=self.njobs or len(cols),
) as executor:
futures = {executor.submit(self.make_fn(col), namespace) for col in cols}
for future in concurrent.futures.as_completed(futures):
try:
self.frame.f_locals.update(future.result())
except Exception:
... # TODO: handle
raise
from time import sleep
from parcol import parallel
with parallel():
print("hello") | sleep(.5)
sleep(1) | print("world")
x = "world" | sleep(.5)
sleep(.5) | print("bye")
print(x) | sleep(1/0)
... | print("kaputt")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment