Skip to content

Instantly share code, notes, and snippets.

@matthewrobertbell
Created December 14, 2011 14:25
Show Gist options
  • Save matthewrobertbell/1476783 to your computer and use it in GitHub Desktop.
Save matthewrobertbell/1476783 to your computer and use it in GitHub Desktop.
from functools import partial
import time
def coroutine(func):
def start(*args,**kwargs):
cr = func(*args,**kwargs)
cr.next()
return cr
return start
def follow(target, f):
f.seek(0,2) # Go to the end of the file
while True:
line = f.readline()
if not line:
time.sleep(0.1) # Sleep briefly
continue
target.send(line)
@coroutine
def grep(target, pattern):
while True:
line = (yield) # Receive a line
if pattern in line:
target.send(line) # Send to next stage
@coroutine
def printer():
while True:
line = (yield)
print line,
def pipe(*args):
def run(input_func, func=None):
print 'run', input_func, func
if isinstance(input_func, tuple):
data = list(input_func)
input_func = data.pop(0)
return input_func(func, *data)
else:
if func:
return input_func(func)
else:
return input_func()
args = list(args)
source = args.pop(0)
sink = args.pop(len(args)-1)
func = run(sink)
for arg in args[::-1]:
func = run(arg, func)
return run(source, func)
pipe((follow,open('example.txt')),(grep,'pattern'),printer)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment