Skip to content

Instantly share code, notes, and snippets.

@phamthuonghai
Created August 18, 2016 04:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save phamthuonghai/fdb73b5738b542e787b91a00172b4e32 to your computer and use it in GitHub Desktop.
Save phamthuonghai/fdb73b5738b542e787b91a00172b4e32 to your computer and use it in GitHub Desktop.
Our very first version of Raptor using Spark
"""
Usage: spark-submit pyraptor.py -p "./" -a bayes -c sg
(-a accepts original, vtd, bayes or visenze; results go to <home>/Result/<algo>/Raptor_<country>)
"""
import string
from math import sqrt
from optparse import OptionParser
from operator import itemgetter
from pyspark import SparkConf, SparkContext
class Algorithms:
    """Integer codes selecting the scoring pipeline used by apply_jaccard."""
    ORIG = 0  # purchase-set overlap ('original')
    BYS = 1   # view/purchase cross overlap ('bayes')
    VTD = 2   # purchase-based score combined with cart-based score ('vtd')
    VSZ = 3   # view overlap restricted to ViSenze candidates ('visenze')
def file_to_map(row):
    """Parse one whitespace-separated line into ``(key, set_of_rest)``.

    The first token becomes the key; every remaining token is collected into
    a set, e.g. ``"sku1 u1 u2 u1"`` -> ``("sku1", {"u1", "u2"})``.

    Uses the ``str.split()`` method instead of the Python-2-only
    ``string.split`` function, so it runs on both Python 2 and Python 3.
    A blank line has no first token and raises ``IndexError``.
    """
    tokens = row.split()
    return (tokens[0], set(tokens[1:]))
# Wilson score
# input x = (sku1, [sku2, positive, total])
# output o = (sku1, (sku2, score))
def wilson95(x):
    """Score a pair by 100 x the lower bound of the 95% Wilson score interval.

    ``x`` is ``(sku1, [sku2, positive, total])``; returns
    ``(sku1, (sku2, score))``.  The constants come from z = 1.96:
    z**2 = 3.8416, z**2 / 2 = 1.9208 and z**2 / 4 = 0.9604.

    A non-positive ``positive`` or ``total`` scores 0.  ``positive`` is
    clamped to ``total`` because a proportion cannot exceed 1.  Some inputs
    (e.g. from ``bys_to_wil``, whose positive and total counts come from
    different set intersections) can have ``positive > total``, which
    previously made the square-root argument negative and raised
    ``ValueError: math domain error``.
    """
    sku1 = x[0]
    sku2, p, n = x[1][0], x[1][1], x[1][2]
    if p <= 0 or n <= 0:
        return (sku1, (sku2, 0))
    # With p <= n, p - p*p/n = p*(1 - p/n) >= 0, so sqrt() is always defined.
    p = float(min(p, n))
    n = float(n)
    res = 100 * (p + 1.9208 - 1.96 * sqrt(p - p * p / n + 0.9604)) / (3.8416 + n)
    return (sku1, (sku2, res))
def levenshtein(s1, s2):
    """Return the Levenshtein edit distance between sequences ``s1`` and ``s2``.

    Two-row dynamic programme: each row holds the distance from a prefix of
    the longer sequence to every prefix of the shorter one, so only
    O(min(len(s1), len(s2))) extra memory is used.
    """
    longer, shorter = (s1, s2) if len(s1) >= len(s2) else (s2, s1)
    if not shorter:
        return len(longer)
    previous = list(range(len(shorter) + 1))
    for row, ch_long in enumerate(longer, 1):
        current = [row]
        for col, ch_short in enumerate(shorter, 1):
            current.append(min(
                previous[col] + 1,                             # deletion
                current[col - 1] + 1,                          # insertion
                previous[col - 1] + (ch_long != ch_short),     # substitution
            ))
        previous = current
    return previous[-1]
# Similar items rescoring
# input x = (sku1, (sku2, score))
# output o = (sku1, (sku2, new_score))
def similar_filter_score(x):
    """Rescale a pair's score according to how different the SKU codes are.

    The edit distance between the two SKU strings selects the factor:
    distance <= 2 multiplies the score by 0.10, distance == 3 leaves it
    unchanged (x1.00), and distance > 3 multiplies it by 10.00.  The factors
    look hand-tuned (NOTE(review): no rationale recorded; presumably damps
    near-identical SKU codes such as variants of the same product).
    """
    sku1, sku2, score = x[0], x[1][0], x[1][1]
    distance = levenshtein(sku1, sku2)
    if distance > 3:
        factor = 10.00
    elif distance == 3:
        factor = 1.00
    else:
        factor = 0.10
    return (sku1, (sku2, score * factor))
# Wilson parameters for the ORIG / VTD pipelines
# input x = ((sku1, (data, 0)), (sku2, (data, 0)))
# output o = (sku1, [sku2, positive, total])
def data_to_wil(x):
    """Turn a candidate pair into Wilson inputs: shared users vs. all users.

    ``positive`` is the size of the intersection of the two user sets and
    ``total`` is ``len(a) + len(b) - positive`` (the union size for sets).
    """
    left_sku, left_users = x[0][0], x[0][1][0]
    right_sku, right_users = x[1][0], x[1][1][0]
    shared = len(left_users.intersection(right_users))
    combined = len(left_users) + len(right_users) - shared
    return (left_sku, [right_sku, shared, combined])
# Wilson parameters for the BYS pipeline
# input x = ((sku1, ((view, purc), 0)), (sku2, ((view, purc), 0)))
# output o = (sku1, [sku2, positive, total])
def bys_to_wil(x):
    """Turn a candidate pair into Wilson inputs from view and purchase sets.

    ``positive`` counts users who purchased both SKUs.  ``total`` counts users
    who purchased sku1 and viewed sku2, plus users who viewed sku1 and
    purchased sku2.
    """
    src_sku = x[0][0]
    src_views, src_buys = x[0][1][0][0], x[0][1][0][1]
    dst_sku = x[1][0]
    dst_views, dst_buys = x[1][1][0][0], x[1][1][0][1]
    cross_interest = (len(src_buys.intersection(dst_views))
                      + len(src_views.intersection(dst_buys)))
    co_purchases = len(src_buys.intersection(dst_buys))
    return (src_sku, [dst_sku, co_purchases, cross_interest])
# Wilson parameters for the VSZ pipeline
# input x = ((sku1, ((view, vsz), 0)), (sku2, (view, 0)))
# output o = (sku1, [sku2, positive, total])
def vsz_to_wil(x):
    """Turn a candidate pair into Wilson inputs from the two view sets.

    Only sku1's view set is used from its ``(view, vsz)`` pair; ``positive``
    is the shared viewers and ``total`` is ``len(a) + len(b) - positive``.
    """
    src_sku, src_viewers = x[0][0], x[0][1][0][0]
    dst_sku, dst_viewers = x[1][0], x[1][1][0]
    overlap = len(src_viewers.intersection(dst_viewers))
    return (src_sku, [dst_sku, overlap, len(src_viewers) + len(dst_viewers) - overlap])
# Filter VSZ algo
# input x = ((sku1, ((view, vsz), 0)), (sku2, (view, 0)))
# output o = True/False
def vsz_filter(x):
    """Keep a pair only if sku2 is among sku1's ViSenze candidates.

    ``vsz`` comes from ``view_map.leftOuterJoin(visenze_map)`` in
    ``apply_jaccard``, so it is ``None`` for SKUs without a ViSenze row.
    Such pairs are dropped instead of raising ``TypeError`` on
    ``sku2 in None``.
    """
    candidates = x[0][1][0][1]
    return candidates is not None and x[1][0] in candidates
# Drop self-pairs produced by the cartesian product
# input x = ((sku1, ...), (sku2, ...))
# output o = True/False
def duplicate_filter(x):
    """Return True when the two sides of a pair have different keys."""
    first_key = x[0][0]
    second_key = x[1][0]
    return first_key != second_key
# Drop pairs whose score is effectively zero
# input x = (sku1, (sku2, score))
# output o = True/False
def nonzero_filter(x):
    """Return True when the pair's score is strictly above 0.001.

    The threshold (written 10e-4 originally, i.e. 1e-3) stands in for
    "greater than zero" on floating-point scores.
    """
    score = x[1][1]
    return score > 0.001
# Re-key a scored pair by (sku1, sku2) so it can be joined
# input x = (sku1, (sku2, score))
# output o = ((sku1, sku2), score)
def en_pair(x):
    """Move sku2 from the value into a composite ``(sku1, sku2)`` key."""
    sku1, inner = x[0], x[1]
    pair_key = (sku1, inner[0])
    return (pair_key, inner[1])
# Inverse of en_pair: split the composite key back apart
# input x = ((sku1, sku2), score)
# output o = (sku1, (sku2, score))
def de_pair(x):
    """Move sku2 from the composite key back into the value."""
    pair_key, score = x[0], x[1]
    return (pair_key[0], (pair_key[1], score))
# input x = ((sku1, sku2), (purc, cart))
# output o = (sku1, (sku2, score))
def vtd_combine(x):
    """Merge the purchase- and cart-based scores of one pair.

    ``x`` comes from a ``fullOuterJoin``, so either score may be ``None``
    when the pair exists on only one side; a missing score counts as 0.
    The combined score is ``0.2 * purchase_score + cart_score``.
    """
    purc = x[1][0]
    cart = x[1][1]
    if purc is None:
        purc = 0
    if cart is None:
        cart = 0
    return (x[0][0], (x[0][1], purc * 0.2 + cart))
def _candidate_pairs(left_map, right_map, sku_src, sku_dst):
    """Cartesian product of source rows of left_map with destination rows of right_map, minus self-pairs."""
    return (left_map.join(sku_src)
            .cartesian(right_map.join(sku_dst))
            .filter(duplicate_filter))


def apply_jaccard(algo, size, sku_src, sku_dst, purchase_map, view_map, cart_map, visenze_map):
    """Score every (source SKU, destination SKU) pair with the chosen algorithm.

    Args:
        algo: one of the ``Algorithms`` codes; any code other than ORIG, BYS
            or VTD runs the VSZ pipeline.
        size: only checked for ``<= 0``, in which case nothing is scored.
        sku_src, sku_dst: RDDs of SKU strings to recommend from / to.
        purchase_map, view_map, cart_map, visenze_map: RDDs of
            ``(sku, set_of_tokens)`` as produced by ``file_to_map``.

    Returns:
        RDD of ``(sku1, iterable of (sku2, score))`` from ``groupByKey``,
        keeping only pairs that pass ``nonzero_filter``.  For ``size <= 0``
        an empty RDD is returned (previously a dict, which broke the
        caller's ``.union``).
    """
    if size <= 0:
        return sku_src.context.emptyRDD()
    # Key the SKU lists so they can be joined against the data maps.
    sku_src = sku_src.map(lambda sku: (sku, 0))
    sku_dst = sku_dst.map(lambda sku: (sku, 0))

    if algo == Algorithms.ORIG:
        res = (_candidate_pairs(purchase_map, purchase_map, sku_src, sku_dst)
               .map(data_to_wil)
               .map(wilson95))
    elif algo == Algorithms.BYS:
        view_purchase = view_map.join(purchase_map)  # (sku, (view, purc))
        res = (_candidate_pairs(view_purchase, view_purchase, sku_src, sku_dst)
               .map(bys_to_wil)
               .map(wilson95))
    elif algo == Algorithms.VTD:
        cart_scores = (_candidate_pairs(cart_map, cart_map, sku_src, sku_dst)
                       .map(data_to_wil)
                       .map(wilson95)
                       .map(en_pair))
        # similar_filter_score returns a rescored tuple, so it must be a map:
        # as a filter predicate the tuple is always truthy and the rescaled
        # score was silently discarded.
        res = (_candidate_pairs(purchase_map, purchase_map, sku_src, sku_dst)
               .map(data_to_wil)
               .map(wilson95)
               .map(en_pair)
               .fullOuterJoin(cart_scores)
               .map(vtd_combine)
               .map(similar_filter_score))
    else:
        # leftOuterJoin keeps SKUs with no ViSenze row (vsz is None).
        view_vsz = view_map.leftOuterJoin(visenze_map)
        res = (_candidate_pairs(view_vsz, view_map, sku_src, sku_dst)
               .filter(vsz_filter)
               .map(vsz_to_wil)
               .map(wilson95)
               .map(similar_filter_score))
    return res.filter(nonzero_filter).groupByKey()
def main_spark(algo, home, size, country,
               data_dir='file:///home/ubuntu/Recommendation/PyRaptor/Data',
               master='spark://instance-group-1-8alq:7077'):
    """Run PyRaptor for one country and save the recommendation lines.

    Args:
        algo: 'original', 'vtd', 'bayes' or 'visenze'; any other value falls
            back to the original algorithm.  Also used in the output path.
        home: base directory; output goes to <home>/Result/<algo>/Raptor_<country>.
        size: maximum number of recommendations written per SKU.
        country: country code used in the input file names and in each line.
        data_dir: directory (URI) holding the input CSV files; defaults to the
            previously hard-coded location.
        master: Spark master URL; defaults to the previously hard-coded one.
    """
    conf = (SparkConf()
            .setMaster(master)
            .setAppName("PyRaptor")
            .set("spark.executor.memory", "2g")
            .set("spark.driver.memory", "2g")
            .set("spark.app.id", "PyRaptor"))
    sc = SparkContext(conf=conf)
    try:
        # Broadcast before defining the closure that reads them.
        b_country = sc.broadcast(country)
        b_size = sc.broadcast(size)

        # input x = (sku, [(sku1, score), (sku2, score), ..])
        # output o = "country\tsku\tscore-sku1\tscore-sku2..." (best first)
        def to_output_line(x):
            ranked = sorted(x[1], key=itemgetter(1), reverse=True)[0:b_size.value]
            # The SKU is passed as a %s argument; concatenating it into the
            # format string broke on SKUs containing '%'.
            fields = [b_country.value, x[0]]
            fields.extend('%.2f-%s' % (item[1], item[0]) for item in ranked)
            return '\t'.join(fields)

        def input_path(prefix):
            return data_dir + '/' + prefix + '_' + country + '.csv'

        sku_src = sc.textFile(input_path('instock_skus')).map(lambda line: line.strip())
        sku_dst = sc.textFile(input_path('valid_skus')).map(lambda line: line.strip())
        sku_male = sc.textFile(input_path('sku_male')).map(lambda line: line.split()[0])
        sku_female = sc.textFile(input_path('sku_female')).map(lambda line: line.split()[0])
        purchase_map = sc.textFile(input_path('VTD_purchased')).map(file_to_map)
        view_map = sc.textFile(input_path('VTD_view')).map(file_to_map)
        cart_map = sc.textFile(input_path('VTD_cart')).map(file_to_map)
        visenze_map = sc.textFile(input_path('visenze')).map(file_to_map)

        algo_code = {
            'vtd': Algorithms.VTD,
            'bayes': Algorithms.BYS,
            'visenze': Algorithms.VSZ,
        }.get(algo, Algorithms.ORIG)

        recommended_male = apply_jaccard(
            algo_code, size,
            sku_male.intersection(sku_src), sku_male.intersection(sku_dst),
            purchase_map, view_map, cart_map, visenze_map)
        recommended_female = apply_jaccard(
            algo_code, size,
            sku_female.intersection(sku_src), sku_female.intersection(sku_dst),
            purchase_map, view_map, cart_map, visenze_map)

        (recommended_male.union(recommended_female)
         .map(to_output_line)
         .saveAsTextFile(home + '/Result/' + algo + '/Raptor_' + country))
    finally:
        # Always release the cluster resources, even if a stage fails.
        sc.stop()
if __name__ == "__main__":
    optparser = OptionParser()
    optparser.add_option('-p', '--homePath',
                         dest='home',
                         help='path to home folder; results are written under <home>/Result',
                         default='./Data/Test')
    optparser.add_option('-s', '--size',
                         dest='size',
                         type='int',
                         help='specify number of relevant products, default = 3',
                         default=3)
    optparser.add_option('-a', '--algorithm',
                         dest='algo',
                         help="algorithm being used: original (default), vtd, bayes or visenze",
                         default='original')
    optparser.add_option('-c', '--country',
                         dest='country',
                         help='country code, default = sg',
                         default='sg')
    (options, args) = optparser.parse_args()
    # One pre-formatted string prints the same on Python 2 and Python 3.
    print("Run PyRaptor with option: (%s %s %s %s)"
          % (options.algo, options.home, options.size, options.country))
    main_spark(options.algo, options.home, int(options.size), options.country)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment