#!/usr/bin/env python | |
from subprocess import Popen, PIPE | |
import sys, threading | |
# Pipeline stages implemented in Python. They iterate over text lines; if
# processing binary data, read incrementally with input.read(4096) instead.
# Each stage must close its output when done, which flushes the data and lets
# downstream stages see end-of-file so the pipeline can shut down cleanly.
def writer(output, path='/usr/share/dict/words'):
    """Copy every line of a text file into ``output``, then close ``output``.

    This is the source stage of the pipeline.

    :param output: writable text stream (in this script, the stdin pipe of the
        first external process).
    :param path: text file to read; defaults to '/usr/share/dict/words'.
    """
    try:
        # 'with' guarantees the source file is closed even if a write fails.
        with open(path) as source:
            for line in source:
                output.write(line)
    finally:
        # Close output even on error: downstream readers only see end-of-file
        # once it is closed, so skipping this would leave them waiting forever.
        output.close()
def filter(input, output):
    """Upper-case every line of ``input`` into ``output``, then close ``output``.

    Lines containing both 'k' and 'z' are additionally echoed to stderr,
    prefixed with '### ' (a selective 'tee').

    NOTE: this name shadows the built-in ``filter``; it is kept because callers
    refer to the stage by this name.

    :param input: iterable of text lines.
    :param output: writable text stream.
    """
    try:
        for line in input:
            if 'k' in line and 'z' in line:  # Selective 'tee'
                sys.stderr.write('### ' + line)
            output.write(line.upper())
    finally:
        # Close output even on error so the downstream stage sees end-of-file.
        output.close()
def leeter(input, output):
    """Replace every upper-case 'E' with '3' in each line, then close ``output``.

    :param input: iterable of text lines.
    :param output: writable text stream.
    """
    try:
        for line in input:
            output.write(line.replace('E', '3'))
    finally:
        # Close output even on error so the downstream stage sees end-of-file.
        output.close()
# Our pipeline contains some pieces written in Python that must run in threads:
# driving every pipe from a single thread would block on I/O (and can deadlock
# once a pipe buffer fills). These could instead run in subprocesses if you
# want them to actually run in parallel (avoiding the GIL).
def spawn(func, **kwargs):
    """Run ``func(**kwargs)`` on a freshly started thread.

    Returns the already-started ``threading.Thread``; the caller is expected
    to ``join()`` it.
    """
    started = threading.Thread(kwargs=kwargs, target=func)
    started.start()
    return started
# External programs that will participate in the pipeline | |
grepv = Popen(['grep','-v','not'], stdin=PIPE, stdout=PIPE, bufsize=-1) | |
cut = Popen(['cut','-c','1-10'], stdin=PIPE, stdout=PIPE, bufsize=-1) | |
grepk = Popen(['grep', 'K'], stdin=PIPE, stdout=PIPE, bufsize=-1) | |
grepz = Popen(['grep', 'Z'], stdin=grepk.stdout, stdout=PIPE, bufsize=-1) | |
# Stages of the pipeline implemented in Python | |
twriter = spawn(writer, output=grepv.stdin) | |
tfilter = spawn(filter, input=grepv.stdout, output=cut.stdin) | |
tleeter = spawn(leeter, input=cut.stdout, output=grepk.stdin) | |
# Drive the pipeline by reading lines from the last output | |
for line in grepz.stdout: | |
sys.stdout.write(line.lower()) | |
# Explicitly clean up | |
for t in [twriter, tfilter, tleeter]: t.join() | |
for p in [grepv, cut, grepk, grepz]: p.wait() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment