Created April 20, 2012 12:34
Save lbolla/2428213 to your computer and use it in GitHub Desktop.
Python pipelines
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'''Coroutine pipeline built from generators.

  - ``(yield)`` makes a coroutine a *receiver* of values;
  - ``.send()`` makes it a *producer* of values for the next coroutine.

In this version the producer keeps its counter in the module-level
global ``N`` and the pipeline runs until interrupted.
'''
import time
N = 0  # running counter, incremented by the producer P via ``global N``
def P(n):
    '''Producer: the head of the chain; it only sends, never receives.

    Every iteration bumps the module-level counter ``N``, waits one second
    and pushes the new value into the downstream coroutine ``n``.  The bare
    ``yield`` is just an entry point so the generator can be primed before
    it starts sending; it loops forever.
    '''
    global N
    yield  # entry point: the next next() resumes past here
    while True:
        N += 1
        time.sleep(1)
        n.send(N)
def S(n):
    '''Stage: log each received value and forward its successor to ``n``.'''
    while True:
        received = (yield)
        print('Stage %s' % (received,))
        n.send(received + 1)
def C():
    '''Consumer: terminal coroutine that prints every value it receives.'''
    while True:
        item = (yield)
        print('Consumed %s' % (item,))
def pipeline(*args):
    '''Chain coroutine factories together and return the head coroutine.

    Each argument is a callable returning a generator-based coroutine.  The
    last one is the consumer and is called with no arguments; every other
    factory is called with the (already primed) coroutine that follows it.
    Every coroutine is primed with ``next()``, so the returned head is ready
    for ``next()``/``send()``.

    Raises IndexError when called with no arguments.
    '''
    head = args[-1]()
    next(head)  # prime the consumer: advance it to its first (yield)
    # Wire from the tail towards the head.  The loop variable is not named
    # ``S`` so it does not shadow the module-level stage function S.
    for factory in reversed(args[:-1]):
        head = factory(head)
        next(head)
    return head
if __name__ == '__main__':
    # Producer -> three incrementing stages -> consumer; runs until interrupted.
    head = pipeline(P, S, S, S, C)
    next(head)  # move P past its entry-point yield; never returns
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'''Coroutine pipeline with an explicit producer state.

  - ``(yield)`` makes a coroutine a *receiver* of values;
  - ``.send()`` makes it a *producer* of values for the next coroutine.

The producer receives its initial state through ``send()`` and threads it
through a ``produce(state) -> (result, next_state)`` function, so no
globals are needed.
'''
import time
from functools import partial
def P(f, n):
    '''Producer: drive ``f`` from an initial state and feed ``n``.

    After priming, the first ``send()`` supplies the initial state.  From
    then on ``f(state)`` must return ``(result, next_state)``; each result
    is sent downstream.  Loops until ``f`` or ``n`` raises.
    '''
    current = (yield)  # initial state arrives through send()
    while True:
        output, current = f(current)
        n.send(output)
def S(f, n):
    '''Stage: apply ``f`` to every received value and forward the result.'''
    while 1:
        incoming = yield
        transformed = f(incoming)
        n.send(transformed)
def C(f):
    '''Consumer: hand every received value to ``f``; its result is dropped.'''
    while 1:
        item = yield
        f(item)
def pipeline(*args):
    '''Chain coroutine factories together and return the head coroutine.

    Each argument is a callable returning a generator-based coroutine.  The
    last one is the consumer and is called with no arguments; every other
    factory is called with the (already primed) coroutine that follows it.
    Every coroutine is primed with ``next()``, so the returned head is ready
    for ``next()``/``send()``.

    Raises IndexError when called with no arguments.
    '''
    head = args[-1]()
    next(head)  # prime the consumer: advance it to its first (yield)
    # Wire from the tail towards the head.  The loop variable is not named
    # ``S`` so it does not shadow the module-level stage function S.
    for factory in reversed(args[:-1]):
        head = factory(head)
        next(head)
    return head
def produce(state, delay=1):
    '''Given a state, produce a result and the next state.

    Returns ``(state, state + 1)``: the current state is the result and its
    successor is the next state.  Sleeps ``delay`` seconds first to throttle
    the pipeline.  ``delay`` defaults to the original one second and can be
    set to 0, e.g. in tests.
    '''
    time.sleep(delay)
    return state, state + 1
def stage(x):
    '''Log the incoming value and return its successor.'''
    print('Stage %s' % (x,))
    return x + 1
def consume(x):
    '''Terminal sink: print the value; returns None.'''
    print('Consumed %s' % (x,))
if __name__ == '__main__':
    # producer -> 3 x stage -> consumer, each built from a partial wrapper.
    factories = [partial(P, produce)]
    factories.extend(partial(S, stage) for _ in range(3))
    factories.append(partial(C, consume))
    head = pipeline(*factories)
    head.send(0)  # initial state; the pipeline then runs forever
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'''Coroutine pipeline wrapped in a ``Pipeline`` class.

  - ``(yield)`` makes a coroutine a *receiver* of values;
  - ``.send()`` makes it a *producer* of values for the next coroutine.

The producer gets its initial state through ``send()`` (no globals) and the
pipeline stops once the produce function raises ``StopPipeline``.
'''
class StopPipeline(Exception):
    '''Raised by a produce function to end the pipeline.'''


class Pipeline(object):
    '''Chain a producer, zero or more stages and a consumer together.

    ``Pipeline(produce, stage_1, ..., stage_n, consume)`` where
      - ``produce(state)`` returns ``(result, next_state)`` and raises
        ``StopPipeline`` when it is done;
      - each ``stage(value)`` returns the value handed to the next stage;
      - ``consume(value)`` receives every final value.
    The first argument is always the producer and the last the consumer.
    '''

    def __init__(self, *args):
        if len(args) < 2:
            raise ValueError(
                'Pipeline needs at least a producer and a consumer, got %d '
                'callable(s)' % len(args))
        t = Pipeline.C(args[-1])
        next(t)  # prime: advance to the first (yield)
        chain = [t]
        for stg in reversed(args[1:-1]):
            t = Pipeline.S(stg, t)
            next(t)
            chain.append(t)
        p = Pipeline.P(args[0], t)
        next(p)
        self._pipeline = p
        # Every coroutine after the producer, in data-flow order (first
        # stage ... consumer), so close() can shut them all down.
        self._downstream = chain[::-1]

    def start(self, initial_state):
        '''Send ``initial_state`` to the producer and run to completion.

        ``send()`` only ends with StopIteration once the producer returned
        (its function raised StopPipeline).  The whole chain is then closed.
        Any other exception propagates to the caller.
        '''
        try:
            self._pipeline.send(initial_state)
        except StopIteration:
            self.close()

    def close(self):
        '''Close the producer and every downstream coroutine (idempotent).'''
        self._pipeline.close()
        for gen in self._downstream:
            gen.close()

    @staticmethod
    def P(f, n):
        '''Producer: only .send (and yield as entry point).

        The first send() supplies the initial state.  Afterwards
        ``f(state) -> (result, next_state)`` is called repeatedly and each
        result is sent to ``n``.  The producer returns when ``f`` raises
        StopPipeline.
        '''
        state = (yield)  # get initial state
        while True:
            try:
                res, state = f(state)
            except StopPipeline:
                return
            n.send(res)

    @staticmethod
    def S(f, n):
        '''Stage: both (yield) and .send; forwards ``f(value)`` to ``n``.'''
        while True:
            r = (yield)
            n.send(f(r))

    @staticmethod
    def C(f):
        '''Consumer: only (yield); passes every value to ``f``.'''
        while True:
            r = (yield)
            f(r)
def produce(state, stop_at=3, delay=1):
    '''Given a state, produce a result and the next state.

    Returns ``(state, state + 1)`` after sleeping ``delay`` seconds.
    Raises StopPipeline once ``state`` reaches ``stop_at``.  The check is
    ``>=`` rather than ``==``, so a start value already past the limit stops
    immediately instead of running forever.  Both parameters default to the
    original hard-coded values.
    '''
    import time
    if state >= stop_at:
        raise StopPipeline('Enough!')
    time.sleep(delay)
    return state, state + 1
def stage(x):
    '''Log the incoming value and return its successor.'''
    print('Stage %s' % (x,))
    return x + 1
def consume(x):
    '''Terminal sink: print the value; returns None.'''
    print('Consumed %s' % (x,))
if __name__ == '__main__':
    # produce -> 3 x stage -> consume; ends when produce raises StopPipeline.
    chain = [produce] + [stage] * 3 + [consume]
    runner = Pipeline(*chain)
    runner.start(0)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'''Coroutine pipeline wrapped in a ``Pipeline`` class, used as a crawler.

  - ``(yield)`` makes a coroutine a *receiver* of values;
  - ``.send()`` makes it a *producer* of values for the next coroutine.

The producer gets its initial state through ``send()`` (no globals) and the
pipeline stops once the produce function raises ``StopPipeline``.  The
example crawls Hacker News and reports pages that mention "python".
'''
class StopPipeline(Exception):
    '''Raised by a produce function to end the pipeline.'''


class Pipeline(object):
    '''Chain a producer, zero or more stages and a consumer together.

    ``Pipeline(produce, stage_1, ..., stage_n, consume)`` where
      - ``produce(state)`` returns ``(result, next_state)`` and raises
        ``StopPipeline`` when it is done;
      - each ``stage(value)`` returns the value handed to the next stage;
      - ``consume(value)`` receives every final value.
    The first argument is always the producer and the last the consumer.
    '''

    def __init__(self, *args):
        if len(args) < 2:
            raise ValueError(
                'Pipeline needs at least a producer and a consumer, got %d '
                'callable(s)' % len(args))
        t = Pipeline.C(args[-1])
        next(t)  # prime: advance to the first (yield)
        chain = [t]
        for stg in reversed(args[1:-1]):
            t = Pipeline.S(stg, t)
            next(t)
            chain.append(t)
        p = Pipeline.P(args[0], t)
        next(p)
        self._pipeline = p
        # Every coroutine after the producer, in data-flow order (first
        # stage ... consumer), so close() can shut them all down.
        self._downstream = chain[::-1]

    def start(self, initial_state):
        '''Send ``initial_state`` to the producer and run to completion.

        ``send()`` only ends with StopIteration once the producer returned
        (its function raised StopPipeline).  The whole chain is then closed.
        Any other exception propagates to the caller.
        '''
        try:
            self._pipeline.send(initial_state)
        except StopIteration:
            self.close()

    def close(self):
        '''Close the producer and every downstream coroutine (idempotent).'''
        self._pipeline.close()
        for gen in self._downstream:
            gen.close()

    @staticmethod
    def P(f, n):
        '''Producer: only .send (and yield as entry point).

        The first send() supplies the initial state.  Afterwards
        ``f(state) -> (result, next_state)`` is called repeatedly and each
        result is sent to ``n``.  The producer returns when ``f`` raises
        StopPipeline.
        '''
        state = (yield)  # get initial state
        while True:
            try:
                res, state = f(state)
            except StopPipeline:
                return
            n.send(res)

    @staticmethod
    def S(f, n):
        '''Stage: both (yield) and .send; forwards ``f(value)`` to ``n``.'''
        while True:
            r = (yield)
            n.send(f(r))

    @staticmethod
    def C(f):
        '''Consumer: only (yield); passes every value to ``f``.'''
        while True:
            r = (yield)
            f(r)
def produce(state):
    '''Given a state, produce a result and the next state.

    ``state`` is ``(urls, visited, domain)``: the set of URLs still to crawl,
    the set of URLs already crawled, and a string that a link must contain
    to be queued.  Pops one URL, fetches it, and queues every
    ``href="http..."`` link that contains ``domain`` and is not yet visited.

    Returns ``((url, doc), (urls, visited, domain))``.  ``doc`` is '' when
    the page could not be fetched.  ``urls`` and ``visited`` are updated in
    place.  Raises StopPipeline when there is nothing left to crawl.
    '''
    import re
    from contextlib import closing
    try:
        from urllib2 import urlopen  # Python 2
    except ImportError:
        from urllib.request import urlopen  # Python 3

    # Unpack in the body: tuple parameters are not valid Python 3 (PEP 3113).
    urls, visited, domain = state
    if not urls:
        raise StopPipeline('No more urls')
    print('Queue %d' % len(urls))
    url = urls.pop()
    # Mark the page visited *before* scanning its links, so a page that
    # links to itself is not queued (and fetched) a second time.
    visited.add(url)
    try:
        with closing(urlopen(url)) as response:
            doc = response.read()
    except (IOError, ValueError) as err:
        # One broken link (HTTP error, bad URL, network failure) should not
        # abort the whole crawl: report it and carry on with an empty page.
        print('Failed %s: %s' % (url, err))
        doc = ''
    if not isinstance(doc, str):
        # Python 3 returns bytes; decode so the str regex below and the
        # stage's substring check work.
        doc = doc.decode('utf-8', 'replace')
    links = re.findall('href="(http.+?)"', doc)
    # NOTE(review): ``domain in link`` is a substring test, so any URL that
    # merely contains the domain (e.g. in a query string) is queued as well.
    urls.update(link for link in links
                if domain in link and link not in visited)
    return (url, doc), (urls, visited, domain)
def stage(page):
    '''Map a ``(url, doc)`` pair to ``(url, mentions_python)``.

    The check is a case-insensitive substring search for "python" in
    ``doc``.  The pair is unpacked in the body because tuple parameters are
    not valid Python 3 (PEP 3113).
    '''
    url, doc = page
    return (url, 'python' in doc.lower())
def consume(result):
    '''Print the URL of every page flagged as mentioning Python.

    ``result`` is a ``(url, mentioned)`` pair.  A message is printed only
    when ``mentioned`` is truthy.  The pair is unpacked in the body because
    tuple parameters are not valid Python 3 (PEP 3113).
    '''
    url, mentioned = result
    if mentioned:
        print('Python mentioned in %s' % url)
if __name__ == '__main__':
    # Crawl Hacker News pages, reporting those that mention "python".
    crawler = Pipeline(produce, stage, consume)
    seeds = set(['http://news.ycombinator.com'])
    crawler.start((seeds, set(), 'ycombinator.com'))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'''Coroutine pipeline: a ``Pipeline`` class plus a small Hacker News crawler.

  - ``(yield)`` makes a coroutine a *receiver* of values;
  - ``.send()`` makes it a *producer* of values for the next coroutine.

The producer gets its initial state through ``send()`` (no globals) and the
pipeline stops once the produce function raises ``StopPipeline``.  The
example crawler reports Hacker News pages that mention "python".

Improvements taken from David Beazley, "A Curious Course on Coroutines and
Concurrency" (http://www.dabeaz.com/coroutines/Coroutines.pdf):
  - a ``coroutine`` decorator that primes generators automatically;
  - closing the downstream coroutine when a coroutine is closed
    (GeneratorExit).
'''
from contextlib import contextmanager
def coroutine(f):
    '''Decorator: call the generator function ``f`` and prime the result.

    The wrapper calls ``f(*args, **kwargs)``, advances the new generator to
    its first ``yield`` with ``next()`` and returns it, so it is immediately
    ready for ``send()``.  ``functools.wraps`` keeps ``f``'s name and
    docstring on the wrapper.
    '''
    import functools

    @functools.wraps(f)
    def start(*args, **kwargs):
        cr = f(*args, **kwargs)
        next(cr)  # builtin next() works on Python 2.6+ and 3
        return cr
    return start
@contextmanager
def close_on_exit(n):
    '''Close coroutine ``n`` when the ``with`` block is left, however it is left.

    This covers a normal exit, an exception, and GeneratorExit thrown in by
    ``close()``.  A normal exit matters because a producer leaves its block
    by ``return``-ing after StopPipeline; an ``except GeneratorExit``-only
    version never closed ``n`` in that case.  Any exception, including
    GeneratorExit, still propagates after ``n`` has been closed.
    '''
    try:
        yield
    finally:
        n.close()
class StopPipeline(Exception):
    '''Raised by a produce function to signal that the pipeline is done.'''
class Pipeline(object):
    '''Wire a producer, optional stages and a consumer into a single chain.

    ``Pipeline(produce, stage_1, ..., stage_n, consume)``: the first callable
    drives the producer coroutine, the last one the consumer coroutine, and
    every callable in between becomes a stage.  The coroutines are created
    through the ``coroutine`` decorator, so they come out already primed.
    '''

    def __init__(self, *args):
        downstream = Pipeline.C(args[-1])
        for transform in reversed(args[1:-1]):
            downstream = Pipeline.S(transform, downstream)
        self._pipeline = Pipeline.P(args[0], downstream)

    def start(self, initial_state):
        '''Hand ``initial_state`` to the producer and run until it finishes.'''
        head = self._pipeline
        try:
            head.send(initial_state)
        except StopIteration:
            # The producer has returned; closing it afterwards is a no-op.
            head.close()

    @staticmethod
    @coroutine
    def P(f, n):
        '''Producer: receives its initial state through send(), then keeps
        sending the first element of ``f(state)`` downstream; stops when
        ``f`` raises StopPipeline.'''
        current = yield
        with close_on_exit(n):
            while True:
                try:
                    item, current = f(current)
                except StopPipeline:
                    break
                n.send(item)

    @staticmethod
    @coroutine
    def S(f, n):
        '''Stage: forward ``f(value)`` to ``n`` for every value received.'''
        with close_on_exit(n):
            while 1:
                incoming = yield
                n.send(f(incoming))

    @staticmethod
    @coroutine
    def C(f):
        '''Consumer: pass every received value to ``f``.'''
        # nothing downstream, so nothing to close
        while 1:
            f((yield))
def produce(state):
    '''Given a state, produce a result and the next state.

    ``state`` is ``(urls, visited, domain)``: the set of URLs still to crawl,
    the set of URLs already crawled, and a string that a link must contain
    to be queued.  Pops one URL, fetches it, and queues every
    ``href="http..."`` link that contains ``domain`` and is not yet visited.

    Returns ``((url, doc), (urls, visited, domain))``.  ``doc`` is '' when
    the page could not be fetched.  ``urls`` and ``visited`` are updated in
    place.  Raises StopPipeline when there is nothing left to crawl.
    '''
    import re
    from contextlib import closing
    try:
        from urllib2 import urlopen  # Python 2
    except ImportError:
        from urllib.request import urlopen  # Python 3

    # Unpack in the body: tuple parameters are not valid Python 3 (PEP 3113).
    urls, visited, domain = state
    if not urls:
        raise StopPipeline('No more urls')
    print('Queue %d' % len(urls))
    url = urls.pop()
    # Mark the page visited *before* scanning its links, so a page that
    # links to itself is not queued (and fetched) a second time.
    visited.add(url)
    try:
        with closing(urlopen(url)) as response:
            doc = response.read()
    except (IOError, ValueError) as err:
        # One broken link (HTTP error, bad URL, network failure) should not
        # abort the whole crawl: report it and carry on with an empty page.
        print('Failed %s: %s' % (url, err))
        doc = ''
    if not isinstance(doc, str):
        # Python 3 returns bytes; decode so the str regex below and the
        # stage's substring check work.
        doc = doc.decode('utf-8', 'replace')
    links = re.findall('href="(http.+?)"', doc)
    # NOTE(review): ``domain in link`` is a substring test, so any URL that
    # merely contains the domain (e.g. in a query string) is queued as well.
    urls.update(link for link in links
                if domain in link and link not in visited)
    return (url, doc), (urls, visited, domain)
def stage(page):
    '''Map a ``(url, doc)`` pair to ``(url, mentions_python)``.

    The check is a case-insensitive substring search for "python" in
    ``doc``.  The pair is unpacked in the body because tuple parameters are
    not valid Python 3 (PEP 3113).
    '''
    url, doc = page
    return (url, 'python' in doc.lower())
def consume(result):
    '''Print the URL of every page flagged as mentioning Python.

    ``result`` is a ``(url, mentioned)`` pair.  A message is printed only
    when ``mentioned`` is truthy.  The pair is unpacked in the body because
    tuple parameters are not valid Python 3 (PEP 3113).
    '''
    url, mentioned = result
    if mentioned:
        print('Python mentioned in %s' % url)
if __name__ == '__main__':
    # Crawl Hacker News pages, reporting those that mention "python".
    crawler = Pipeline(produce, stage, consume)
    seeds = set(['http://news.ycombinator.com'])
    crawler.start((seeds, set(), 'ycombinator.com'))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment