Create a gist now

Instantly share code, notes, and snippets.

@jedbrown /
Last active Dec 10, 2015

What would you like to do?
Illustrates constructing a pipeline with some components written in Python and some using external programs.
#!/usr/bin/env python
from subprocess import Popen, PIPE
import sys, threading
# Each stage that we implement in Python. If processing binary data, we would
# read incrementally using, for example. The output file must
# be closed to flush the pipeline and allow it to clean up properly.
def writer(output):
for line in open('/usr/share/dict/words'):
def filter(input, output):
for line in input:
if 'k' in line and 'z' in line: # Selective 'tee'
sys.stderr.write('### ' + line)
def leeter(input, output):
for line in input:
output.write(line.replace('E', '3'))
# Our pipeline will contain some pieces written in Python that must run in
# threads to avoid blocking IO. These could instead run in subprocesses if you
# want them to actually run in parallel (avoid the GIL).
def spawn(func, **kwargs):
t = threading.Thread(target=func, kwargs=kwargs)
return t
# External programs that will participate in the pipeline
grepv = Popen(['grep','-v','not'], stdin=PIPE, stdout=PIPE, bufsize=-1)
cut = Popen(['cut','-c','1-10'], stdin=PIPE, stdout=PIPE, bufsize=-1)
grepk = Popen(['grep', 'K'], stdin=PIPE, stdout=PIPE, bufsize=-1)
grepz = Popen(['grep', 'Z'], stdin=grepk.stdout, stdout=PIPE, bufsize=-1)
# Stages of the pipeline implemented in Python
twriter = spawn(writer, output=grepv.stdin)
tfilter = spawn(filter, input=grepv.stdout, output=cut.stdin)
tleeter = spawn(leeter, input=cut.stdout, output=grepk.stdin)
# Drive the pipeline by reading lines from the last output
for line in grepz.stdout:
# Explicitly clean up
for t in [twriter, tfilter, tleeter]: t.join()
for p in [grepv, cut, grepk, grepz]: p.wait()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment