Created September 21, 2011 18:14
-
-
Save sgillies/1232852 to your computer and use it in GitHub Desktop.
Data processing pipeline demo
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# pipelinedemo.py
# Data processing pipeline demo
import logging
from functools import wraps
from json import dumps
def coroutine(func):
    """Decorate a generator function so that calling it returns a primed coroutine.

    The wrapper calls *func* with the given arguments and advances the
    resulting generator to its first ``yield``. Callers can therefore
    ``send()`` values immediately, without an initial ``next()`` call.
    ``functools.wraps`` keeps the decorated function's name and docstring.
    """
    @wraps(func)
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        # The builtin next() works on Python 2.6+ and Python 3; the
        # generator .next() method exists only on Python 2.
        next(cr)
        return cr
    return start
def pipeline(source, following):
    """Get features from an iterable and send them along to a coroutine.

    Each item of *source* is passed to ``following.send()``. When the source
    is exhausted, ``following.close()`` is called. It is also called if
    iteration or a send raises, before the exception propagates.

    :param source: any iterable of features.
    :param following: a primed coroutine (anything with ``send``/``close``).
    """
    try:
        for f in source:
            logging.info("Pipeline: sending feature to %s", following)
            following.send(f)
        logging.info("Pipeline: source empty, closing %s", following)
    finally:
        # Close even on failure, so downstream stages (e.g. a writer holding
        # an open file) shut down deterministically instead of at GC time.
        following.close()
@coroutine
def task1(following):
    """Receive a feature, increment its count property and send it along.

    Each feature received via ``send()`` is treated as a dict (it is updated
    with ``setdefault``). Its ``properties`` mapping is created if missing.
    ``properties['count']`` is incremented by one, starting from 0, and the
    feature is then forwarded to *following*. Closing this coroutine also
    closes *following*.
    """
    logging.info("Task1: started...")
    try:
        while True:
            f = (yield)
            logging.info("Task1: increment count")
            props = f.setdefault('properties', {})
            props['count'] = props.get('count', 0) + 1
            logging.info("Task1: sending feature to %s", following)
            following.send(f)
    except GeneratorExit:
        logging.info("Task1: stopped.")
    finally:
        # Propagate shutdown downstream. Otherwise later stages (such as the
        # writer) are only finalized whenever they happen to be collected.
        following.close()
@coroutine
def task2(following):
    """Receive a feature, increment its count property and send it along.

    Each feature received via ``send()`` is treated as a dict (it is updated
    with ``setdefault``). Its ``properties`` mapping is created if missing.
    ``properties['count']`` is incremented by one, starting from 0, and the
    feature is then forwarded to *following*. Closing this coroutine also
    closes *following*.
    """
    logging.info("Task2: started...")
    try:
        while True:
            f = (yield)
            logging.info("Task2: increment count")
            props = f.setdefault('properties', {})
            props['count'] = props.get('count', 0) + 1
            logging.info("Task2: sending feature to %s", following)
            following.send(f)
    except GeneratorExit:
        logging.info("Task2: stopped.")
    finally:
        # Propagate shutdown downstream. Otherwise later stages (such as the
        # writer) are only finalized whenever they happen to be collected.
        following.close()
@coroutine
def writer(fileobj):
    """A sink for features.

    Accumulates every feature sent to it. When closed, it writes them to
    *fileobj* as ``{"features": [...]}`` JSON (indent=2, trailing newline)
    and then closes *fileobj*. The writer takes ownership of *fileobj*.
    """
    features = []
    logging.info("Writer: started...")
    try:
        while True:
            features.append((yield))
    except GeneratorExit:
        logging.info("Writer: dumping buffer")
        try:
            fileobj.write(dumps({'features': features}, indent=2) + "\n")
        finally:
            # Release the file even if serialization or the write fails.
            fileobj.close()
        logging.info("Writer: stopped.")
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    features = [
        {'id': "1"}, {'id': "2"}, {'id': "3"}]
    # Keep a handle on the sink so it can be closed explicitly. The JSON is
    # written only when the writer is closed, and reading the file back must
    # not depend on garbage collection having done that already.
    sink = writer(open("pipeline-demo.json", "w"))
    pipeline(features, task1(task2(sink)))
    sink.close()  # no-op if the close already propagated down the chain
    with open("pipeline-demo.json") as result:
        print(result.read())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.