public
Last active

Illustrates constructing a pipeline with some components written in Python and some using external programs.

  • Download Gist
pipey.py
Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
#!/usr/bin/env python
 
from subprocess import Popen, PIPE
import sys, threading
 
# Each stage that we implement in Python. If processing binary data, we would
# read incrementally using input.read(4096), for example. The output file must
# be closed to flush the pipeline and allow it to clean up properly.
def writer(output, path='/usr/share/dict/words'):
    """Feed every line of a text file into the head of the pipeline.

    Parameters:
        output: writable file-like object (e.g. a subprocess's stdin pipe).
        path: file whose lines are copied to ``output``; defaults to the
            system word list.

    ``output`` is closed on the way out even if opening, reading or writing
    fails, so the next stage sees end-of-file instead of waiting forever.
    """
    try:
        # 'with' releases the input file as soon as the copy is finished.
        with open(path) as source:
            for line in source:
                output.write(line)
    finally:
        # Closing flushes the pipe and signals EOF to the downstream stage.
        output.close()
def filter(input, output):
    """Upper-case each line of ``input`` into ``output``, tee-ing some to stderr.

    Parameters:
        input: iterable of text lines (e.g. a subprocess's stdout pipe).
        output: writable file-like object; always closed before returning.

    Lines that contain both 'k' and 'z' (tested before upper-casing) are
    additionally echoed to stderr prefixed with '### '. Every line, matched
    or not, is written upper-cased to ``output``.

    The names ``filter`` and ``input`` shadow builtins, but they are kept
    because callers pass the arguments by keyword.
    """
    try:
        for line in input:
            if 'k' in line and 'z' in line:  # Selective 'tee'
                sys.stderr.write('### ' + line)
            output.write(line.upper())
    finally:
        # Close even on error so the downstream stage receives EOF.
        output.close()
def leeter(input, output):
    """Copy lines from ``input`` to ``output`` with every 'E' replaced by '3'.

    Parameters:
        input: iterable of text lines (e.g. a subprocess's stdout pipe).
        output: writable file-like object; always closed before returning.

    Only upper-case 'E' is replaced. ``input`` shadows a builtin, but the
    name is kept because callers pass the arguments by keyword.
    """
    try:
        for line in input:
            output.write(line.replace('E', '3'))
    finally:
        # Close even on error so the downstream stage receives EOF.
        output.close()
 
# Our pipeline will contain some pieces written in Python that must run in
# threads to avoid blocking IO. These could instead run in subprocesses if you
# want them to actually run in parallel (avoid the GIL).
def spawn(func, **kwargs):
    """Start ``func(**kwargs)`` on a new thread and return that thread.

    The thread is already running when it is returned; the caller is
    expected to ``join()`` it later.
    """
    worker = threading.Thread(
        target=func,
        kwargs=kwargs,
    )
    worker.start()
    return worker
 
# External programs that will participate in the pipeline
# universal_newlines=True puts every pipe in text mode, so the Python stages
# and the final loop exchange str lines under Python 3 as well as Python 2.
grepv = Popen(['grep', '-v', 'not'], stdin=PIPE, stdout=PIPE, bufsize=-1,
              universal_newlines=True)
cut = Popen(['cut', '-c', '1-10'], stdin=PIPE, stdout=PIPE, bufsize=-1,
            universal_newlines=True)
grepk = Popen(['grep', 'K'], stdin=PIPE, stdout=PIPE, bufsize=-1,
              universal_newlines=True)
# grepz reads grepk's output directly, with no Python stage in between.
grepz = Popen(['grep', 'Z'], stdin=grepk.stdout, stdout=PIPE, bufsize=-1,
              universal_newlines=True)
# Drop the parent's copy of the grepk -> grepz pipe so that grepk can
# receive SIGPIPE if grepz exits first.
grepk.stdout.close()

# Stages of the pipeline implemented in Python
twriter = spawn(writer, output=grepv.stdin)
tfilter = spawn(filter, input=grepv.stdout, output=cut.stdin)
tleeter = spawn(leeter, input=cut.stdout, output=grepk.stdin)

# Drive the pipeline by reading lines from the last output
for line in grepz.stdout:
    sys.stdout.write(line.lower())

# Explicitly clean up: wait for the Python stages, then reap the children.
for t in [twriter, tfilter, tleeter]:
    t.join()
for p in [grepv, cut, grepk, grepz]:
    p.wait()

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.