# coding: utf-8
# In[1]:
import numpy as np
import math
from pyspark.sql import SQLContext, Row
from moztelemetry import get_pings, get_pings_properties, get_one_ping_per_client, get_clients_history, get_records
import numpy as np
import matplotlib.mlab as mlab
import matplotlib.pyplot as plt
import scipy.stats
import scipy.optimize
from scipy.stats.stats import pearsonr
import scipy.spatial.distance as spdist
import urllib2, json
get_ipython().magic(u'pylab inline')
matplotlib.rcParams['figure.figsize'] = (12.0, 6.0)
# In[2]:
# Seed passed to the DataFrame sample and to randomSplit further down so those draws are repeatable.
RAND_SEED = 422950748
# Add-on ids removed from every client's add-on list by filterSystemAddons.
# NOTE(review): all four entries are empty strings here — presumably the real ids were redacted; confirm.
ADDON_EXCLUSION_LIST = ['', '', '', '']
# RGB triples used as facecolor/color arguments in the plots below.
# NOTE(review): names presumably abbreviate dark blue / light blue / light orange / dark orange — confirm.
db = [0.00, 0.13, 0.28]
lb = [0.00, 0.59, 0.87]
lo = [1.00, 0.58, 0.00]
do = [0.90, 0.38, 0.00]
# In[66]:
# Add-on table: one row per Firefox client, reduced to (client_id, active_addons[0]).
# Rows are kept only when active_addons[0] is non-empty and the active_addons array has
# more than 1 and fewer than 999 entries.
# NOTE(review): index 0 is presumably the most recent ping — confirm against the longitudinal schema.
freshPings_addon_DF = sqlContext.sql("SELECT * FROM longitudinal")\
 .where("active_addons IS NOT null and size(active_addons[0]) > 0")\
 .where("size(active_addons) > 1")\
 .where("size(active_addons) < 999")\
 .where("build IS NOT null AND build[0].application_name = 'Firefox'")\
 .selectExpr("client_id as client_id", "active_addons[0] as active_addons")

# HACK: the `.where("size(active_addons) > 1")` filter is a temporary workaround for a
# telemetry bug and should be removed once that issue is resolved.

# Categorical table: same client filter (minus the < 999 bound), one column per categorical
# feature. client_id is the first column, so row[0] is the client id.
# NOTE(review): `default_search` and `channel` both select the whole `settings[0]` struct —
# presumably a sub-field was intended for each; confirm.
freshPings_cat_DF = sqlContext.sql("SELECT * FROM longitudinal")\
 .where("active_addons IS NOT null and size(active_addons[0]) > 0")\
 .where("size(active_addons) > 1")\
 .where("build IS NOT null AND build[0].application_name = 'Firefox'")\
 .selectExpr("client_id as client_id", "geo_country[0] as geo_country",\
 "os as os",\
 "system[0].memory_mb as sys_mem",\
 "system[0].virtual_max_mb as virt_mem",\
 "theme[0].id as theme",\
 #"flash_version[0] as flash",\
 "system_gfx[0].monitors.screen_width as scrnWd",\
 #"number_of_profiles as prof_num",\
 "settings[0].e10s_enabled as e10_bool",\
 "settings[0].telemetry_enabled as telemetry_bool",\
 "settings[0] as default_search",\
 "settings[0].locale as loc",\
 "settings[0] as channel"\
 )

# Thanks to Alessio, for being a BOSS

# Disabled query for continuous (histogram/timing) features. It is kept inside a bare
# string literal, so it is never executed and freshPings_con_DF is never defined.
"""
freshPings_con_DF = sqlContext.sql("SELECT * FROM longitudinal")\
 .where("active_addons IS NOT null")\
 .where("build IS NOT null AND build[0].application_name = 'Firefox'")\
 .selectExpr("client_id as client_id", "session_length[0] as session_length",\
 "paint_build_displaylist_time[0] as paint_time", "predictor_wait_time[0] as predictor_wait_time",\
 "html_background_reflow_ms_2[0] as reflow_ms", "telemetry_memory_reporter_ms[0] as tele_rep_time",\
 "image_decode_latency_us[0] as im_decode_latency", "http_subitem_open_latency_time[0] as op_latent"\
 )
"""

# Draw a 10% sample (without replacement, seeded) of the add-on rows; cached because
# several later cells reuse it.
subset_addon_rdd = freshPings_addon_DF.sample(False, 0.1, RAND_SEED).rdd.cache()
# Reduce every row to (client_id, [addon ids]); only the keys of the add-on map are kept.
# NOTE(review): the receiver of this map call was missing in the source; subset_addon_rdd is
# the only RDD defined at this point — confirm.
subset_addon_tok_rdd = subset_addon_rdd.map(
    lambda p: (p['client_id'], p['active_addons'].keys())).cache()

# to filter by client_id
def isinSet(p):
    """Return True when client id ``p`` is a member of ``valid_ID_in_addon_Set``.

    ``valid_ID_in_addon_Set`` is a module-level name assigned later in the
    notebook; the original note on this function says it must be broadcast.
    """
    is_member = p in valid_ID_in_addon_Set
    return is_member

def filterSystemAddons(p, ADDON_EXCLUSION_LIST):
    """Return the items of ``p`` that are not listed in ``ADDON_EXCLUSION_LIST``.

    The order and any duplicates of ``p`` are preserved and a new list is
    always returned. Iterating a dict yields its keys, so a map of add-ons
    can be passed directly.
    """
    kept = []
    for addon_id in p:
        if addon_id in ADDON_EXCLUSION_LIST:
            continue
        kept.append(addon_id)
    return kept
 
# Strip excluded add-ons, then keep only clients that still have more than one add-on.
# NOTE(review): the `<rdd>.map(lambda` prefixes in this cell were missing in the source and
# are reconstructed from the names defined just above — confirm the receivers.
subset_addon_tok_filtered_rdd = subset_addon_tok_rdd.map(
    lambda p: (p[0], filterSystemAddons(p[1], ADDON_EXCLUSION_LIST))
).filter(lambda p: len(p[1]) > 1).cache()

# Client ids that survived the add-on sampling and filtering.
valid_ID_in_addon_Set = set(subset_addon_tok_filtered_rdd.map(lambda p: p[0]).collect())
# sc.broadcast returns a handle. Workers have to read `.value` from that handle; if the
# handle is discarded the plain set is simply pickled into every task closure instead.
valid_ID_in_addon_Set_broadcast = sc.broadcast(valid_ID_in_addon_Set)

# Restrict the categorical rows to the sampled clients (column 0 of the row is client_id).
subset_cat_rdd = freshPings_cat_DF.rdd.filter(
    lambda p: p[0] in valid_ID_in_addon_Set_broadcast.value).cache()
# In[67]:
# In[68]:
print(subset_cat_rdd.count())
print(subset_addon_tok_filtered_rdd.count())
# In[186]:
# implement some standard nlp functions to work with addons

def addon_frequency(add_on_list):
    """Map every add-on in ``add_on_list`` to the weight ``1 / len(add_on_list)``.

    The token frequency is not counted per add-on: every key receives the
    same value, and repeated entries collapse into a single key. An empty
    list gives an empty dict (no division is performed).
    """
    total = len(add_on_list)
    return {addon: float(1) / total for addon in add_on_list}

def population_frequency(corpusRDD):
    """Compute an inverse document frequency for every token in the corpus.

    Args:
        corpusRDD: pair RDD of ``(record_id, tokens)``.

    Returns:
        Pair RDD of ``(token, total_records / records_containing_token)``,
        so tokens seen in few records get large values.
    """
    total_pings = corpusRDD.count()
    # De-duplicate inside each record so a record counts a token at most once.
    tokens = corpusRDD.flatMap(lambda p: list(set(p[1])))
    # NOTE(review): the receivers of the two map calls were missing in the source and are
    # reconstructed from the preceding local names — confirm.
    # An explicit lambda replaces the bare `add`, which this file never imports by name
    # (presumably it only arrives through `%pylab inline`).
    token_counts = tokens.map(lambda token: (token, 1)).reduceByKey(lambda a, b: a + b)
    return token_counts.map(lambda kv: (kv[0], float(total_pings) / kv[1]))

def addonRarity(addons_freq):
    """Weight every add-on key of ``addons_freq`` by ``IDF / len(addons_freq)``.

    IDF values are read from the module-level ``corpus_addons_IDFs_Broadcast``
    mapping; an add-on missing from it gets an IDF of 1. Only the keys of
    ``addons_freq`` are used, its values are not read.

    Raises:
        ZeroDivisionError: if ``addons_freq`` is empty.
    """
    # Same term frequency for every key; would need per-key counts if duplicates were possible.
    uniform_tf = float(1) / len(addons_freq)
    return {
        addon: uniform_tf * corpus_addons_IDFs_Broadcast.get(addon, 1)
        for addon in addons_freq
    }

def catVarRarity(cat_freq):
    """Weight every categorical token of ``cat_freq`` by ``IDF / len(cat_freq)``.

    IDF values are read from the module-level ``corpusCatIDFsBroadcast``
    mapping; a token missing from it gets an IDF of 1. Only the keys of
    ``cat_freq`` are used.

    This version used to read ``addons_freq`` and return ``addon_dict_intf``,
    neither of which exists in this scope. Note that a second definition of
    ``catVarRarity`` later in the file rebinds the name.

    Raises:
        ZeroDivisionError: if ``cat_freq`` is empty.
    """
    dict_intf = dict()
    t = float(1) / len(cat_freq)
    for i in cat_freq:
        dict_intf[i] = t * corpusCatIDFsBroadcast.get(i, 1)
    return dict_intf

def convertKeyCombinedIDs(p):
    """Flatten ``((id_1, id_2), score)`` into ``('id_1 id_2', score)``.

    Both ids are converted with ``str`` and joined by a single space so the
    pair can be used as one string key; the score is passed through.
    """
    id_pair = p[0]
    combined_key = ' '.join([str(id_pair[0]), str(id_pair[1])])
    return (combined_key, p[1])
 
def count_entities(p):
    """Return the total number of tokens over all records of pair RDD ``p``.

    Each record is ``(record_id, tokens)``; the lengths of the token
    collections are summed.
    """
    # NOTE(review): the `p.map(lambda` prefix was missing in the source; `p` is the only
    # candidate receiver in this scope.
    # An explicit lambda replaces the bare `add`, which this file never imports by name.
    return p.map(lambda q: len(q[1])).reduce(lambda a, b: a + b)

def exclude_empty_entities(p):
    """Filter predicate: True when the second element of ``p`` is truthy."""
    if p[1]:
        return True
    return False

def addonID2addonName(a):
    """Look up ``a`` in ``addonIdsToNames_broadcast`` and return the entry.

    NOTE(review): ``addonIdsToNames_broadcast`` is not defined in the visible
    part of this file — presumably an id -> name mapping; confirm.
    """
    addon_name = addonIdsToNames_broadcast[a]
    return addon_name

def tokenize_catVar(p):
    """Turn a row into ``(p[0], ['<column>_<value>', ...])``.

    The row is converted with ``asDict()``; the ``client_id`` column is
    dropped and every remaining column becomes one token built from its name
    and value via ``safe_unicode``.

    Raises:
        KeyError: if the row has no ``client_id`` column.
    """
    columns = p.asDict()
    columns.pop('client_id')
    tokens = [
        safe_unicode(column) + '_' + safe_unicode(columns[column])
        for column in columns
    ]
    return (p[0], tokens)

def dotprod(x1, x2):
    """Return the dot product of two sparse vectors stored as mappings.

    Args:
        x1, x2: mappings of ``key -> numeric weight``.

    Returns:
        Sum of ``x1[k] * x2[k]`` over the keys present in both mappings;
        ``0`` when they share no key.
    """
    # Only shared keys are visited, so plain indexing is enough (a default can never apply)
    # and no dict-view method is required of the inputs.
    return sum(x1[key] * x2[key] for key in x1 if key in x2)

def norm(x):
    """Return the Euclidean norm of the values of mapping ``x``.

    Args:
        x: mapping of ``key -> numeric weight``.

    Returns:
        ``sqrt(sum(v * v))`` over all values as a float; ``0.0`` for an
        empty mapping.
    """
    # Every key visited comes from x itself, so reading the values directly is equivalent
    # and does not require a dict-view method.
    return math.sqrt(sum(value * value for value in x.values()))

def cosSim(x1, x2):
    """Return the cosine similarity of two sparse vectors stored as mappings.

    The dot product is divided by the norm of ``x1`` and then by the norm of
    ``x2``.

    Raises:
        ZeroDivisionError: if either norm is zero.
    """
    numerator = dotprod(x1, x2)
    scaled_by_first = numerator / norm(x1)
    return scaled_by_first / norm(x2)

def cosSimHelper(list_1, list_2):
    """Return the cosine similarity between two add-on lists.

    Each list is turned into a weight vector with ``addon_frequency`` followed
    by ``addonRarity`` before the two vectors are compared with ``cosSim``.
    """
    weights = [addonRarity(addon_frequency(addons)) for addons in (list_1, list_2)]
    return cosSim(weights[0], weights[1])

def computeCosSim(pair_record):
    """Score one pair of ``(id, addon_list)`` records.

    Args:
        pair_record: ``((id_1, addons_1), (id_2, addons_2))``.

    Returns:
        ``((id_1, id_2), similarity)`` where the similarity comes from
        ``cosSimHelper``. The add-on IDF mapping must be available first.
    """
    first, second = pair_record[0], pair_record[1]
    id_pair = (first[0], second[0])
    return (id_pair, cosSimHelper(first[1], second[1]))

def computeOtherSim(pair_record):
    """Score one pair of ``(id, categorical_tokens)`` records.

    Args:
        pair_record: ``((id_1, tokens_1), (id_2, tokens_2))``.

    Returns:
        ``((id_1, id_2), score)`` where the score comes from
        ``otherSimHelper``. The categorical IDF mapping must be available
        first.
    """
    first, second = pair_record[0], pair_record[1]
    id_pair = (first[0], second[0])
    return (id_pair, otherSimHelper(first[1], second[1]))

def otherSimHelper(list_1, list_2):
    """Return ``spdist.correlation`` between two categorical token lists.

    Each list is turned into a weight mapping with ``addon_frequency``
    followed by ``catVarRarity``; only the weight values are compared.

    NOTE(review): the two value lists are taken in each mapping's own
    iteration order, so positions are not matched up by key — confirm this is
    intended.
    """
    weights_1 = catVarRarity(addon_frequency(list_1))
    weights_2 = catVarRarity(addon_frequency(list_2))
    values_1 = weights_1.values()
    values_2 = weights_2.values()
    return spdist.correlation(values_1, values_2)

def generate_nonCartesian_pairWiseComparison_RDD(rddq):
    """Pair up records of ``rddq`` without building a Cartesian product.

    The RDD is split into two halves with a seeded ``randomSplit``; each half
    is keyed by its ``zipWithIndex`` position and the halves are joined on
    that position.

    Returns:
        RDD of ``(record_from_half_1, record_from_half_2)``.
    """
    def keyed_by_position(rdd):
        # (record, index) -> (index, record) so the index becomes the join key
        return rdd.zipWithIndex().map(lambda p: (p[1], p[0]))

    first_half, second_half = rddq.randomSplit([0.5, 0.5], RAND_SEED)
    joined = keyed_by_position(first_half).join(keyed_by_position(second_half))
    return joined.map(lambda p: p[1])

def safe_unicode(obj, *args):
    """Return ``str(obj, *args)``, or a fixed placeholder when that fails.

    Only ``UnicodeEncodeError`` is caught; in that case the text
    ``'ascii_text_fail_sauce'`` is returned instead of the value, so every
    value that fails to convert ends up as the same string.
    """
    try:
        text = str(obj, *args)
    except UnicodeEncodeError:
        text = unicode('ascii_text_fail_sauce')
    return text

def catVarRarity(addons_freq):
    """Weight every key of ``addons_freq`` by ``IDF / len(addons_freq)``.

    IDF values are read from the module-level ``corpusCatIDFsBroadcast``
    mapping; a key missing from it gets an IDF of 1. Only the keys of
    ``addons_freq`` are used, its values are not read.

    Raises:
        ZeroDivisionError: if ``addons_freq`` is empty.
    """
    # Same term frequency for every key; would need per-key counts if duplicates were possible.
    uniform_tf = float(1) / len(addons_freq)
    return {
        token: uniform_tf * corpusCatIDFsBroadcast.get(token, 1)
        for token in addons_freq
    }
# In[150]:
# Size of the filtered sample: number of client records and total add-on installations.
recCNT = subset_addon_tok_filtered_rdd.count()
addonCNT = count_entities(subset_addon_tok_filtered_rdd)

summary_line = str(recCNT) + ' records parsed including ' + str(addonCNT) + ' addon installations'
print(summary_line)
# ### We will borrow many concepts from NLP here, but we have it easy because our lexicon is only the total number of possible add-ons. By calculating IDF (inverse document frequency) and TF (token frequency) we can convert each add-ons list to a vector of numerical weights favouring rare add-ons (in pairwise comparisons between users).
# ### This will take a long time! We use the superset that our small sample was drawn from to ensure that the set exclusion is empty. Then we calculate the frequency of add-ons globally within the freshest part of the longitudinal corpus, once again excluding pings with no add-ons for now.
# In[151]:
# Tokenize the full (unsampled) add-on table into (client_id, [addon ids]).
# NOTE(review): the receivers of the two map calls in this cell were missing in the source.
# freshPings_addon_DF.rdd / freshPings_cat_DF.rdd are reconstructed from the "corpus" naming
# and the stated use of the superset of the sample — confirm.
corpusTokenizedRDD = freshPings_addon_DF.rdd.map(
    lambda p: (p['client_id'], p['active_addons'].keys()))
corpusIDFs = population_frequency(corpusTokenizedRDD)
## DISCUSSION POINT: should we include the subsample in the corpus:
# option 1: no, but then filter out addons we have not seen yet in the corpus, from the sample
# option 2: yes leave the corpus as the super set and risk overfitting
corpus_addons_IDFs_Broadcast = corpusIDFs.collectAsMap()
# NOTE(review): the handle returned by sc.broadcast is discarded here, and addonRarity reads
# the plain dict above — so the dict is still shipped with each task closure. Switching to the
# handle's `.value` would need a matching change in addonRarity.
sc.broadcast(corpus_addons_IDFs_Broadcast) # this is really important, otherwise we are sending this to each worker FOR EACH partition!!!

tokenized_catVars_corpus = freshPings_cat_DF.rdd.map(
    lambda p: tokenize_catVar(p)).filter(lambda p: exclude_empty_entities(p)).cache()
#cnt = tokenized_catVars_corpus.count()
corpusCatIDFs = population_frequency(tokenized_catVars_corpus) # idfs of the keys in the pair rdd
corpusCatIDFsBroadcast = corpusCatIDFs.collectAsMap()
# Same caveat as above: catVarRarity reads the plain dict, not this broadcast handle.
sc.broadcast(corpusCatIDFsBroadcast)#broadcast the idfs (this should be small)
# In[152]:
# Pull the (addon, idf) pairs to the driver and keep only the idf values, as a local list.
vals_idf = corpusIDFs.collect() # maybe we need to subsample!
temp = [i[1] for i in vals_idf]
# Normalised 150-bin histogram of the idf values, drawn with matplotlib.
n, bins, patches = plt.hist(temp, 150, normed=1, facecolor=db, alpha=0.85)
plt.xlabel('document TF-IDF values observed \n <- occurs in most users (less informative) | occurs rarely relative to corpus (more informative) ->')
plt.ylabel('Frequency (normalized)')
plt.title('token frequency for the entire addons corpus')
# Fixed axis limits chosen by hand for this data set.
plt.ylim(0, 0.000005)
plt.xlim(0,5e6)
# In[153]:
# Here is a nice place to examine the most and least frequent add-ons.
# corpusIDFs holds (addon, total / count): the smallest values belong to the most widely
# installed add-ons, so ascending order gives the most frequent ones.
mostFrequent_addons = corpusIDFs.takeOrdered(20, lambda p: p[1])
print 'Most frequently installed add-ons (system addons not filtered): ' + str(mostFrequent_addons) + '\n'

# Negating the key reverses the order: the 20 largest idf values, i.e. the rarest add-ons.
leastFrequent_addons = corpusIDFs.takeOrdered(20, lambda p: -1*p[1])
print 'Least popular installed add-ons: ' + str(leastFrequent_addons)
# ### Low popularity (rare) add-ons may still be very useful in recommending add-ons for some users, i.e. if two users have a similar profile in terms of rare characteristics, that is weighted more heavily than if they are similar in terms of very common characteristics.
# ### OK, now we need to compare pairwise similarity between add-on weight vectors
# In[154]:
# Pair up sampled clients (non-Cartesian) and report how many pairs were formed.
crossSmall_addons = generate_nonCartesian_pairWiseComparison_RDD(subset_addon_tok_filtered_rdd)
print(crossSmall_addons.count())
# In[155]:
# Add-on cosine similarity per pair: ((id_1, id_2), similarity).
# NOTE(review): the receiver of this map call was missing in the source; crossSmall_addons is
# the pair RDD built just above — confirm.
sims_addon = crossSmall_addons.map(lambda p: computeCosSim(p)).cache()
print(sims_addon.take(10))
# In[156]:
# Pull the add-on similarities to the driver; only strictly positive scores are plotted.
vals_css = sims_addon.collect() # maybe we need to subsample!
temp2 = [i[1] for i in vals_css if i[1] > 0]
n, bins, patches = plt.hist(temp2, 150, normed=1, facecolor=do, alpha=0.85)
plt.xlabel('Pairwise cosine similarity in addons features')
plt.ylabel('Frequency (normalized)')
plt.title('Distribution in pairwise similarity in addons sense for 5% sample of longitudinal (excluding 0 vals)')
plt.ylim(0, 20)
# ### it may be interesting to normalize these values by the number of add-ons in the comparison!
# # OK, lets try to relate the pairwise addon similarities to something else
# In[157]:
# ### Because these are categorical variables with a (small) finite set of possible values, we can use them almost exactly the same way as we did with add-on names. This way we can reuse a lot of the code.
# In[158]:
# Tokenize the categorical rows of the sampled clients: (client_id, ['<column>_<value>', ...]).
# NOTE(review): the receivers of the map calls in these cells were missing in the source and
# are reconstructed from the RDDs each result is derived from — confirm.
tokenized_catVars = subset_cat_rdd.map(lambda p: tokenize_catVar(p))
print(tokenized_catVars.take(1))
# ### then perform pairwise similarity comparisons and keep track of SS and SD comparisons in a list of id pairs
# In[159]:
crossSmall_cats = generate_nonCartesian_pairWiseComparison_RDD(tokenized_catVars)
# Categorical score per pair: ((id_1, id_2), score).
sims_cats = crossSmall_cats.map(lambda p: computeOtherSim(p)).cache()
print(sims_addon.take(1))
print(sims_cats.take(1))
# In[160]:
# Number of (id_1, id_2) keys that occur in both similarity RDDs.
c = sims_addon.map(lambda p: p[0]).intersection(sims_cats.map(lambda p: p[0])).count()
# In[161]:
print(c)
# In[162]:
# Pull the categorical scores to the driver; only strictly positive scores are plotted.
vals_css = sims_cats.collect() # maybe we need to subsample!
temp3 = [i[1] for i in vals_css if i[1] > 0]
n, bins, patches = plt.hist(temp3, 150, normed=1, facecolor=lb, alpha=0.85)
# NOTE(review): this label was cut off after 'Pairwise Cos' in the source (unterminated
# string). The wording is reconstructed by analogy with the add-on histogram label, and any
# further plot calls of this cell are lost — confirm.
plt.xlabel('Pairwise Cosine similarity in categorical features')
# In[163]:
# NOTE(review): temp2 and temp3 are filtered independently (> 0), so position k in one list
# need not describe the same client pair as position k in the other; the slice also skips
# index 0. Presumably a correlation over the joined pairs was intended — confirm.
p = pearsonr(temp2[1:100], temp3[1:100]) # must be the same shape!!
print(p)
# In[164]:
# Join the two scores on the (id_1, id_2) key: (key, addon_similarity, categorical_score).
sims_combined_rdd = sims_addon.join(sims_cats).map(lambda p: ((p[0]), p[1][0], p[1][1]))
# In[187]:
# Split the pairs on ADD-ON similarity (x[1]) but keep the CATEGORICAL score (p[2]) of each
# group. The densities fitted on these RDDs are later evaluated on categorical scores, and
# keeping x[1] would only reproduce the threshold split itself.
# NOTE(review): ADDON_SIM_THRESHOLD is not defined anywhere in the visible file — confirm
# where it comes from.
ss_rdd = sims_combined_rdd.filter(lambda x: x[1] >= ADDON_SIM_THRESHOLD).map(lambda p: p[2])
ds_rdd = sims_combined_rdd.filter(lambda x: x[1] < ADDON_SIM_THRESHOLD).map(lambda p: p[2])
# Grid of score values on [0, 1) at which the densities are evaluated.
LR_vals_x = np.arange(0,1,0.0001)
# In[188]:
def exponential_func(x, a, b, c):
    """Evaluate the decaying exponential ``a * exp(-b * x) + c``.

    ``x`` may be a scalar or a numpy array; the result has the same shape.
    """
    decay = np.exp(-b * x)
    return a * decay + c
# later this may be handy
# Mean and sample standard deviation of the two score groups. Each call is a separate Spark
# action on the corresponding RDD.
ss_mu = ss_rdd.mean()
ss_sig = ss_rdd.sampleStdev()
ds_mu = ds_rdd.mean()
ds_sig = ds_rdd.sampleStdev()
print ss_mu, ss_sig
print ds_mu, ds_sig
# Find density estimates for the given values
#LR_vals_y_norm = scipy.stats.norm(ss_mu, ss_sig).pdf(LR_vals_x)/scipy.stats.norm(ds_mu, ds_sig).pdf(LR_vals_x) #LR right here!
#poly_model_norm = np.polyfit(LR_vals_x, LR_vals_y_norm, 3)# fit the 3rd order polynomial, make sure this is ok by exmaining residuals later on!popt, pcov = curve_fit(func, x, y, p0=(1, 1e-6, 1))
#popt, pcov = scipy.optimize.curve_fit(exponential_func, LR_vals_x, LR_vals_y_norm, method='lm')
#exponential_model_norm = exponential_func(LR_vals_x, *popt)
#LR_vals_y_norm_prime = np.polyval(poly_model_norm, LR_vals_x)# sanity check!
# ### Compute model densities for the two groups using KDE!
# In[189]:
from pyspark.mllib.stat import KernelDensity
# Construct the density estimator with the sample data and a standard deviation for the Gaussian
# kernels
BandWidth = 0.35
# NOTE(review): BandWidth was defined but never handed to either estimator, so both ran with
# the library default. It is applied here on the assumption that this was the intent — confirm.
# The stray ` p: p[1]))` text after each setSample call (residue of a trailing comment) has
# been dropped; ss_rdd and ds_rdd are passed as they are.
kd_ss = KernelDensity()
kd_ss.setSample(ss_rdd)
kd_ss.setBandwidth(BandWidth)
kd_ds = KernelDensity()
kd_ds.setSample(ds_rdd)
kd_ds.setBandwidth(BandWidth)
# Evaluate both densities on the common grid; their ratio is the likelihood ratio.
num_kde_vals = kd_ss.estimate(LR_vals_x)
den_kde_vals = kd_ds.estimate(LR_vals_x)
LR_vals_y_kde = num_kde_vals/den_kde_vals # LR right here!
print(len(LR_vals_y_kde))
print(ss_rdd.count())
# In[190]:
# Three curves over the same grid, in legend order: density of the >= threshold group,
# density of the < threshold group, and their ratio.
plt.plot(LR_vals_x, num_kde_vals, color=do, linewidth=3.5)
plt.plot(LR_vals_x, den_kde_vals, color=db, linewidth=3.5)
plt.plot(LR_vals_x, LR_vals_y_kde, color=lb, linewidth=3.5)
plt.xlabel('Observed pariwise non-addon similarity')
plt.ylabel('Likelihood Ratio of observing high addon-similarity \n given observed non-addon similarity')
plt.title('Function relating two senses of similarity (common source score-based LR model)')
# loc=2 places the legend in the upper-left corner.
plt.legend(['simialrity densities good candidate donors', 'similarity densities poor candidate donors', 'likelihood ratio'], loc=2)
# ### define the addon_donors subset
# ### define the mapping to get a ranked list of paired users
# ### weight the add-ons by their donor's LR value
# ### aggregate (sum) and deduplicate to an addon-LRagg RDD, then sort
# ### recommend the top n add-ons
# In[206]:
# Client id treated as the "new user" for whom add-ons are recommended below.
NEW_USER_ID = "068555da-f387-4549-ae09-968589b33865"
# Candidate ids were found by inspecting the first 100 filtered records, i.e. something like
# <rdd>.map(lambda p: (p['client_id'], p['active_addons'].keys())).map(lambda p: (p[0], filterSystemAddons(p[1], ADDON_EXCLUSION_LIST))).filter(lambda p: len(p[1])>1).take(100)
# NOTE(review): the receiver of that expression was missing in the source comment.
# Interesting candidate users found so far:
# d508f84d-1abe-42af-b714-a92b1a0193f4
# ac04b744-0280-4368-a2a2-c4015badc3ac
# 068555da-f387-4549-ae09-968589b33865
# In[207]:
def calcKDE_LR(ss_model, ds_model, score):
    """Return the likelihood ratio of ``score`` under two density models.

    Both models must expose ``estimate(score)``; the result is
    ``ss_model.estimate(score) / ds_model.estimate(score)``.
    """
    numerator = ss_model.estimate(score)
    denominator = ds_model.estimate(score)
    return numerator / denominator
# This can be made faster if we pre-fit the KDE output with a parametric, callable model (polynomial or exponential).
def generate_one_versus_rest_comparison(rdd_donnors, new_user_id, new_user_cats):
    """Pair every donor record with the new user's record.

    Args:
        rdd_donnors: pair RDD of ``(donor_id, donor_tokens)``.
        new_user_id: id of the new user.
        new_user_cats: token list of the new user.

    Returns:
        RDD of ``((donor_id, donor_tokens), (new_user_id, new_user_cats))``.
    """
    # NOTE(review): the `rdd_donnors.map(lambda` prefix was missing in the source;
    # rdd_donnors is the only RDD in scope.
    new_user_record = (new_user_id, new_user_cats)
    comp_rdd = rdd_donnors.map(lambda p: ((p[0], p[1]), new_user_record))
    return comp_rdd
# In[208]:
# generate the donor pool of add-ons
# Donor pool: a 0.05% sample of the categorical rows, tokenized, with empty token lists removed.
# NOTE(review): no seed is passed to sample() here, unlike the other sampling calls in this
# file, so the pool differs between runs — confirm this is intended.
new_subset_cat_tokenized_rdd = freshPings_cat_DF.rdd.sample(False, 0.0005).map(lambda p: tokenize_catVar(p)).filter(lambda p: exclude_empty_entities(p))
print new_subset_cat_tokenized_rdd.count()
# In[209]:
# Records of the chosen new user: filtered add-on list and tokenized categorical features.
# p[1] is the add-on map of the row; filterSystemAddons iterates it, which yields its keys.
arbitrary_new_user_addons = subset_addon_rdd.filter(lambda p: p['client_id'] == NEW_USER_ID).map(lambda p: (p[0], filterSystemAddons(p[1], ADDON_EXCLUSION_LIST))).filter(lambda p: len(p[1])>1).cache()
arbitrary_new_user_cat = subset_cat_rdd.filter(lambda p: p['client_id'] == NEW_USER_ID).map(lambda p: tokenize_catVar(p)).collect()
print arbitrary_new_user_cat
print arbitrary_new_user_addons.collect()
# In[210]:
# The [0] lookups raise IndexError if NEW_USER_ID is not present in subset_cat_rdd.
compRDD_newUser = generate_one_versus_rest_comparison(new_subset_cat_tokenized_rdd, arbitrary_new_user_cat[0][0], arbitrary_new_user_cat[0][1])
# In[211]:
print compRDD_newUser.take(1)
# In[212]:
#newSIMS =
# Categorical score between every donor and the new user: ((donor_id, new_user_id), score).
# NOTE(review): the receiver of this map call was missing in the source; compRDD_newUser is
# the comparison RDD built in the previous cells — confirm.
new_sims_to_donors = compRDD_newUser.map(lambda p: computeOtherSim(p)).cache()
# In[213]:
#numerator_vals = <rdd>.map(lambda p: kd_ss.estimate(p[1]))
# donor_id -> score, leaving out the new user's own record.
sim_dict_new_user = new_sims_to_donors.filter(lambda p: p[0][0] != NEW_USER_ID).map(lambda p: (p[0][0], p[1])).collectAsMap()
#print lrs_new_user.take(10)
LR_vals_new_user = kd_ss.estimate(sim_dict_new_user.values())/kd_ds.estimate(sim_dict_new_user.values()) # LR right here!
# NOTE(review): only labels and a title are set here; no plot call for LR_vals_new_user is
# present in the source.
plt.xlabel('Arbitrary index into a candidate addon donor')
plt.ylabel('Likelihood Ratio of observing high addon-similarity \n between new user and candidate donor')
plt.title('Examining similar users to find good addon donors')
# In[214]:
# keys() and values() of the unmodified dict iterate in the same order, so ids and LRs line up.
LR_DICT = dict(zip(sim_dict_new_user.keys(),LR_vals_new_user))
# In[215]:
def assignLRval(p, Dictonary_of_lrs):
    """Emit ``(addon, lr)`` for every add-on of one donor record.

    Args:
        p: ``(donor_id, addons)``.
        Dictonary_of_lrs: mapping of ``donor_id -> likelihood ratio``.

    Returns:
        List with one ``(addon, Dictonary_of_lrs[donor_id])`` pair per add-on.

    NOTE(review): the loop body was missing in the source. It is reconstructed
    from the caller, which flat-maps the result and sums the values per
    add-on key — confirm.
    """
    outlist = []
    for i in p[1]:
        outlist.append((i, Dictonary_of_lrs[p[0]]))
    return outlist
def assignOnes(p):
    """Emit ``(addon, 1)`` for every add-on of one record ``(id, addons)``.

    NOTE(review): the loop body was missing in the source. It is reconstructed
    from the caller, which passes the result to getAddonNames, where the first
    element of every item is read as the add-on id — confirm.
    """
    outlist = []
    for i in p[1]:
        outlist.append((i, 1))
    return outlist
def getAddonNames(p, n):
    """Fetch the JSON description of each add-on in ``p`` and print its URL.

    Args:
        p: iterable of records whose first element is the add-on id.
        n: upper bound of the slice applied to the returned list.

    Returns:
        ``recomendations[1:n]``. Appending to ``recomendations`` is currently
        disabled, so the result is always an empty list.

    NOTE(review): the ``try:`` line and the body of the ``except`` were
    missing in the source and are reconstructed. The base URL ``st_a`` is
    empty in the source — presumably redacted; with an empty base the request
    cannot succeed, so it has to be restored before use.
    """
    from contextlib import closing

    st_a = ""
    st_c = "/"
    recomendations = []
    for i in p:
        st_b = str(i[0])
        url = st_a + st_b + st_c
        try:
            # closing() releases the HTTP response even when parsing fails
            with closing(urllib2.urlopen(url)) as r:
                data = json.load(r)
            print(url)
            # recomendations.append(data)
        except IOError:
            # best effort: add-on information unreachable, skip this add-on
            pass
    return recomendations[1:n]
# In[216]:
# Donor ids and the log of their likelihood ratios, in matching order.
donnor_ids = list(LR_DICT.keys())
donnor_lrs = [log(lr) for lr in LR_DICT.values()]
# Set copy for the per-record membership test below (constant-time instead of a list scan).
donnor_id_set = set(donnor_ids)
### two strategies for aggregating the results here:
# 1. using summation (will include more bias for more frequently installed addons in the donor list independent of LR score)
# 2. using average (will emphasize similarity very aggressively, which may remove top n from recommendations)
### uncomment desired strategy:
###### Strategy 1: Summation
# An explicit lambda replaces the bare `add`, which this file never imports by name.
final_ranking_of_addons = subset_addon_tok_filtered_rdd.filter(lambda p: p[0] in donnor_id_set).flatMap(lambda p: assignLRval(p, LR_DICT)).foldByKey(0, lambda a, b: a + b)
###### Strategy 2: Averaging
#final_ranking_of_addons = subset_addon_tok_filtered_rdd.filter(lambda p: p[0] in donnor_ids).flatMap(lambda p: assignLRval(p, LR_DICT))\
# .aggregateByKey((0,0), lambda a,b: (a[0] + b, a[1] + 1),lambda a,b: (a[0] + b[0], a[1] + b[1])).mapValues(lambda v: v[0]/v[1]).cache()
# In[217]:
# Id and categorical tokens of the new user, for reference.
print(arbitrary_new_user_cat[0][0])
print(arbitrary_new_user_cat[0][1])
# In[218]:
# The 40 add-ons with the highest aggregated LR score (negated key gives descending order).
addon_recommendations = final_ranking_of_addons.takeOrdered(40, key=lambda x: -x[1])
getAddonNames(addon_recommendations, 10)
# In[219]:
# display true addons installed
# NOTE(review): the receiver of this map call was missing in the source;
# arbitrary_new_user_addons holds the new user's own (id, addons) record — confirm.
ground_truth = arbitrary_new_user_addons.map(lambda p: assignOnes(p)).collect()
getAddonNames(ground_truth[0], 10)
# In[220]:
print(arbitrary_new_user_addons.collect())
# ## Next steps?
# ### exploration of variable relationships?
# ### refine addon recomendations for users with existing addons
