Skip to content

Instantly share code, notes, and snippets.

@lbolla
Created April 20, 2012 12:34
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lbolla/2428213 to your computer and use it in GitHub Desktop.
Save lbolla/2428213 to your computer and use it in GitHub Desktop.
Python pipelines
'''Pipeline
(yield) -> receiver
.send -> producer
'''
import time
N = 0  # shared counter, advanced only by P's _tick helper via `global N`


def P(n):
    '''Producer coroutine: only .send (plus a bare yield as entry point).

    Priming stops at the bare ``yield``. Resuming it enters an endless loop
    that increments the module-level counter ``N``, sleeps one second, and
    sends the new value to the downstream coroutine ``n``.
    '''
    def _tick():
        global N
        N = N + 1
        time.sleep(1)
        return N

    yield  # entry point: pipeline() primes the producer up to here
    while 1:
        n.send(_tick())
def S(n):
    '''Stage: both (yield) and .send.

    For every value received, prints ``Stage <value>`` and sends
    ``value + 1`` on to the downstream coroutine ``n``.
    '''
    def _f(x):
        # Single-argument print(...) is valid in Python 2 and 3 and prints
        # exactly what the old `print 'Stage', x` statement printed.
        print('Stage %s' % (x,))
        return x + 1
    while True:
        r = (yield)
        n.send(_f(r))
def C():
    '''Consumer: only (yield).

    Prints ``Consumed <value>`` for every value received; it has no
    downstream target.
    '''
    def _f(x):
        # Python 2/3-compatible equivalent of `print 'Consumed', x`.
        print('Consumed %s' % (x,))
    while True:
        r = (yield)
        _f(r)
def pipeline(*args):
    '''Chain stages together. Assumes the last is the consumer.

    Each argument is a coroutine factory. The last one is called with no
    arguments; every earlier one is called with its downstream coroutine.
    Each coroutine is primed (advanced to its first ``yield``) before it is
    handed upstream. Returns the head of the chain (built from ``args[0]``).
    '''
    head = args[-1]()
    next(head)  # builtin next() works on Python 2.6+ and 3; .next() is 2-only
    # Named `factory` rather than `S` so it does not shadow the module's S().
    for factory in reversed(args[:-1]):
        head = factory(head)
        next(head)
    return head
if __name__ == '__main__':
    # pipeline() has already run the producer up to its bare `yield`;
    # resuming it starts the endless produce -> 3 stages -> consume loop.
    p = pipeline(P, S, S, S, C)
    next(p)  # to "start" (builtin next() instead of Python-2-only .next())
'''Pipeline
(yield) -> receiver
.send -> producer
Provide initial state to producer, avoiding globals.
'''
import time
from functools import partial
def P(f, n):
    '''Producer coroutine: only .send (and yield as entry point).

    After priming, the first value sent in is the initial state. From then
    on it loops forever: ``f(state)`` returns ``(result, next_state)`` and
    the result is pushed into the downstream coroutine ``n``.
    '''
    state = yield  # initial state arrives through .send()
    while 1:
        value, state = f(state)
        n.send(value)
def S(f, n):
    '''Stage coroutine: both (yield) and .send.

    Every item received is transformed by ``f`` and forwarded to ``n``.
    '''
    while 1:
        item = yield
        n.send(f(item))
def C(f):
    '''Consumer coroutine: only (yield).

    Hands every item received to ``f``; nothing is sent further.
    '''
    while 1:
        item = yield
        f(item)
def pipeline(*args):
    '''Chain stages together. Assumes the last is the consumer.

    Each argument is a coroutine factory. The last one is called with no
    arguments; every earlier one is called with its downstream coroutine.
    Each coroutine is primed (advanced to its first ``yield``) before it is
    handed upstream. Returns the head of the chain (built from ``args[0]``).
    '''
    head = args[-1]()
    next(head)  # builtin next() works on Python 2.6+ and 3; .next() is 2-only
    # Named `factory` rather than `S` so it does not shadow the module's S().
    for factory in reversed(args[:-1]):
        head = factory(head)
        next(head)
    return head
def produce(state, delay=1):
    '''Given a state, produce a result and the next state.

    Sleeps ``delay`` seconds (default 1, as before), then returns
    ``(state, state + 1)``: the current state is the result and its
    successor is the next state. Pass ``delay=0`` to skip the wait.
    '''
    time.sleep(delay)
    return state, state + 1
def stage(x):
    '''Print ``Stage <x>`` and return ``x + 1`` for the next stage.'''
    # Python 2/3-compatible equivalent of `print 'Stage', x`.
    print('Stage %s' % (x,))
    return x + 1
def consume(x):
    '''Final sink: print ``Consumed <x>``.'''
    # Python 2/3-compatible equivalent of `print 'Consumed', x`.
    print('Consumed %s' % (x,))
if __name__ == '__main__':
    # Wire produce -> stage x3 -> consume. Each factory has its function
    # bound now and receives its downstream coroutine from pipeline().
    factories = [partial(P, produce)]
    factories.extend(partial(S, stage) for _ in range(3))
    factories.append(partial(C, consume))
    p = pipeline(*factories)
    initial_state = 0
    p.send(initial_state)
'''Pipeline
(yield) -> receiver
.send -> producer
Provide initial state to producer, avoiding globals.
Stop iteration after a bit.
Wrap in nice class.
'''
class StopPipeline(Exception):
    '''Raised by a producer function to make the producer coroutine return.'''
class Pipeline(object):
    '''Chain stages together. Assumes the last is the consumer.

    ``Pipeline(producer, stage_1, ..., stage_k, consumer)``:

    * ``producer(state)`` returns ``(result, next_state)`` and raises
      StopPipeline to end the run;
    * each ``stage(x)`` returns the item passed downstream;
    * ``consumer(x)`` receives every item that reaches the end.

    Each callable is wrapped in one of the generator coroutines P/S/C below.
    '''

    def __init__(self, *args):
        # First and last callables must be distinct roles. With a single
        # argument it would otherwise be used as both producer and consumer.
        if len(args) < 2:
            raise ValueError(
                'Pipeline needs at least a producer and a consumer')
        producer, stages, consumer = args[0], args[1:-1], args[-1]
        # Build back to front: every coroutine needs its downstream target,
        # and each is primed (advanced to its first yield) before use.
        target = Pipeline.C(consumer)
        next(target)
        for stg in reversed(stages):
            target = Pipeline.S(stg, target)
            next(target)
        self._pipeline = Pipeline.P(producer, target)
        next(self._pipeline)

    def start(self, initial_state):
        '''Send ``initial_state`` to the producer and run until it stops.

        Returns once the producer function raises StopPipeline. Any other
        exception from the producer, a stage or the consumer propagates.
        '''
        try:
            self._pipeline.send(initial_state)
        except StopIteration:
            # The producer generator returned; close() on a finished
            # generator is harmless.
            self._pipeline.close()

    @staticmethod
    def P(f, n):
        '''Producer: only .send (and yield as entry point).

        Waits for the initial state, then repeatedly sends the result of
        ``f(state)`` to ``n``; returns when ``f`` raises StopPipeline.
        '''
        state = (yield)  # get initial state
        while True:
            try:
                res, state = f(state)
            except StopPipeline:
                return
            n.send(res)

    @staticmethod
    def S(f, n):
        '''Stage: both (yield) and .send; forwards ``f(item)`` to ``n``.'''
        while True:
            r = (yield)
            n.send(f(r))

    @staticmethod
    def C(f):
        '''Consumer: only (yield); calls ``f(item)`` for every item.'''
        while True:
            r = (yield)
            f(r)
def produce(state, limit=3, delay=1):
    '''Given a state, produce a result and the next state.

    Raises StopPipeline once ``state`` reaches ``limit`` (default 3).
    Otherwise sleeps ``delay`` seconds (default 1) and returns
    ``(state, state + 1)``.
    '''
    import time
    # `>=` rather than `==`: an initial state already past the limit would
    # otherwise never stop the pipeline.
    if state >= limit:
        raise StopPipeline('Enough!')
    time.sleep(delay)
    return state, state + 1
def stage(x):
    '''Print ``Stage <x>`` and return ``x + 1`` for the next stage.'''
    # Python 2/3-compatible equivalent of `print 'Stage', x`.
    print('Stage %s' % (x,))
    return x + 1
def consume(x):
    '''Final sink: print ``Consumed <x>``.'''
    # Python 2/3-compatible equivalent of `print 'Consumed', x`.
    print('Consumed %s' % (x,))
if __name__ == '__main__':
    # producer, three identical stages, consumer
    p = Pipeline(produce, *([stage] * 3 + [consume]))
    initial_state = 0
    p.start(initial_state)
'''Pipeline
(yield) -> receiver
.send -> producer
Provide initial state to producer, avoiding globals.
Stop iteration after a bit.
Wrap in nice class.
Simple crawler to check if "python" was mentioned on HN.
'''
class StopPipeline(Exception):
    '''Raised by a producer function to make the producer coroutine return.'''
class Pipeline(object):
    '''Chain stages together. Assumes the last is the consumer.

    ``Pipeline(producer, stage_1, ..., stage_k, consumer)``:

    * ``producer(state)`` returns ``(result, next_state)`` and raises
      StopPipeline to end the run;
    * each ``stage(x)`` returns the item passed downstream;
    * ``consumer(x)`` receives every item that reaches the end.

    Each callable is wrapped in one of the generator coroutines P/S/C below.
    '''

    def __init__(self, *args):
        # First and last callables must be distinct roles. With a single
        # argument it would otherwise be used as both producer and consumer.
        if len(args) < 2:
            raise ValueError(
                'Pipeline needs at least a producer and a consumer')
        producer, stages, consumer = args[0], args[1:-1], args[-1]
        # Build back to front: every coroutine needs its downstream target,
        # and each is primed (advanced to its first yield) before use.
        target = Pipeline.C(consumer)
        next(target)
        for stg in reversed(stages):
            target = Pipeline.S(stg, target)
            next(target)
        self._pipeline = Pipeline.P(producer, target)
        next(self._pipeline)

    def start(self, initial_state):
        '''Send ``initial_state`` to the producer and run until it stops.

        Returns once the producer function raises StopPipeline. Any other
        exception from the producer, a stage or the consumer propagates.
        '''
        try:
            self._pipeline.send(initial_state)
        except StopIteration:
            # The producer generator returned; close() on a finished
            # generator is harmless.
            self._pipeline.close()

    @staticmethod
    def P(f, n):
        '''Producer: only .send (and yield as entry point).

        Waits for the initial state, then repeatedly sends the result of
        ``f(state)`` to ``n``; returns when ``f`` raises StopPipeline.
        '''
        state = (yield)  # get initial state
        while True:
            try:
                res, state = f(state)
            except StopPipeline:
                return
            n.send(res)

    @staticmethod
    def S(f, n):
        '''Stage: both (yield) and .send; forwards ``f(item)`` to ``n``.'''
        while True:
            r = (yield)
            n.send(f(r))

    @staticmethod
    def C(f):
        '''Consumer: only (yield); calls ``f(item)`` for every item.'''
        while True:
            r = (yield)
            f(r)
def produce(state):
    '''Crawl one page: given a state, produce a result and the next state.

    ``state`` is a ``(urls, visited, domain)`` triple. ``urls`` is the set of
    pages still to fetch and ``visited`` the set already fetched. Only
    ``href="http..."`` links whose text contains ``domain`` and that are not
    yet visited are queued. Both sets are mutated in place and returned in
    the next state.

    Returns ``((url, doc), (urls, visited, domain))``.
    Raises StopPipeline when there is nothing left to fetch.
    '''
    import re
    from contextlib import closing
    try:
        from urllib.request import urlopen  # Python 3
    except ImportError:
        from urllib2 import urlopen  # Python 2

    urls, visited, domain = state  # tuple parameters are Python-2-only
    if not urls:
        raise StopPipeline('No more urls')
    print('Queue %d' % len(urls))
    url = urls.pop()
    # Mark as visited *before* scanning links, so a page that links to
    # itself is not queued (and fetched) a second time.
    visited.add(url)
    with closing(urlopen(url)) as response:  # don't leak the connection
        doc = response.read()
    if not isinstance(doc, str):
        # Python 3 returns bytes; decode so the str regex below and the
        # 'python' in doc.lower() check downstream work.
        doc = doc.decode('utf-8', 'replace')
    links = re.findall('href="(http.+?)"', doc)
    urls.update(l for l in links if domain in l and l not in visited)
    return (url, doc), (urls, visited, domain)
def stage(page):
    '''Map a fetched ``(url, doc)`` page to ``(url, mentions_python)``.

    The check is a case-insensitive substring test for "python".
    '''
    url, doc = page  # tuple parameters are a SyntaxError on Python 3
    return (url, 'python' in doc.lower())
def consume(result):
    '''Report pages whose text mentions "python".

    ``result`` is the ``(url, mentioned)`` pair returned by ``stage``.
    '''
    url, mentioned = result  # tuple parameters are a SyntaxError on Python 3
    if mentioned:
        print('Python mentioned in %s' % (url,))
if __name__ == '__main__':
    # Crawl outward from the HN front page, following links that contain
    # the domain string.
    p = Pipeline(produce, stage, consume)
    domain = 'ycombinator.com'
    urls = set(['http://news.ycombinator.com'])
    p.start((urls, set(), domain))
'''Pipeline
(yield) -> receiver
.send -> producer
Provide initial state to producer, avoiding globals.
Stop iteration after a bit.
Wrap in nice class.
Simple crawler to check if "python" was mentioned on HN.
Some improvements based on: http://www.dabeaz.com/coroutines/Coroutines.pdf
- coroutine decorator
- catch GeneratorExit
'''
from contextlib import contextmanager
def coroutine(f):
    '''Decorator: create the generator and advance it to its first (yield).

    The wrapped factory returns a generator that is ready for ``.send()``.
    ``functools.wraps`` keeps the decorated function's name and docstring.
    '''
    from functools import wraps

    @wraps(f)
    def start(*args, **kwargs):
        cr = f(*args, **kwargs)
        next(cr)  # prime; builtin next() works on Python 2.6+ and 3
        return cr
    return start
@contextmanager
def close_on_exit(n):
    '''Close the downstream coroutine ``n`` whenever the managed block exits.

    This covers three cases:

    * normal exit, e.g. a producer returning after StopPipeline;
    * errors raised in the block;
    * GeneratorExit from ``.close()`` on the enclosing coroutine.

    GeneratorExit is re-raised rather than swallowed, so ``close()`` finishes
    the enclosing generator instead of letting it run past the with-block.
    '''
    try:
        yield
    finally:
        n.close()
class StopPipeline(Exception):
    '''Raised by a producer function to make the producer coroutine return.'''
class Pipeline(object):
    '''Chain stages together. Assumes the last is the consumer.

    ``Pipeline(producer, stage_1, ..., stage_k, consumer)``:

    * ``producer(state)`` returns ``(result, next_state)`` and raises
      StopPipeline to end the run;
    * each ``stage(x)`` returns the item passed downstream;
    * ``consumer(x)`` receives every item that reaches the end.

    P/S/C are decorated with ``coroutine``, so they come back already primed.
    '''

    def __init__(self, *args):
        # First and last callables must be distinct roles. With a single
        # argument it would otherwise be used as both producer and consumer.
        if len(args) < 2:
            raise ValueError(
                'Pipeline needs at least a producer and a consumer')
        producer, stages, consumer = args[0], args[1:-1], args[-1]
        # Build back to front: every coroutine needs its downstream target.
        target = Pipeline.C(consumer)
        for stg in reversed(stages):
            target = Pipeline.S(stg, target)
        self._pipeline = Pipeline.P(producer, target)

    def start(self, initial_state):
        '''Send ``initial_state`` to the producer and run until it stops.

        Returns once the producer function raises StopPipeline. Any other
        exception from the producer, a stage or the consumer propagates.
        '''
        try:
            self._pipeline.send(initial_state)
        except StopIteration:
            # The producer generator returned; close() on a finished
            # generator is harmless.
            self._pipeline.close()

    @staticmethod
    @coroutine
    def P(f, n):
        '''Producer: only .send (and yield as entry point).

        Waits for the initial state, then repeatedly sends the result of
        ``f(state)`` to ``n``; returns when ``f`` raises StopPipeline.
        '''
        state = (yield)  # get initial state
        with close_on_exit(n):
            while True:
                try:
                    res, state = f(state)
                except StopPipeline:
                    return
                n.send(res)

    @staticmethod
    @coroutine
    def S(f, n):
        '''Stage: both (yield) and .send; forwards ``f(item)`` to ``n``.'''
        with close_on_exit(n):
            while True:
                r = (yield)
                n.send(f(r))

    @staticmethod
    @coroutine
    def C(f):
        '''Consumer: only (yield); calls ``f(item)`` for every item.'''
        # nothing to "close" here: the consumer has no downstream target
        while True:
            r = (yield)
            f(r)
def produce(state):
    '''Crawl one page: given a state, produce a result and the next state.

    ``state`` is a ``(urls, visited, domain)`` triple. ``urls`` is the set of
    pages still to fetch and ``visited`` the set already fetched. Only
    ``href="http..."`` links whose text contains ``domain`` and that are not
    yet visited are queued. Both sets are mutated in place and returned in
    the next state.

    Returns ``((url, doc), (urls, visited, domain))``.
    Raises StopPipeline when there is nothing left to fetch.
    '''
    import re
    from contextlib import closing
    try:
        from urllib.request import urlopen  # Python 3
    except ImportError:
        from urllib2 import urlopen  # Python 2

    urls, visited, domain = state  # tuple parameters are Python-2-only
    if not urls:
        raise StopPipeline('No more urls')
    print('Queue %d' % len(urls))
    url = urls.pop()
    # Mark as visited *before* scanning links, so a page that links to
    # itself is not queued (and fetched) a second time.
    visited.add(url)
    with closing(urlopen(url)) as response:  # don't leak the connection
        doc = response.read()
    if not isinstance(doc, str):
        # Python 3 returns bytes; decode so the str regex below and the
        # 'python' in doc.lower() check downstream work.
        doc = doc.decode('utf-8', 'replace')
    links = re.findall('href="(http.+?)"', doc)
    urls.update(l for l in links if domain in l and l not in visited)
    return (url, doc), (urls, visited, domain)
def stage(page):
    '''Map a fetched ``(url, doc)`` page to ``(url, mentions_python)``.

    The check is a case-insensitive substring test for "python".
    '''
    url, doc = page  # tuple parameters are a SyntaxError on Python 3
    return (url, 'python' in doc.lower())
def consume(result):
    '''Report pages whose text mentions "python".

    ``result`` is the ``(url, mentioned)`` pair returned by ``stage``.
    '''
    url, mentioned = result  # tuple parameters are a SyntaxError on Python 3
    if mentioned:
        print('Python mentioned in %s' % (url,))
if __name__ == '__main__':
    # Crawl outward from the HN front page, following links that contain
    # the domain string.
    p = Pipeline(produce, stage, consume)
    domain = 'ycombinator.com'
    urls = set(['http://news.ycombinator.com'])
    p.start((urls, set(), domain))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment