Skip to content

Instantly share code, notes, and snippets.

@sgillies
Created September 21, 2011 18:14
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save sgillies/1232852 to your computer and use it in GitHub Desktop.
Save sgillies/1232852 to your computer and use it in GitHub Desktop.
Data processing pipeline demo
# pipelinedemo.py
# Data processing pipeline demo
import logging
from functools import wraps
from json import dumps
def coroutine(func):
    """Decorate a generator function so its generators start out primed.

    A freshly created generator cannot accept ``send()`` until it has been
    advanced to its first ``yield``.  The wrapper returned here creates the
    generator, advances it once and hands it back ready to receive values.

    Args:
        func: A generator function written in coroutine style, i.e. one
            that receives its input through ``(yield)``.

    Returns:
        A function accepting the same arguments as ``func`` that returns
        the already-advanced generator.
    """
    @wraps(func)  # keep the decorated function's name and docstring
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        # Use the next() builtin (Python 2.6+ and 3); the generator method
        # ``.next()`` only exists on Python 2.
        next(cr)
        return cr
    return start
def pipeline(source, following):
    """Push every item of ``source`` into ``following``, then close it.

    Args:
        source: Any iterable of features.
        following: The first stage of the pipeline; each feature is handed
            to its ``send()`` method, and its ``close()`` method is called
            once the iterable is exhausted.
    """
    for feature in source:
        logging.info("Pipeline: sending feature to %s", following)
        following.send(feature)

    # Reached only after the loop has consumed the whole source.
    logging.info("Pipeline: source empty, closing %s", following)
    following.close()
@coroutine
def task1(following):
    """Receive features, increment their count property and send them along.

    Every feature received through ``send()`` gets ``properties['count']``
    incremented by one (both keys are created when missing, the count
    starting from 0) and is then forwarded to ``following``.

    Args:
        following: The next stage of the pipeline; must provide ``send()``
            and ``close()``.
    """
    logging.info("Task1: started...")
    try:
        while True:
            f = (yield)
            logging.info("Task1: increment count")
            properties = f.setdefault('properties', {})
            properties['count'] = properties.get('count', 0) + 1
            logging.info("Task1: sending feature to %s", following)
            following.send(f)
    except GeneratorExit:
        logging.info("Task1: stopped.")
        # Propagate the shutdown explicitly.  Without this the downstream
        # stage is only closed when it happens to be garbage collected,
        # which is immediate on CPython but not guaranteed elsewhere.
        following.close()
@coroutine
def task2(following):
    """Receive features, increment their count property and send them along.

    Every feature received through ``send()`` gets ``properties['count']``
    incremented by one (both keys are created when missing, the count
    starting from 0) and is then forwarded to ``following``.

    Args:
        following: The next stage of the pipeline; must provide ``send()``
            and ``close()``.
    """
    logging.info("Task2: started...")
    try:
        while True:
            f = (yield)
            logging.info("Task2: increment count")
            properties = f.setdefault('properties', {})
            properties['count'] = properties.get('count', 0) + 1
            logging.info("Task2: sending feature to %s", following)
            following.send(f)
    except GeneratorExit:
        logging.info("Task2: stopped.")
        # Propagate the shutdown explicitly.  Without this the downstream
        # stage is only closed when it happens to be garbage collected,
        # which is immediate on CPython but not guaranteed elsewhere.
        following.close()
@coroutine
def writer(fileobj):
    """Sink for features: collect them and write them out as JSON on close.

    Features received through ``send()`` are accumulated in memory.  When
    the coroutine is closed, they are serialised as
    ``{"features": [...]}`` (indented by two spaces, followed by a
    newline), written to ``fileobj``, and ``fileobj`` is closed.

    Args:
        fileobj: A writable text file-like object.  The writer closes it
            once the features have been dumped.
    """
    # Named ``features`` rather than ``buffer`` so the Python 2 builtin of
    # that name is not shadowed.
    features = []
    try:
        logging.info("Writer: started...")
        while True:
            f = (yield)
            features.append(f)
    except GeneratorExit:
        logging.info("Writer: dumping buffer")
        try:
            fileobj.write(dumps({'features': features}, indent=2) + "\n")
        finally:
            # Release the handle even when serialisation or the write fails.
            fileobj.close()
        logging.info("Writer: stopped.")
if __name__ == "__main__":
    # Demo: run three bare features through task1 -> task2 -> writer and
    # echo the JSON file the writer produces.
    logging.basicConfig(level=logging.INFO)
    features = [
        {'id': "1"}, {'id': "2"}, {'id': "3"}]
    # The chain is built innermost first: the writer is the final sink,
    # task2 feeds it, and task1 is the stage the pipeline sends to.
    pipeline(
        features,
        task1(
            task2(
                writer(open("pipeline-demo.json", "w")))
        )
    )
    # Read the result back through a context manager so the handle is
    # closed, and use the parenthesised print form, which is valid on both
    # Python 2 and Python 3.
    with open("pipeline-demo.json") as result:
        print(result.read())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment