Skip to content

Instantly share code, notes, and snippets.

@odedlaz
Created April 11, 2017 16:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save odedlaz/fc65e0903dc23dff7ea0096773b6e8dc to your computer and use it in GitHub Desktop.
Save odedlaz/fc65e0903dc23dff7ea0096773b6e8dc to your computer and use it in GitHub Desktop.
Full gist of my blog post about piping with coroutines
from __future__ import print_function
from functools import wraps
from io import SEEK_END
from re import compile
from sys import argv
from time import sleep
RE_TYPE = type(compile(""))
def coroutine(f):
@wraps(f)
def wrapper(*args, **kwargs):
cr = f(*args, **kwargs)
cr.next()
return cr
return wrapper
@coroutine
def broadcast(*targets):
while True:
text = (yield)
for target in targets:
target.send(text)
@coroutine
def filter(expression, target):
if isinstance(expression, basestring):
expression = compile(expression)
if not isinstance(expression, RE_TYPE):
raise ValueError("expression should be a string or re type")
while True:
# this is called on every .send() call
text = (yield)
if expression.search(text):
target.send(text)
@coroutine
def cleaner(pattern, target):
expression = compile(pattern)
def clean(match):
text = match.group(0)
return text[0] + ('*' * (len(text) - 2)) + text[-1]
while True:
text = expression.sub(clean, (yield))
target.send(text)
@coroutine
def printer():
while True:
print((yield))
def follow(fd, target):
fd.seek(0, SEEK_END)
while True:
line = fd.readline().strip()
if not line:
sleep(0.1)
continue
target.send(line)
if __name__ == "__main__":
nasty_regex = compile("fuck")
filters = [filter("^hello", printer()),
filter(nasty_regex, cleaner(nasty_regex, printer()))]
with open(argv[1], 'r') as f:
follow(f, broadcast(*filters))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment