Skip to content

Instantly share code, notes, and snippets.

@jedbrown
Last active December 10, 2015 05:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jedbrown/4391685 to your computer and use it in GitHub Desktop.
Save jedbrown/4391685 to your computer and use it in GitHub Desktop.
Illustrates constructing a pipeline with some components written in Python and some using external programs.
#!/usr/bin/env python
from subprocess import Popen, PIPE
import sys, threading
# Each stage that we implement in Python. If processing binary data, we would
# read incrementally using input.read(4096), for example. The output file must
# be closed to flush the pipeline and allow it to clean up properly.
def writer(output, path='/usr/share/dict/words'):
    """Feed the pipeline by copying every line of the file at *path* to *output*.

    Args:
        output: Writable file-like object with ``write`` and ``close``; in
            this script it is the stdin pipe of the first external process.
        path: File to stream line by line. Defaults to the system word list.

    Raises:
        IOError/OSError: If *path* cannot be opened or read. *output* is
            still closed before the error propagates.
    """
    try:
        # Close the source file deterministically instead of leaving the
        # handle to the garbage collector.
        with open(path) as words:
            for line in words:
                output.write(line)
    finally:
        # Closing is what signals end-of-file downstream; it must happen
        # even on failure or the next stage blocks forever on its stdin.
        output.close()
def filter(input, output):
    """Upper-case each line of *input* into *output*, echoing selected lines.

    Any line that contains both a lowercase 'k' and a lowercase 'z' (tested
    before upper-casing) is also written to stderr with a '### ' prefix.

    Args:
        input: Iterable of lines (here, the stdout pipe of the previous stage).
        output: Writable file-like object with ``write`` and ``close``.
    """
    try:
        for line in input:
            if 'k' in line and 'z' in line:  # Selective 'tee'
                sys.stderr.write('### ' + line)
            output.write(line.upper())
    finally:
        # Always close so the downstream stage sees end-of-file, even when
        # this stage fails part-way through; otherwise the pipeline hangs.
        output.close()
def leeter(input, output):
    """Copy *input* to *output*, replacing every upper-case 'E' with '3'.

    Args:
        input: Iterable of lines (here, the stdout pipe of the previous stage).
        output: Writable file-like object with ``write`` and ``close``.
    """
    try:
        for line in input:
            output.write(line.replace('E', '3'))
    finally:
        # Always close so the downstream stage sees end-of-file, even when
        # this stage fails part-way through; otherwise the pipeline hangs.
        output.close()
# Our pipeline will contain some pieces written in Python that must run in
# threads to avoid blocking IO. These could instead run in subprocesses if you
# want them to actually run in parallel (avoid the GIL).
def spawn(func, **kwargs):
    """Run ``func(**kwargs)`` on a new thread and return the started thread.

    Args:
        func: Callable to execute in the background.
        **kwargs: Keyword arguments forwarded to *func* unchanged.

    Returns:
        The ``threading.Thread`` object, already started; the caller is
        expected to ``join`` it.
    """
    worker = threading.Thread(target=func, kwargs=kwargs)
    worker.start()
    return worker
# External programs that will participate in the pipeline. grepv, cut and
# grepk each get their own stdin/stdout pipes so Python stages can sit between
# them; grepz reads directly from grepk's stdout, so that pair is connected
# without passing through Python.
# NOTE(review): no text mode is requested here, so on Python 3 these pipes
# would carry bytes while the Python stages use str literals -- confirm the
# target interpreter before porting.
grepv = Popen(['grep', '-v', 'not'], stdin=PIPE, stdout=PIPE, bufsize=-1)
cut = Popen(['cut', '-c', '1-10'], stdin=PIPE, stdout=PIPE, bufsize=-1)
grepk = Popen(['grep', 'K'], stdin=PIPE, stdout=PIPE, bufsize=-1)
grepz = Popen(['grep', 'Z'], stdin=grepk.stdout, stdout=PIPE, bufsize=-1)
# grepz now holds its own copy of the pipe's read end; drop ours so grepk
# can receive SIGPIPE if grepz exits before consuming everything.
grepk.stdout.close()

# Stages of the pipeline implemented in Python
twriter = spawn(writer, output=grepv.stdin)
tfilter = spawn(filter, input=grepv.stdout, output=cut.stdin)
tleeter = spawn(leeter, input=cut.stdout, output=grepk.stdin)

# Drive the pipeline by reading lines from the last output
for line in grepz.stdout:
    sys.stdout.write(line.lower())
# Fully drained; release the final pipe before waiting on the children.
grepz.stdout.close()

# Explicitly clean up
for t in [twriter, tfilter, tleeter]:
    t.join()
for p in [grepv, cut, grepk, grepz]:
    p.wait()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment