Skip to content

Instantly share code, notes, and snippets.

@fmarani
Created July 15, 2011 10:42
Show Gist options
  • Save fmarani/1084464 to your computer and use it in GitHub Desktop.
Transformation pipeline in Python
#!/usr/bin/env python
import itertools
import random
import string
import functools
# support functions
# Sample implementation of ireduce(): a lazy reduce that yields every intermediate accumulator
def ireduce(func, iterable, init=None):
    """Lazily fold *iterable* with *func*, yielding every intermediate result.

    A generator counterpart of ``functools.reduce``: rather than returning
    only the final accumulator, it yields the accumulator after each item
    has been folded in.

    Args:
        func: Two-argument callable ``func(accumulator, item)`` returning the
            new accumulator.
        iterable: Items to fold.
        init: Starting accumulator. When ``None`` (the default) the first item
            of *iterable* becomes the starting accumulator and is not yielded
            on its own. Because of this, ``None`` cannot be used as an
            explicit starting value.

    Yields:
        The accumulator after each remaining item is combined. Nothing is
        yielded for an empty iterable, or for a one-item iterable when *init*
        is ``None``.
    """
    iterator = iter(iterable)
    if init is None:
        try:
            curr = next(iterator)
        except StopIteration:
            # Empty input: finish quietly. Letting StopIteration escape a
            # generator body is a RuntimeError under PEP 479 (Python 3.7+).
            return
    else:
        curr = init
    for x in iterator:
        curr = func(curr, x)
        yield curr
# end support functions
# PROTOTYPE FOR A PIPELINE SYSTEM
# where every transformation is applied lazily
# and all stages are active at the same time: each item is pulled through
# every stage before the next item is read (inspired by Hadoop and dataflow languages)
# Module-level product stores, filled in place by import_partner_products().
# Each entry is an (isbn, name, price) tuple; the two lists are distinct objects.
partner_a_products, partner_b_products = [], []
def _random_product(isbn_high):
    """Return a random ``(isbn, name, price)`` tuple.

    *isbn* is drawn from ``[9780000, isbn_high)``, *name* is 7 random
    uppercase letters/digits, and *price* is an integer in ``[1, 5)``.
    """
    # Draw order (isbn, name chars, price) matches the original inline loops,
    # so a seeded run produces the same products.
    isbn = random.randrange(9780000, isbn_high)
    name = ''.join(random.choice(string.ascii_uppercase + string.digits)
                   for _ in range(7))
    price = random.randrange(1, 5)
    return (isbn, name, price)


def _load_partner(label, count, isbn_high, products):
    """Append *count* random products to *products*, printing each one."""
    for load_count in range(1, count + 1):
        new_product = _random_product(isbn_high)
        # Single-argument print keeps the exact "N  PARTNER X NEW ->  (...)"
        # line on both Python 2 and Python 3.
        print("%d  PARTNER %s NEW ->  %s" % (load_count, label, new_product))
        products.append(new_product)


def import_partner_products():
    """Populate the module-level partner product lists with random data.

    Appends 1000 products with ISBNs in ``[9780000, 9782000)`` to
    ``partner_a_products`` and 2000 products with ISBNs in
    ``[9780000, 9784000)`` to ``partner_b_products``, printing each product
    as it is created. The lists are extended in place, not cleared, so
    calling this twice accumulates products.
    """
    # The lists are only mutated (never rebound), so no ``global`` is needed.
    _load_partner("A", 1000, 9782000, partner_a_products)
    _load_partner("B", 2000, 9784000, partner_b_products)
def Acorrelated(a_products=None, b_products=None):
    """Pair each partner-A product with the first partner-B product sharing its ISBN.

    Prints ``correlated..`` as soon as the stage is built, then returns a lazy
    iterator. No pairing work happens until the iterator is consumed.

    Args:
        a_products: Iterable of ``(isbn, name, price)`` tuples to correlate.
            Defaults to the module-level ``partner_a_products``.
        b_products: Re-iterable collection of ``(isbn, name, price)`` tuples,
            scanned once per A product. Defaults to the module-level
            ``partner_b_products``.

    Returns:
        An iterator of ``(a_product, b_product)`` tuples. ``b_product`` is the
        first B entry (in collection order) whose ISBN equals the A product's,
        or ``None`` when there is no match.
    """
    print("correlated..")
    if a_products is None:
        a_products = partner_a_products
    if b_products is None:
        b_products = partner_b_products

    def grouper(product):
        # Linear scan that stops at the first ISBN match. This has the same
        # result as the previous ireduce-based search, without copying an
        # accumulator list for every B item.
        match = next((candidate for candidate in b_products
                      if candidate[0] == product[0]), None)
        return (product, match)

    return (grouper(product) for product in a_products)
def partner_selected(source=None):
    """Choose the cheaper offer from each correlated ``(a_product, b_product)`` pair.

    Prints ``partner_selected..`` when the stage is built and returns a lazy
    iterator. For each pair it picks the B product only when it is strictly
    cheaper (price is the third field). Otherwise, including ties or when there
    is no B match (``None``), it picks the A product. Each decision is printed
    as it is made.

    Args:
        source: Iterable of ``(a_product, b_product_or_None)`` pairs.
            Defaults to the output of ``Acorrelated()``.
    """
    print("partner_selected..")
    if source is None:
        source = Acorrelated()

    def mapper(x):
        a_product, b_product = x[0], x[1]
        if b_product is None:
            w = a_product
        elif a_product[2] > b_product[2]:
            w = b_product
        else:
            w = a_product
        print("best_price_on %s and %s -> %s" % (str(a_product), str(b_product), str(w)))
        return w

    return (mapper(x) for x in source)
def filtered(source=None):
    """Keep only products whose price (third field) is greater than 2.

    Prints ``filtered..`` when the stage is built and returns a lazy iterator.
    Every product inspected is printed before the test is applied.

    Args:
        source: Iterable of ``(isbn, name, price)`` tuples. Defaults to the
            output of ``partner_selected()``.
    """
    print("filtered..")
    if source is None:
        source = partner_selected()

    # Named so as not to shadow the builtin ``filter``.
    def price_above_two(x):
        print("filtering > 2 on %s" % (x,))
        return x[2] > 2

    return (x for x in source if price_above_two(x))
def with_markup(source=None, markup=1.2):
    """Add a fixed markup to each product's price (third field).

    Prints ``with_markup..`` when the stage is built and returns a lazy
    iterator. Every product is printed before its markup is applied.

    Args:
        source: Iterable of ``(isbn, name, price)`` tuples. Defaults to the
            output of ``filtered()``.
        markup: Amount added to each price. Defaults to 1.2, the value
            previously hard-coded here.
    """
    print("with_markup..")
    if source is None:
        source = filtered()

    def mapper(x):
        print("mapping markup addition on %s" % (x,))
        return (x[0], x[1], x[2] + markup)

    return (mapper(x) for x in source)
def normalized(source=None):
    """Lower-case each product's name (second field).

    Prints ``normalized..`` when the stage is built and returns a lazy
    iterator. Every product is printed before it is transformed. This is the
    final stage of the default pipeline.

    Args:
        source: Iterable of ``(isbn, name, price)`` tuples. Defaults to the
            output of ``with_markup()``.
    """
    print("normalized..")
    if source is None:
        source = with_markup()

    def mapper(x):
        print("mapping lowercase on %s" % (x,))
        return (x[0], x[1].lower(), x[2])

    return (mapper(x) for x in source)
# Drive the pipeline: load the random sample data, then pull at most the
# first 1000 results through every stage, printing each as it arrives.
# islice stops before requesting a 1001st item, so no extra upstream work runs.
count = 0
import_partner_products()
for count, product in enumerate(itertools.islice(normalized(), 1000), 1):
    print("%d  ->  %s" % (count, product))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment