Created
August 18, 2016 04:08
-
-
Save phamthuonghai/fdb73b5738b542e787b91a00172b4e32 to your computer and use it in GitHub Desktop.
Our very first version of Raptor using Spark
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""
Usage:
    spark-submit pyraptor.py -p "./" -a bayes -c sg

Accepted -a values: original (default), vtd, bayes, visenze.
Any other value falls back to the original algorithm.
"""
import string | |
from math import sqrt | |
from optparse import OptionParser | |
from operator import itemgetter | |
from pyspark import SparkConf, SparkContext | |
class Algorithms:
    """Integer identifiers for the scoring strategies used by apply_jaccard."""
    # Purchase data only.
    ORIG = 0
    # View data combined with purchase data.
    BYS = 1
    # Purchase scores blended with cart scores.
    VTD = 2
    # View data restricted to pairs listed in the visenze map.
    VSZ = 3
# Parse one whitespace-separated data line
# input row = "sku item1 item2 ..."
# output o = (sku, set([item1, item2, ...]))
def file_to_map(row):
    """Split a text line into its leading key and the set of remaining tokens.

    Uses the ``str.split`` method instead of the ``string.split`` function,
    which only exists in Python 2, so the parser runs on both major versions.
    Raises IndexError for a blank line, as there is no key to return.
    """
    tokens = row.split()
    return (tokens[0], set(tokens[1:]))
# Wilson score (lower bound of the 95% interval, scaled to 0..100)
# input x = (sku1, [sku2, positive, total])
# output o = (sku1, (sku2, score))
def wilson95(x):
    """Turn (positive, total) counts into a Wilson lower-bound score.

    Pairs with a non-positive count on either side get the integer score 0.
    NOTE(review): if positive > total the square-root argument can become
    negative and math.sqrt raises ValueError; confirm callers guarantee
    positive <= total.
    """
    key = x[0]
    counts = x[1]
    other = counts[0]
    positive = counts[1]
    total = counts[2]

    if positive <= 0 or total <= 0:
        return (key, (other, 0))

    positive = float(positive)
    total = float(total)
    # Constants are z = 1.96 folded in: z^2 / 2 = 1.9208, z^2 / 4 = 0.9604,
    # z^2 = 3.8416.
    radicand = positive - positive * positive / total + 0.9604
    numerator = 100 * (positive + 1.9208 - 1.96 * sqrt(radicand))
    return (key, (other, numerator / (3.8416 + total)))
# Edit distance between two sequences
# input s1, s2 = strings (any order)
# output o = number of single-element insertions, deletions or substitutions
def levenshtein(s1, s2):
    """Return the Levenshtein distance between s1 and s2.

    Only two rows of the dynamic-programming table are kept; the shorter
    input is used for the row so the rows stay as small as possible.
    """
    if len(s1) < len(s2):
        longer, shorter = s2, s1
    else:
        longer, shorter = s1, s2

    if len(shorter) == 0:
        return len(longer)

    previous = list(range(len(shorter) + 1))
    for row, ch_long in enumerate(longer, 1):
        current = [row]
        for col, ch_short in enumerate(shorter):
            insertion = previous[col + 1] + 1
            deletion = current[col] + 1
            substitution = previous[col] + (ch_long != ch_short)
            current.append(min(insertion, deletion, substitution))
        previous = current
    return previous[-1]
# Re-weight a score by how similar the two SKU strings are
# input x = (sku1, (sku2, score))
# output o = (sku1, (sku2, new_score))
def similar_filter_score(x):
    """Scale the score by a factor chosen from the SKUs' edit distance.

    Distance below 3 multiplies the score by 0.10, exactly 3 leaves it
    unchanged, and anything larger multiplies it by 10.00.
    """
    sku1 = x[0]
    sku2 = x[1][0]
    distance = levenshtein(sku1, sku2)
    if distance > 3:
        factor = 10.00  # empirically chosen boost; no derivation given
    elif distance == 3:
        factor = 1.00
    else:
        factor = 0.10
    return (sku1, (sku2, x[1][1] * factor))
# Build Wilson parameters for the ORIG and VTD algorithms
# input x = ((sku1, (data, 0)), (sku2, (data, 0)))
# output o = (sku1, [sku2, positive, total])
def data_to_wil(x):
    """Count shared and combined members of the two SKUs' data sets.

    positive is the size of the intersection and total the size of the
    union (computed as |A| + |B| - |A intersect B|).
    """
    left, right = x[0], x[1]
    left_set = left[1][0]
    right_set = right[1][0]
    shared = len(left_set.intersection(right_set))
    combined = len(left_set) + len(right_set) - shared
    return (left[0], [right[0], shared, combined])
# Build Wilson parameters for the BYS algorithm
# input x = ((sku1, ((view, purc), 0)), (sku2, ((view, purc), 0)))
# output o = (sku1, [sku2, positive, total])
def bys_to_wil(x):
    """Count co-purchases against cross view/purchase overlaps.

    positive is the number of members in both purchase sets; total is the
    sum of |purc1 intersect view2| and |view1 intersect purc2|.
    """
    left, right = x[0], x[1]
    left_view, left_purc = left[1][0][0], left[1][0][1]
    right_view, right_purc = right[1][0][0], right[1][0][1]

    bought_left_viewed_right = len(left_purc.intersection(right_view))
    viewed_left_bought_right = len(left_view.intersection(right_purc))
    bought_both = len(left_purc.intersection(right_purc))

    total = bought_left_viewed_right + viewed_left_bought_right
    return (left[0], [right[0], bought_both, total])
# Build Wilson parameters for the VSZ algorithm
# input x = ((sku1, ((view, vsz), 0)), (sku2, (view, 0)))
# output o = (sku1, [sku2, positive, total])
def vsz_to_wil(x):
    """Count shared and combined viewers of the two SKUs.

    Only the view sets are used here; the visenze list carried on the
    left-hand side is ignored by this step.
    """
    left, right = x[0], x[1]
    left_view = left[1][0][0]
    right_view = right[1][0]
    shared = len(left_view.intersection(right_view))
    combined = len(left_view) + len(right_view) - shared
    return (left[0], [right[0], shared, combined])
# Filter VSZ algo
# input x = ((sku1, ((view, vsz), 0)), (sku2, (view, 0)))
# output o = True/False
def vsz_filter(x):
    """Keep the pair only when sku2 is listed in sku1's visenze set.

    The left-hand side is built with a left outer join, so ``vsz`` is None
    for SKUs that have no visenze entry. Such pairs are rejected instead of
    raising TypeError on ``in None``.
    """
    vsz = x[0][1][0][1]
    return vsz is not None and x[1][0] in vsz
# Drop pairs that match a SKU with itself
# input x = ((sku1, ()), (sku2, ()))
# output o = True when the two SKUs differ, False otherwise
def duplicate_filter(x):
    """Return True for pairs whose two sides carry different SKUs."""
    left_sku = x[0][0]
    right_sku = x[1][0]
    return left_sku != right_sku
# Drop pairs whose score is effectively zero
# input x = (sku1, (sku2, score))
# output o = True/False
def nonzero_filter(x):
    """Return True when the score is strictly greater than 0.001."""
    # Scores are floats, so compare against a small epsilon rather than 0.
    epsilon = 10e-4
    score = x[1][1]
    return score > epsilon
# Re-key a scored pair by both SKUs
# input x = (sku1, (sku2, score))
# output o = ((sku1, sku2), score)
def en_pair(x):
    """Move sku2 into the key so records can be joined on the SKU pair."""
    sku1 = x[0]
    sku2, score = x[1][0], x[1][1]
    return ((sku1, sku2), score)
# Undo en_pair: key the record by sku1 only
# input x = ((sku1, sku2), score)
# output o = (sku1, (sku2, score))
def de_pair(x):
    """Move sku2 out of the key and back next to its score."""
    pair = x[0]
    score = x[1]
    return (pair[0], (pair[1], score))
# Blend purchase and cart scores for the VTD algorithm
# input x = ((sku1, sku2), (purc, cart))
# output o = (sku1, (sku2, score))
def vtd_combine(x):
    """Combine the two scores as ``purc * 0.2 + cart``.

    Either score may be None when the pair is missing from one side of the
    full outer join; a missing score counts as 0. None is tested by identity
    (``is None``) rather than equality.
    """
    purc, cart = x[1][0], x[1][1]
    if purc is None:
        purc = 0
    if cart is None:
        cart = 0
    return (x[0][0], (x[0][1], purc * 0.2 + cart))
def _pair_scores(data_map, sku_src, sku_dst):
    """Score every (source, destination) SKU pair found in data_map.

    Returns an RDD of (sku1, (sku2, score)) with self-pairs removed.
    """
    return (data_map.join(sku_src)
            .cartesian(data_map.join(sku_dst))
            .filter(duplicate_filter)
            .map(data_to_wil)
            .map(wilson95))


def apply_jaccard(algo, size, sku_src, sku_dst, purchase_map, view_map, cart_map, visenze_map):
    """Score source/destination SKU pairs with the selected algorithm.

    algo is one of the Algorithms constants; any value other than ORIG, BYS
    or VTD selects the VSZ pipeline. sku_src and sku_dst are RDDs of SKU
    strings, and the *_map arguments are RDDs of (sku, set) records.

    Returns an RDD of (sku1, iterable of (sku2, score)) holding only scores
    above the nonzero_filter threshold.

    NOTE(review): when size <= 0 a plain dict is returned instead of an RDD;
    a caller that goes on to use RDD methods on the result will fail.
    """
    res = {}
    if size <= 0:
        return res

    # Tag each SKU with a dummy value so the lists can be joined as key/value RDDs.
    sku_src = sku_src.map(lambda x: (x, 0))
    sku_dst = sku_dst.map(lambda x: (x, 0))

    if algo == Algorithms.ORIG:
        res = _pair_scores(purchase_map, sku_src, sku_dst)
    elif algo == Algorithms.BYS:
        view_purchase = view_map.join(purchase_map)
        res = (view_purchase.join(sku_src)
               .cartesian(view_purchase.join(sku_dst))
               .filter(duplicate_filter)
               .map(bys_to_wil)
               .map(wilson95))
    elif algo == Algorithms.VTD:
        cart_scores = _pair_scores(cart_map, sku_src, sku_dst).map(en_pair)
        # similar_filter_score returns a re-weighted record, not a boolean,
        # so it has to be applied with map(); under filter() the returned
        # tuple is always truthy and the re-weighting is silently discarded.
        res = (_pair_scores(purchase_map, sku_src, sku_dst)
               .map(en_pair)
               .fullOuterJoin(cart_scores)
               .map(vtd_combine)
               .map(similar_filter_score))
    else:
        candidates = view_map.join(sku_dst)
        res = (view_map.leftOuterJoin(visenze_map)
               .join(sku_src)
               .cartesian(candidates)
               .filter(duplicate_filter)
               .filter(vsz_filter)
               .map(vsz_to_wil)
               .map(wilson95)
               .map(similar_filter_score))  # mapper, see the VTD branch

    return res.filter(nonzero_filter).groupByKey()
def main_spark(algo, home, size, country):
    """Run the recommendation job on Spark and save the result as text.

    algo is 'vtd', 'bayes' or 'visenze'; any other value selects the original
    algorithm. size is the maximum number of recommendations written per
    SKU, and country is the code used in the input and output file names.

    NOTE(review): home is only used for the output location
    (home/Result/<algo>/Raptor_<country>); all inputs are read from the
    hard-coded data directory below.
    """
    conf = (SparkConf()
            .setMaster("spark://instance-group-1-8alq:7077")
            .setAppName("PyRaptor")
            .set("spark.executor.memory", "2g")
            .set("spark.driver.memory", "2g")
            .set("spark.app.id", "PyRaptor"))
    sc = SparkContext(conf=conf)

    # Stop the context even when the job fails, so it is never left running.
    try:
        b_country = sc.broadcast(country)
        b_size = sc.broadcast(size)

        # input x = (sku, [(sku1, score), (sku2, score),..])
        # output o = string
        def to_output_line(x):
            """Format one tab-separated line: country, sku, then 'score-sku' items."""
            ranked = sorted(x[1], key=itemgetter(1), reverse=True)
            fields = [b_country.value, x[0]]
            # The SKU is passed as a %s argument rather than spliced into the
            # format string, so a '%' inside a SKU cannot break formatting.
            fields.extend('%.2f-%s' % (item[1], item[0])
                          for item in ranked[0:b_size.value])
            return '\t'.join(fields)

        data_dir = 'file:///home/ubuntu/Recommendation/PyRaptor/Data/'

        def load(prefix):
            """Open the per-country CSV whose name starts with prefix."""
            return sc.textFile(data_dir + prefix + country + '.csv')

        sku_src = load('instock_skus_').map(lambda x: x.strip())
        sku_dst = load('valid_skus_').map(lambda x: x.strip())

        sku_male = load('sku_male_').map(lambda x: x.split()[0])
        sku_female = load('sku_female_').map(lambda x: x.split()[0])

        purchase_map = load('VTD_purchased_').map(file_to_map)
        view_map = load('VTD_view_').map(file_to_map)
        cart_map = load('VTD_cart_').map(file_to_map)
        visenze_map = load('visenze_').map(file_to_map)

        # Recommendations are computed separately within each gender list.
        sku_dst_male = sku_male.intersection(sku_dst)
        sku_dst_female = sku_female.intersection(sku_dst)
        sku_src_male = sku_male.intersection(sku_src)
        sku_src_female = sku_female.intersection(sku_src)

        algo_by_name = {
            'vtd': Algorithms.VTD,
            'bayes': Algorithms.BYS,
            'visenze': Algorithms.VSZ,
        }
        tmp_algo = algo_by_name.get(algo, Algorithms.ORIG)

        list_recommended_male = apply_jaccard(
            tmp_algo, size, sku_src_male, sku_dst_male,
            purchase_map, view_map, cart_map, visenze_map)
        list_recommended_female = apply_jaccard(
            tmp_algo, size, sku_src_female, sku_dst_female,
            purchase_map, view_map, cart_map, visenze_map)

        (list_recommended_male.union(list_recommended_female)
         .map(to_output_line)
         .saveAsTextFile(home + '/Result/' + algo + '/Raptor_' + country))
    finally:
        sc.stop()
# Command-line entry point: parse the options and launch the Spark job.
if __name__ == "__main__":
    optparser = OptionParser()
    optparser.add_option('-p', '--homePath',
                         dest='home',
                         help='path to home folder',
                         default='./Data/Test')
    optparser.add_option('-s', '--size',
                         dest='size',
                         help='specify number of relevant products, default = 3',
                         default=3)
    # The help text lists the values main_spark actually recognises; anything
    # else silently selects the original algorithm.
    optparser.add_option('-a', '--algorithm',
                         dest='algo',
                         help='algorithm being used: original, vtd, bayes or visenze, default = original',
                         default='original')
    optparser.add_option('-c', '--country',
                         dest='country',
                         help='country code, default = sg',
                         default='sg')
    (options, args) = optparser.parse_args()
    # Single-argument print(...) is valid in Python 2 and 3 and produces the
    # same text as the former Python 2 print statement.
    print("Run PyRaptor with option: (%s %s %s %s)"
          % (options.algo, options.home, options.size, options.country))
    # size arrives as a string when given on the command line, hence int().
    main_spark(options.algo, options.home, int(options.size), options.country)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment