Skip to content

Instantly share code, notes, and snippets.

@taskie
Created February 18, 2018 09:04
Show Gist options
  • Save taskie/c14829eabc2be4ef07ef2fa652c6abcd to your computer and use it in GitHub Desktop.
Save taskie/c14829eabc2be4ef07ef2fa652c6abcd to your computer and use it in GitHub Desktop.
import io
import subprocess
import functools
import copy
import os
import sys
from collections import ChainMap
# Version string of this module.
__version__ = '0.0.1'
# When true, Context._debug_log prints trace lines to stderr.
_debug = False
def get_last_run_result(run_result):
    """Return the deepest right-most result reachable from *run_result*.

    Starting at *run_result*, repeatedly step into the last entry of
    ``children`` for as long as ``children`` is a non-empty ``list``.
    The node at which the descent stops is returned (possibly
    *run_result* itself).
    """
    node = run_result
    # Only real lists are descended into; any other ``children`` value
    # (or an empty list) ends the walk.
    while isinstance(node.children, list) and node.children:
        node = node.children[-1]
    return node
class RunResult(object):
    """Record describing one started (and possibly finished) ``Proc`` node.

    Attributes set here:
      proc        -- the node this result belongs to
      popen       -- the object passed as *popen* (may be None)
      children    -- list of results for sub-nodes (fresh list by default)
      completed   -- starts False
      stdout_data -- starts None
      stderr_data -- starts None
    """

    def __init__(self, proc, popen, children=None):
        # A new list per instance avoids sharing one default between results.
        if children is None:
            children = []
        self.proc = proc
        self.popen = popen
        self.children = children
        self.completed = False
        self.stdout_data = None
        self.stderr_data = None

    def __repr__(self):
        template = "RunResult(proc={}, popen={}, len(children)={}, completed={})"
        return template.format(
            self.proc,
            self.popen,
            len(self.children),
            self.completed,
        )
class Runner(object):
    """Start the processes described by a ``Proc`` node.

    ``run`` dispatches on the node's class name to the method called
    ``_run_<ClassName>``.  Every handler returns a ``RunResult``.
    """

    def __init__(self, ctx):
        # Context supplying env, Popen keywords and the recursive ``run``.
        self.ctx = ctx

    def run(self, proc):
        """Start *proc* using the handler matching its class name."""
        handler = getattr(self, "_run_" + proc.__class__.__name__)
        return handler(proc)

    def _run_Identity(self, identity):
        """Run the wrapped node and record it as the only child."""
        child = self.ctx.run(identity.proc)
        return RunResult(identity, None, [child])

    def _run_System(self, system):
        """Spawn one OS process with ``subprocess.Popen``."""
        # Popen treats any non-None ``env`` as the *complete* environment of
        # the new process.  With no entries configured, an empty mapping
        # would start the process with no variables at all, so pass None
        # and let it inherit the current environment instead.
        env = dict(self.ctx.env) or None
        popen = subprocess.Popen(system.args, env=env, **self.ctx.popen_keywords)
        return RunResult(system, popen)

    def _run_Serial(self, serial):
        """Run each node in order; all but the last are forced to wait."""
        children = []
        last_index = len(serial.procs) - 1
        for index, proc in enumerate(serial.procs):
            ctx = self.ctx
            if index != last_index:
                # A step must finish before the next one starts.
                ctx = ctx.child().export(Context(should_wait=True))
            children.append(ctx.run(proc))
        return RunResult(serial, None, children)

    def _run_Pipeline(self, pipeline):
        """Start every stage, feeding each stage's stdout to the next stdin."""
        children = []
        if not pipeline.procs:
            return RunResult(pipeline, None, children)
        last_index = len(pipeline.procs) - 1
        next_stdin = None
        for index, proc in enumerate(pipeline.procs):
            ctx = self.ctx
            if next_stdin is not None:
                ctx = ctx.child().export(Context(stdin=next_stdin))
            if index != last_index:
                # Intermediate stages run without waiting and write to a pipe.
                ctx = ctx.child().export(Context(should_wait=False, stdout=subprocess.PIPE))
            stage_result = ctx.run(proc)
            children.append(stage_result)
            # NOTE(review): a stage whose last result has no Popen object
            # (popen is None for composite results) would fail here --
            # confirm whether such stages need to be supported.
            next_stdin = get_last_run_result(stage_result).popen.stdout
        return RunResult(pipeline, None, children)
class Waiter(object):
    """Wait for started nodes and collect their captured output.

    ``wait`` dispatches on the class name of ``run_result.proc`` to the
    method called ``_wait_<ClassName>``.  Each handler updates
    ``completed``, ``stdout_data`` and ``stderr_data`` on the result it is
    given and returns that same result.
    """

    def __init__(self, ctx):
        # Context providing ``wait_if_needed`` and the optional ``input``.
        self.ctx = ctx

    def wait(self, run_result):
        """Wait for *run_result* using the handler matching its node class."""
        handler = getattr(self, "_wait_" + run_result.proc.__class__.__name__)
        return handler(run_result)

    def _wait_Identity(self, run_result):
        """Wait for the single child and mirror its state onto the result."""
        child_result = self.ctx.wait_if_needed(run_result.children[0])
        run_result.completed = child_result.completed
        run_result.stdout_data = child_result.stdout_data
        run_result.stderr_data = child_result.stderr_data
        return run_result

    def _wait_System(self, run_result):
        """Call ``communicate`` on the process and store what it returns."""
        data = self.ctx.input
        if isinstance(data, OnceInput):
            # A OnceInput hands out its payload a single time.
            data = data.read()
        stdout_data, stderr_data = run_result.popen.communicate(input=data)
        run_result.completed = True
        run_result.stdout_data = stdout_data
        run_result.stderr_data = stderr_data
        return run_result

    def _wait_Serial(self, run_result):
        """Wait for every step and concatenate their captured output."""
        return self._wait_children(run_result)

    def _wait_Pipeline(self, run_result):
        """Wait for every stage and concatenate their captured output."""
        return self._wait_children(run_result)

    def _wait_children(self, run_result):
        """Shared body of the Serial and Pipeline handlers.

        Waits (via the context) for each child in order, takes ``completed``
        from the last child, and appends every non-None child output to the
        result's own ``stdout_data`` / ``stderr_data`` (treated as ``b''``
        when unset).  A result with no children is marked completed with
        empty output instead of raising ``IndexError``.
        """
        child_results = [self.ctx.wait_if_needed(child) for child in run_result.children]
        # Nothing was started for an empty Serial/Pipeline, so there is
        # nothing left to wait for.
        run_result.completed = child_results[-1].completed if child_results else True

        stdout_parts = [] if run_result.stdout_data is None else [run_result.stdout_data]
        stderr_parts = [] if run_result.stderr_data is None else [run_result.stderr_data]
        for child_result in child_results:
            if child_result.stdout_data is not None:
                stdout_parts.append(child_result.stdout_data)
            if child_result.stderr_data is not None:
                stderr_parts.append(child_result.stderr_data)
        run_result.stdout_data = b"".join(stdout_parts)
        run_result.stderr_data = b"".join(stderr_parts)
        return run_result
class Context(object):
    """Settings that apply while a ``Proc`` node is run and waited for.

    A context holds the wait policy (``should_wait``), optional data for a
    process (``input``), environment entries (``env``) and extra keyword
    arguments for ``subprocess.Popen`` (``popen_keywords``).  Contexts
    created through ``child`` link back via ``parent`` and share a single
    ``resources_by_depth`` dict.
    """

    def __init__(self, parent=None, depth=None, resources_by_depth=None, should_wait=None, input=None, env=None, **kwargs):
        self.parent = parent
        self.depth = depth if depth is not None else 0
        self.resources_by_depth = resources_by_depth if resources_by_depth is not None else {}
        self.should_wait = should_wait
        self.input = input
        # Each context gets its own empty first map, so writes made through
        # ``export`` land there rather than in the mapping that was passed in.
        if env is None:
            self.env = ChainMap({})
        elif isinstance(env, ChainMap):
            self.env = env.new_child()
        else:
            self.env = ChainMap({}, env)
        self.popen_keywords = ChainMap({}, kwargs)
        self.runner = Runner(self)
        self.waiter = Waiter(self)

    def child(self):
        """Return a new context one level deeper that starts from this one's settings."""
        return Context(
            parent=self,
            depth=self.depth + 1,
            resources_by_depth=self.resources_by_depth,
            should_wait=self.should_wait,
            input=self.input,
            env=self.env,
            **self.popen_keywords
        )

    def export(self, ctx):
        """Copy the settings that *ctx* defines onto this context; return self.

        ``should_wait`` and ``input`` are copied only when not None; every
        ``env`` and ``popen_keywords`` entry of *ctx* is copied.
        """
        if ctx.should_wait is not None:
            self.should_wait = ctx.should_wait
        for key, value in ctx.env.items():
            self.env[key] = value
        for key, value in ctx.popen_keywords.items():
            self.popen_keywords[key] = value
        if ctx.input is not None:
            self.input = ctx.input
        return self

    def _debug_log(self, name, *args, **kwargs):
        """Print a trace line to stderr when the module ``_debug`` flag is set."""
        if not _debug:
            return
        prefix = "{: 4d} {: <10s}".format(self.depth, name)
        print(prefix, *args, file=sys.stderr, **kwargs)

    def run(self, proc):
        """Start *proc* in a child context and wait for it if that context says so."""
        run_ctx = self.child().export(proc.ctx)
        for stream in ("stdin", "stdout", "stderr"):
            target = run_ctx.popen_keywords.get(stream)
            if not isinstance(target, Opener):
                continue
            if not target.opened:
                # Files opened here are registered so ``wait`` can close them.
                target.open()
                run_ctx.add_resource(target)
            run_ctx.popen_keywords[stream] = target.file
        # FIXME: input is buggy
        if run_ctx.input is not None:
            run_ctx.popen_keywords["stdin"] = subprocess.PIPE
        self._debug_log("RUN", run_ctx.should_wait, proc)
        started = run_ctx.runner.run(proc)
        return run_ctx.wait_if_needed(started)

    def wait(self, run_result):
        """Wait for *run_result*, then close resources registered at this depth or deeper."""
        wait_result = self.waiter.wait(run_result)
        self._debug_log("WAIT", wait_result)
        # Deepest first; the key list is materialised before entries are deleted.
        for depth in sorted(self.resources_by_depth, reverse=True):
            if depth < self.depth:
                break
            for resource in reversed(self.resources_by_depth[depth]):
                self._debug_log("CLOSE_RES", depth, resource)
                resource.close()
            del self.resources_by_depth[depth]
        return wait_result

    def wait_if_needed(self, run_result):
        """Wait only when this context waits and the result is not completed yet."""
        if self.should_wait and not run_result.completed:
            return self.wait(run_result)
        return run_result

    def waiting_ancestor(self):
        """Return the nearest context (self included) with a truthy ``should_wait``, or None."""
        ctx = self
        while ctx is not None and not ctx.should_wait:
            ctx = ctx.parent
        return ctx

    def add_resource(self, resource):
        """Register *resource* under this context's depth."""
        self._debug_log("ADD_RES", resource)
        self.resources_by_depth.setdefault(self.depth, []).append(resource)

    def __str__(self):
        return "{}({})".format(self.__class__.__name__, self.__dict__)
class Proc(object):
    """Base class of runnable nodes; operators build wrapped nodes.

    ``|`` builds a ``Pipeline``; ``<``, ``>``, ``>>``, ``<<`` and ``[...]``
    each wrap this node in an ``Identity`` carrying one extra setting.
    Keyword arguments given to the constructor become the node's own
    ``Context``.
    """

    def __init__(self, **kwargs):
        self.ctx = Context(**kwargs)

    def run(self, ctx):
        """Run this node under a child of *ctx* that waits for completion."""
        waiting_ctx = ctx.child().export(Context(should_wait=True))
        return waiting_ctx.run(self)

    def __or__(self, other):
        return Pipeline([self, other])

    def __lt__(self, other):
        # A string is wrapped in an Opener with mode "r".
        source = Opener(other, "r") if isinstance(other, str) else other
        return Identity(self, stdin=source)

    def __gt__(self, other):
        # A string is wrapped in an Opener with mode "w".
        sink = Opener(other, "w") if isinstance(other, str) else other
        return Identity(self, stdout=sink)

    def __rshift__(self, other):
        # A string is wrapped in an Opener with mode "a".
        sink = Opener(other, "a") if isinstance(other, str) else other
        return Identity(self, stdout=sink)

    def __lshift__(self, other):
        payload = other if isinstance(other, OnceInput) else OnceInput(other)
        return Identity(self, input=payload)

    def __getitem__(self, attr):
        """Apply a ``Redirector`` or ``OnceInput`` given in square brackets.

        Raises ``Exception`` for a Redirector whose fileno is not 0, 1 or 2,
        and for any other kind of *attr*.
        """
        if isinstance(attr, Redirector):
            for number, keyword in ((0, "stdin"), (1, "stdout"), (2, "stderr")):
                if attr.fileno == number:
                    return Identity(self, **{keyword: attr.file})
            raise Exception("Redirect: invalid fileno: " + str(attr.fileno))
        if isinstance(attr, OnceInput):
            return Identity(self, input=attr)
        raise Exception("invalid attr: " + str(attr))
class OnceInput(object):
    """Wrapper around input data that can be read exactly once."""

    def __init__(self, value):
        self.value = value

    def read(self):
        """Return the wrapped data and forget it.

        ``None`` yields ``None``; a ``str`` is encoded with
        ``sys.getdefaultencoding()``; ``bytes`` are returned as they are;
        anything else is asked for its ``read()`` result.  After the first
        call the stored value is ``None``, so later calls return ``None``.
        """
        pending, self.value = self.value, None
        if pending is None:
            return None
        if isinstance(pending, str):
            return pending.encode(sys.getdefaultencoding())
        if isinstance(pending, bytes):
            return pending
        return pending.read()
class Opener(object):
    """Deferred call to the built-in ``open``.

    The positional and keyword arguments are stored and passed to ``open``
    when ``open()`` is first called.  ``file`` holds the open file object
    (``None`` while not open) and ``opened`` tells whether it is open.
    Usable as a context manager.
    """

    def __init__(self, *args, **kwargs):
        self.args = args
        self.keywords = kwargs
        self.file = None
        self.opened = False

    def open(self):
        """Open the file unless it is already open; return self."""
        if not self.opened:
            self.file = open(*self.args, **self.keywords)
            self.opened = True
        return self

    def close(self):
        """Close the file if it is open and return to the not-open state.

        Resetting the state lets the same Opener be opened again later;
        previously ``opened`` stayed true, so a second ``open()`` was a no-op
        that left a closed file in ``file``.
        """
        if self.opened:
            try:
                self.file.close()
            finally:
                self.file = None
                self.opened = False

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        # Returning a false value lets an exception raised inside the
        # ``with`` block propagate instead of being silently discarded.
        return False

    def __repr__(self):
        return "Opener(args={}, keywords={})".format(self.args, self.keywords)
class Identity(Proc):
    """Node that wraps one other node and adds its own context settings."""

    def __init__(self, proc, **kwargs):
        # Keyword arguments are handed to the base-class constructor.
        super(Identity, self).__init__(**kwargs)
        self.proc = proc
class System(Proc):
    """Node holding the argument list of a single command."""

    def __init__(self, args, **kwargs):
        # Keyword arguments are handed to the base-class constructor.
        super(System, self).__init__(**kwargs)
        self.args = args

    def __repr__(self):
        template = "System(args={})"
        return template.format(self.args)
class Serial(Proc):
    """Node holding a sequence of nodes in ``procs``."""

    def __init__(self, procs, **kwargs):
        # Keyword arguments are handed to the base-class constructor.
        super(Serial, self).__init__(**kwargs)
        self.procs = procs
class Pipeline(Proc):
    """Node holding a sequence of pipeline stages in ``procs``."""

    def __init__(self, procs, **kwargs):
        # Keyword arguments are handed to the base-class constructor.
        super(Pipeline, self).__init__(**kwargs)
        self.procs = procs
class FD(object):
    """File-descriptor placeholder whose operators build ``Redirector`` objects.

    ``FD(n) < x`` / ``FD(n) > x`` / ``FD(n) >> x`` produce a Redirector for
    descriptor ``n``.  When no number was given, ``<`` uses 0 and ``>`` /
    ``>>`` use 1.  A string operand is wrapped in an ``Opener`` with mode
    ``"r"``, ``"w"`` or ``"a"`` respectively.
    """

    def __init__(self, fileno=None):
        self.fileno = fileno

    def _redirect(self, default_fileno, target, mode):
        # Shared implementation of the three operators.
        fileno = default_fileno if self.fileno is None else self.fileno
        if isinstance(target, str):
            target = Opener(target, mode)
        return Redirector(fileno, target)

    def __lt__(self, other):
        return self._redirect(0, other, "r")

    def __gt__(self, other):
        return self._redirect(1, other, "w")

    def __rshift__(self, other):
        return self._redirect(1, other, "a")
class Redirector(object):
    """Pair of a descriptor number (``fileno``) and its target (``file``)."""

    def __init__(self, fileno, file):
        self.fileno, self.file = fileno, file
def run(*args, **kwargs):
    """Run the given nodes one after another and return the result.

    The positional arguments are wrapped in a ``Serial``; the keyword
    arguments are used to build the ``Context`` it is run under.
    """
    sequence = Serial(args)
    root_ctx = Context(**kwargs)
    return sequence.run(root_ctx)
def out(*args, **kwargs):
    """Run the given nodes and return their captured stdout as ``str``.

    Keyword arguments are forwarded to ``run`` (they used to be accepted
    but silently discarded).  ``stdout`` is always set to
    ``subprocess.PIPE`` so the output can be captured; a caller-supplied
    ``stdout`` is overridden.  The bytes are decoded with
    ``sys.getdefaultencoding()``.
    """
    kwargs["stdout"] = subprocess.PIPE
    result = run(*args, **kwargs)
    return str(result.stdout_data, encoding=sys.getdefaultencoding())
def serial(*args, **kwargs):
    """Build a ``Serial`` from the positional arguments; keywords are passed through."""
    procs = args
    return Serial(procs, **kwargs)
def system(*args, **kwargs):
    """Build a ``System`` whose ``args`` is the tuple of positional arguments."""
    command = args
    return System(command, **kwargs)
def pipeline(*args, **kwargs):
    """Build a ``Pipeline`` from the positional arguments; keywords are passed through."""
    stages = args
    return Pipeline(stages, **kwargs)
def _print_run_result(run_result, depth=0):
print(" " * depth, run_result.proc.__class__.__name__, run_result.completed, repr(run_result.stdout_data), repr(run_result.stderr_data))
for child in run_result.children:
_print_run_result(child, depth + 1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment