Last active
December 10, 2015 05:58
-
-
Save jedbrown/4391685 to your computer and use it in GitHub Desktop.
Illustrates constructing a pipeline with some components written in Python and some using external programs.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from subprocess import Popen, PIPE | |
import sys, threading | |
# Stages implemented in Python. These process text line by line; for binary
# data, read incrementally instead (e.g. input.read(4096)). Every stage must
# close its output so that EOF propagates down the pipeline and lets it shut
# down cleanly.
def writer(output, path='/usr/share/dict/words'):
    """Copy every line of a text file into *output*, then close *output*.

    This is the source stage of the pipeline.

    Args:
        output: writable text stream that receives the lines of *path*
            unchanged. It is closed on return -- even if opening or reading
            the file fails -- so the downstream stage sees EOF instead of
            blocking forever.
        path: file to read; defaults to the system word list used by the
            original demo.
    """
    try:
        # `with` guarantees the source file handle is released.
        with open(path) as source:
            for line in source:
                output.write(line)
    finally:
        output.close()
def filter(input, output):
    """Upper-case every line of *input* into *output*, teeing some to stderr.

    Lines containing both 'k' and 'z' are also echoed to stderr with a
    '### ' prefix (a selective 'tee'). The test is applied to the incoming
    line before upper-casing, so it is case-sensitive.

    Note: this name shadows the builtin ``filter`` at module level; it is
    kept unchanged for compatibility with existing callers.

    Args:
        input: iterable of text lines (e.g. a text-mode pipe).
        output: writable text stream; closed on return, even on error, so
            the next stage sees EOF instead of blocking forever.
    """
    try:
        for line in input:
            if 'k' in line and 'z' in line:  # Selective 'tee'
                sys.stderr.write('### ' + line)
            output.write(line.upper())
    finally:
        output.close()
def leeter(input, output):
    """Copy *input* to *output*, replacing every upper-case 'E' with '3'.

    Only 'E' is replaced; lower-case 'e' passes through unchanged.

    Args:
        input: iterable of text lines (e.g. a text-mode pipe).
        output: writable text stream; closed on return, even on error, so
            the next stage sees EOF instead of blocking forever.
    """
    try:
        for line in input:
            output.write(line.replace('E', '3'))
    finally:
        output.close()
# Our pipeline contains some pieces written in Python; they must run in threads
# so that a blocking read or write in one stage cannot stall the others. To run
# them truly in parallel (sidestepping the GIL), use subprocesses instead.
def spawn(func, **kwargs):
    """Start ``func(**kwargs)`` on a new thread and hand the thread back.

    Returns:
        The already-started ``threading.Thread`` running *func*.
    """
    worker = threading.Thread(target=func, kwargs=kwargs)
    worker.start()
    return worker
# External programs that will participate in the pipeline.
# universal_newlines=True opens the pipes in text mode, so the Python stages
# (which test and write str values) also work on Python 3, where pipes would
# otherwise carry bytes and `'k' in line` / sys.stdout.write(line) would fail.
grepv = Popen(['grep', '-v', 'not'], stdin=PIPE, stdout=PIPE, bufsize=-1,
              universal_newlines=True)
cut = Popen(['cut', '-c', '1-10'], stdin=PIPE, stdout=PIPE, bufsize=-1,
            universal_newlines=True)
grepk = Popen(['grep', 'K'], stdin=PIPE, stdout=PIPE, bufsize=-1,
              universal_newlines=True)
grepz = Popen(['grep', 'Z'], stdin=grepk.stdout, stdout=PIPE, bufsize=-1,
              universal_newlines=True)
# grepz now holds the read end of grepk's output. Drop the parent's copy so
# grepk receives SIGPIPE (rather than blocking) if grepz exits early.
grepk.stdout.close()

# Stages of the pipeline implemented in Python
twriter = spawn(writer, output=grepv.stdin)
tfilter = spawn(filter, input=grepv.stdout, output=cut.stdin)
tleeter = spawn(leeter, input=cut.stdout, output=grepk.stdin)

# Drive the pipeline by reading lines from the last output
for line in grepz.stdout:
    sys.stdout.write(line.lower())
grepz.stdout.close()

# Explicitly clean up
for t in [twriter, tfilter, tleeter]:
    t.join()
for p in [grepv, cut, grepk, grepz]:
    p.wait()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment