Skip to content

Instantly share code, notes, and snippets.

@NF1198
Created April 2, 2020 04:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save NF1198/e355f97ad7fc42dd69c96b05e035f09a to your computer and use it in GitHub Desktop.
Save NF1198/e355f97ad7fc42dd69c96b05e035f09a to your computer and use it in GitHub Desktop.
#
# process_handler.py
# Copyright 2020 Nicholas Folse <nickfolse@gmail.com>
#
# This module defines a subprocess handler capable of managing
# multiple simultaneous subprocess.Popen calls. Results of each subprocess
# are processed in parallel. Each subprocess is injected with a line handler
# which is responsible for handling input from each call. Line handlers
# are called as line_handler(proc, line). If multiple handlers are injected for
# a given subprocess, the output of a handler is passed as the input to the next
# handler in the chain (with proc as the first argument). Upon termination of the
# stream, the handler chain will be called with None as a line argument.
#
# Refer to the included example for a possible application.
#
# This module utilizes the select.epoll mechanism, which is only available on Linux.
#
# Running this module as __main__ should yield something like the following:
# ('uname', u'Linux x86_64 x86_64 x86_64 GNU/Linux\n')
# ('ls /', "[u'bin', u'boot', u'dev', u'etc', u'home', ..., u'tmp', u'usr', u'var']")
# [0, 0]
#
from __future__ import print_function

import codecs
import io
import os
import select
import subprocess
def chain_handlers(handler, *handlers):
    """Compose one or more line handlers into a single callable.

    Every handler is called as ``h(proc, value)``. The first handler
    receives the raw line; each following handler receives the previous
    handler's return value. ``proc`` is passed unchanged to all of them.

    Args:
        handler: the first handler in the chain.
        *handlers: optional further handlers, applied in order.

    Returns:
        ``handler`` itself when no further handlers are given, otherwise a
        new ``chained(proc, line)`` callable returning the last handler's
        result.
    """
    if not handlers:
        # Nothing to chain: hand the original callable back untouched.
        return handler

    def chained(proc, line):
        result = handler(proc, line)
        for next_handler in handlers:
            result = next_handler(proc, result)
        return result

    return chained
class _LineStream(object):
    """Per-descriptor state used by ProcessHandler.

    Holds the process, its handler chain and whatever was received after the
    last newline, so that complete lines can be reassembled from raw chunks.
    """

    def __init__(self, proc, handler):
        self.proc = proc
        self.handler = handler
        self._tail = None  # data received after the most recent newline
        self._decoder = None  # only set for text-mode pipes
        self._newline = b"\n"
        stdout = proc.stdout
        if isinstance(stdout, io.TextIOBase):
            # Text-mode pipe: decode the raw bytes with the pipe's own
            # encoding and translate newlines, so handlers still receive
            # str lines even though the descriptor is read directly.
            decoder_factory = codecs.getincrementaldecoder(stdout.encoding)
            self._decoder = io.IncrementalNewlineDecoder(
                decoder_factory(stdout.errors or "strict"), translate=True
            )
            self._newline = u"\n"

    def feed(self, chunk, final=False):
        """Return the complete lines made available by *chunk*.

        Lines keep their trailing newline. With ``final=True`` the decoder
        (if any) is flushed and a remaining unterminated tail is returned as
        the last line.
        """
        if self._decoder is None:
            data = chunk
        else:
            data = self._decoder.decode(chunk, final)
        if self._tail:
            data = self._tail + data
        pieces = data.split(self._newline)
        # The last piece is whatever followed the final newline (maybe empty).
        self._tail = pieces.pop()
        lines = [piece + self._newline for piece in pieces]
        if final and self._tail:
            lines.append(self._tail)
            self._tail = None
        return lines


class ProcessHandler(object):
    """Dispatch the stdout of several subprocesses through one epoll loop.

    Use as a context manager: the epoll object is created by ``__enter__``
    and closed by ``__exit__``. Processes are added with ``inject()`` and
    serviced by calling ``poll()`` until ``done`` is true.

    Handlers are called as ``handler(proc, line)`` for every complete line
    (trailing newline included), then once for a final unterminated line if
    there is one, and finally with ``None`` when the stream ends. Lines are
    ``bytes`` for binary pipes and ``str`` for text-mode pipes.

    NOTE: output is read straight from the file descriptor, so anything
    already pulled into ``proc.stdout``'s own buffer by an earlier read is
    not seen; inject a process before reading from its stdout. The handler
    never closes ``proc.stdout`` or waits for the process; that stays the
    caller's job.
    """

    # Upper bound on the number of bytes requested by one os.read() call.
    _READ_SIZE = 65536

    def __init__(self):
        self._epoll = None
        # Maps stdout file descriptor -> _LineStream. Starts as an empty
        # dict so `done` and `poll()` are usable before the context is
        # entered.
        self._handled_processes = {}

    def __enter__(self):
        self._epoll = select.epoll()
        self._handled_processes = {}
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self._epoll.close()
        self._epoll = None
        # Drop references to streams that can no longer be polled.
        self._handled_processes = {}

    def inject(self, proc, handler, *handlers):
        """Register *proc* so its stdout lines are fed to the handler chain.

        Args:
            proc: object whose ``stdout`` exposes ``fileno()``, e.g. a
                ``subprocess.Popen`` created with ``stdout=subprocess.PIPE``.
            handler, *handlers: callables combined with ``chain_handlers``.

        Raises:
            RuntimeError: when called outside the ``with`` block.
        """
        if self._epoll is None:
            raise RuntimeError(
                "ProcessHandler.inject() requires an active 'with' block"
            )
        fd = proc.stdout.fileno()
        stream = _LineStream(proc, chain_handlers(handler, *handlers))
        # Register first: if epoll rejects the descriptor the registry is
        # left untouched.
        self._epoll.register(fd, select.EPOLLIN | select.EPOLLHUP)
        self._handled_processes[fd] = stream

    def poll(self, timeout=1):
        """Service pending output and return the processes that finished.

        Args:
            timeout: passed to ``select.epoll.poll`` (seconds).

        Returns:
            List of processes whose stdout reached end-of-stream during
            this call; empty when nothing is registered.
        """
        finished_procs = []
        if not self._handled_processes:
            return finished_procs
        for fd, evt in self._epoll.poll(timeout):
            stream = self._handled_processes.get(fd)
            if stream is None:
                continue
            ended = bool(evt & select.EPOLLHUP)
            if ended:
                # Every writer is gone, so reads cannot block: drain to EOF.
                chunk = os.read(fd, self._READ_SIZE)
                while chunk:
                    self._deliver(stream, chunk)
                    chunk = os.read(fd, self._READ_SIZE)
            elif evt & select.EPOLLIN:
                # One read per readiness event never blocks, even when the
                # child has only written part of a line so far.
                chunk = os.read(fd, self._READ_SIZE)
                if chunk:
                    self._deliver(stream, chunk)
                else:
                    ended = True  # empty read means end-of-stream
            if ended:
                try:
                    self._deliver(stream, b"", final=True)
                    stream.handler(stream.proc, None)
                finally:
                    # Always forget the dead descriptor, even if a handler
                    # raised, so it is not reported again on the next poll.
                    self._epoll.unregister(fd)
                    del self._handled_processes[fd]
                finished_procs.append(stream.proc)
        return finished_procs

    @staticmethod
    def _deliver(stream, chunk, final=False):
        """Pass every line completed by *chunk* to the stream's handler."""
        for line in stream.feed(chunk, final):
            stream.handler(stream.proc, line)

    @property
    def done(self):
        """True when no registered process is still being handled."""
        return not self._handled_processes
if __name__ == "__main__":
    # Demo: run `uname -a`; if it reports Linux, additionally list "/".
    # Both outputs are collected by handlers and printed once each stream
    # has ended.

    def uname_proc():
        """Start ``uname -a`` with its stdout captured through a pipe."""
        return subprocess.Popen(["uname", "-a"], stdout=subprocess.PIPE)

    def ls_proc(path):
        """Start ``ls <path>`` with its stdout captured through a pipe."""
        return subprocess.Popen(["ls", path], stdout=subprocess.PIPE)

    def printer(tag):
        """Return a handler that prints ``(tag, value)`` for truthy values
        and passes the value on unchanged."""
        def print_tagged(proc, arg):
            if arg:
                print((tag, arg))
            return arg
        return print_tagged

    def uname_handler():
        """Return a handler that collects decoded lines and, at the end of
        the stream, yields the first one (``None`` if there was no output)."""
        collected = []

        def collect(proc, line):
            if line:
                collected.append(line.decode())
                return None
            # End of stream: guard against a process that printed nothing.
            return collected[0] if collected else None
        return collect

    def ls_handler():
        """Return a handler that collects stripped entries and, at the end
        of the stream, yields them as the ``str()`` of a list."""
        items = []

        def collect(proc, line):
            if line:
                items.append(line.decode().strip())
                return None
            return str(items)
        return collect

    def linux_tester(process_handler):
        """Return a handler that injects an ``ls /`` process into
        *process_handler* once the uname result starts with "Linux"."""
        def check(proc, uname):
            if uname and uname.startswith("Linux"):
                process_handler.inject(ls_proc("/"), ls_handler(), printer("ls /"))
            return uname
        return check

    with ProcessHandler() as h:
        h.inject(uname_proc(), uname_handler(), linux_tester(h), printer("uname"))
        done = []
        while not h.done:
            done.extend(h.poll())

    exit_codes = []
    for p in done:
        # wait() rather than poll(): the child may have closed its stdout
        # without having been reaped yet, in which case poll() gives None.
        exit_codes.append(p.wait())
        p.stdout.close()
    print(exit_codes)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment