Skip to content

Instantly share code, notes, and snippets.

@jtg567
Last active April 18, 2018 19:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jtg567/6e029abc2f4e61b7e5a6b319e8d34d2f to your computer and use it in GitHub Desktop.
Save jtg567/6e029abc2f4e61b7e5a6b319e8d34d2f to your computer and use it in GitHub Desktop.
Track down Tab Center pings
Display the source blob
Display the rendered blob
Raw
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
# coding: utf-8
# In[1]:
import ujson as json
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import plotly.plotly as py
import datetime as DT
import itertools
from pprint import pprint as pp
from scipy import stats
from pyspark.sql import Row, SQLContext
from pyspark.sql.types import LongType, BooleanType
from pyspark.sql.functions import explode, when, udf, col, first, expr, broadcast
from pyspark.sql.functions import sum as sqlSum
from moztelemetry.standards import sampler
# Notebook/session setup (Python 2, Jupyter on a Spark cluster).
get_ipython().magic(u'matplotlib inline')
pd.set_option('display.width', 160) # default is 80
# how many cores active on this cluster? (sc is the notebook-provided SparkContext)
print "\n"+str(sc.defaultParallelism)+"\n"
def sumArray(x):
    """Sum a search_counts array, treating missing data as zero.

    A client with a single ping can yield None instead of a list, and a
    list may contain None elements; either would raise a TypeError in a
    bare sum(). Returns 0 for a missing/empty array.
    """
    if x is None:
        return 0
    # Treat None elements as 0 so the sum never sees a non-numeric value.
    return sum(0 if v is None else v for v in x)
# Register sumArray as a Spark SQL UDF producing a LongType column.
sumArray_udf = udf(sumArray, LongType())
def inStudy(activeAddons):
    """Return True iff the active_addons list contains the Tab Center test add-on.

    Fix: the original returned None (not False) when the list was non-empty
    but had no matching addon_id, which surfaces as null rather than false
    in the BooleanType UDF column. Always return an explicit boolean.
    """
    if not activeAddons:
        return False
    for row in activeAddons:
        if row.addon_id == "tabcentertest1@mozilla.com":
            return True
    return False
# Register inStudy as a Spark SQL UDF producing a BooleanType column.
inStudy_udf = udf(inStudy, BooleanType())
def eachTest(i):
tControl = dfControl.select(i).toPandas()
tVariant = dfVariant.select(i).toPandas()
print i+": "+str(stats.ttest_ind(tControl, tVariant, equal_var= False))+" Means: Control= "+str(np.mean(tControl)[0])+" Variant= "+str(np.mean(tVariant)[0])
tVariant['branch'] = 'Variant'
tControl['branch'] = 'Control'
tControl.append(tVariant).hist(column= i, by= "branch", bins= 50, sharey= True, sharex= True)
def diffTest(i):
tControlPre = preControl.select('pre_'+i).toPandas()
tVariantPre = preVariant.select('pre_'+i).toPandas()
tControlPost = preControl.select(i).toPandas()
tVariantPost = preVariant.select(i).toPandas()
print i+": "+str(stats.ttest_ind(tControl, tVariant, equal_var= False))+" Means: Control= "+str(np.mean(tControl)[0])+" Variant= "+str(np.mean(tVariant)[0])
tVariant['branch'] = 'Variant'
tControl['branch'] = 'Control'
tControl.append(tVariant).hist(column= i, by= "branch", bins= 50, sharey= True, sharex= True)
# ---
# ### only run these to build list of distinct clients that ever have a tabcenter addonId in a main ping -- once it's stored via parquet load it at the next step
# In[ ]:
# read addons table from parquet
addons = spark.read.option("mergeSchema", "true").parquet('s3://telemetry-parquet/addons/v2/')
# In[ ]:
# get distinct clients that ever have a tabcenter addonId in their main ping during the study and write to parquet (takes hours)
# Fix: filter on addon_id/submission_date_s3 BEFORE projecting down to client_id —
# the original selected client_id first and then filtered on a column it had
# already dropped, which only works if the analyzer resolves the missing reference.
addons.where('submission_date_s3 between "20171012" and "20171112"').where('addon_id="tabcentertest1@mozilla.com"').select('client_id').distinct().write.parquet("s3://telemetry-private-analysis-2/jgaunt/tab-center-1", mode="overwrite")
# ---
# ### only run these if main summary df not stored via parquet; otherwise skip ahead to analysis
# In[2]:
# read main summary from parquet
MS = spark.read.option("mergeSchema", "true").parquet('s3://telemetry-parquet/main_summary/v4/')
# this should be all of the fields you need (during dev limit by adding to the select: # sample_id = 1 AND)
# One row per main ping in the study window: engagement scalars renamed, inStudy
# flagged from active_addons via the UDF, the search_counts array collapsed via
# sumArray, and missing numerics filled with 0.
MSdf = MS.where('submission_date_s3 between "20171012" and "20171112"').select( 'client_id', 'profile_creation_date', 'submission_date_s3', 'document_id', 'places_pages_count', 'active_ticks', col('scalar_parent_browser_engagement_max_concurrent_tab_count').alias("max_tabs"), col('scalar_parent_browser_engagement_unique_domains_count').alias('unique_domains_count'), col('scalar_parent_browser_engagement_total_uri_count').alias('total_uri_count'), when(inStudy_udf("active_addons") == True, 1).otherwise(0).alias("inStudy"), sumArray_udf(col("search_counts.count")).alias("search_count") ).fillna(0) #{'max_tabs': 0, 'unique_domains_count': 0, 'total_uri_count': 0, 'places_pages_count': 0} .where(col('client_id').isNotNull())
# next aggregate by clientId, date to get final rows per day with metrics for each
# active_ticks/(3600.0/5) converts 5-second activity ticks to hours.
MSdf = MSdf.groupBy('client_id','submission_date_s3').agg( first('profile_creation_date').alias('profile_creation_date'), sqlSum('places_pages_count').alias('places_pages_count'), sqlSum(expr('active_ticks/(3600.0/5)')).alias('active_hours'), sqlSum('max_tabs').alias('max_tabs'), sqlSum('unique_domains_count').alias('unique_domains_count'), sqlSum('total_uri_count').alias('total_uri_count'), sqlSum('search_count').alias('search_count'), sqlSum('inStudy').alias('inStudy'))
# In[ ]:
# load tabcenter distinct client df from parquet
clients = sqlContext.read.parquet("s3://telemetry-private-analysis-2/jgaunt/tab-center-1")#.where(col('client_id').isNotNull())
# sunah warned that I should do a broadcast join here to avoid unnecessary data shuffling https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.functions.broadcast
clients = broadcast(clients)
# anthony m suggested modifying this parameter for "big joins"
spark.conf.set("spark.sql.shuffle.partitions", spark.sparkContext.defaultParallelism * 4)
# main pings limited by client_id known to have ever used tab center during the study
# (right join keeps every known tab-center client, dropping the duplicate key column)
MSdfJ = MSdf.join(clients, clients.client_id == MSdf.client_id, "right").drop(clients.client_id)
# write to s3 (takes hours)
MSdfJ.write.parquet("s3://telemetry-private-analysis-2/jgaunt/tab-center-2", mode="overwrite")
# In[ ]:
# also use MSdf to generate a random/balanced sample of non-tabcenter clients to compare to and write to s3 (takes hours)
# (I verified same number of distinct tabcenter clients pre/post join) # print dfVariant.select('client_id').distinct().where('inStudy = 1').count()
# ended up not going this way...
# NOTE(review): collect()-ing all client_ids into a Python list for isin() pulls the
# whole id set to the driver — works here but an anti-join would avoid it.
MSdf.groupBy("client_id").agg( first('profile_creation_date').alias('profile_creation_date'), sqlSum('places_pages_count').alias('places_pages_count'), sqlSum('active_hours').alias('active_hours'), sqlSum('max_tabs').alias('max_tabs'), sqlSum('unique_domains_count').alias('unique_domains_count'), sqlSum('total_uri_count').alias('total_uri_count'), sqlSum('search_count').alias('search_count') ).filter(col('client_id').isin([str(c.client_id) for c in clients.collect()]) == False).limit(38059).write.parquet("s3://telemetry-private-analysis-2/jgaunt/tab-center-3", mode="overwrite")
# ---
# ### actual analysis
# In[18]:
# load munged df from parquet (START HERE IF ALREADY ON S3)
#dfControl = sqlContext.read.parquet("s3://telemetry-private-analysis-2/jgaunt/tab-center-3")
# aggregate over days so rows are same granularity as Control (one row per client)
dfVariant = sqlContext.read.parquet("s3://telemetry-private-analysis-2/jgaunt/tab-center-2") .where('inStudy > 0').groupBy("client_id").agg( first('profile_creation_date').alias('profile_creation_date'), sqlSum('places_pages_count').alias('places_pages_count'), sqlSum('active_hours').alias('active_hours'), sqlSum('max_tabs').alias('max_tabs'), sqlSum('unique_domains_count').alias('unique_domains_count'), sqlSum('total_uri_count').alias('total_uri_count'), sqlSum('search_count').alias('search_count'))
# comparing to randomly drawn control is problematic without controlling in some way for the quantity of pings being aggregated within the date window - right now control metrics tower over variant
dfVariant.count()
# In[19]:
clients = sqlContext.read.parquet("s3://telemetry-private-analysis-2/jgaunt/tab-center-1")#.where(col('client_id').isNotNull())
# also use MSdf to generate a random/balanced sample of non-tabcenter clients to compare to and write to s3 (takes hours)
# (I verified same number of distinct tabcenter clients pre/post join) # print dfVariant.select('client_id').distinct().where('inStudy = 1').count()
# per-client aggregate of everyone NOT in the tab-center client list (the control pool)
MSdf2 = MSdf.groupBy("client_id").agg( first('profile_creation_date').alias('profile_creation_date'), sqlSum('places_pages_count').alias('places_pages_count'), sqlSum('active_hours').alias('active_hours'), sqlSum('max_tabs').alias('max_tabs'), sqlSum('unique_domains_count').alias('unique_domains_count'), sqlSum('total_uri_count').alias('total_uri_count'), sqlSum('search_count').alias('search_count') ).filter(col('client_id').isin([str(c.client_id) for c in clients.collect()]) == False)
## build the control sample https://github.com/mozilla/python_moztelemetry/blob/master/moztelemetry/standards.py#L205
# I literally got the modulo argument by varying it and checking the count until it matched dfVariant.count() as closely as possible
dfControl = sampler(MSdf2, 6400) # 7000 gave 44799, 6500 gave 48359, 6450 gave 48613, 6420 gave 48682, 6410 gave 48409, 6400 gave 49209, 6300 gave 49862
dfControl.count()
# In[20]:
# read main summary from parquet
MS = spark.read.option("mergeSchema", "true").parquet('s3://telemetry-parquet/main_summary/v4/')
# this should be all of the fields you need (during dev limit by adding to the select: # sample_id = 1 AND)
# Same munge as the study window, but for the month BEFORE the study
# (20170911-20171011) to serve as each client's pre-period baseline.
preMSdf = MS.where('submission_date_s3 between "20170911" and "20171011"').select( 'client_id', 'profile_creation_date', 'submission_date_s3', 'document_id', 'places_pages_count', 'active_ticks', col('scalar_parent_browser_engagement_max_concurrent_tab_count').alias("max_tabs"), col('scalar_parent_browser_engagement_unique_domains_count').alias('unique_domains_count'), col('scalar_parent_browser_engagement_total_uri_count').alias('total_uri_count'), sumArray_udf(col("search_counts.count")).alias("search_count") ).fillna(0) #{'max_tabs': 0, 'unique_domains_count': 0, 'total_uri_count': 0, 'places_pages_count': 0} .where(col('client_id').isNotNull())
# next aggregate by clientId, date to get final rows per day with metrics for each
# (pre-period columns get a 'pre_' prefix so they can sit beside study-window columns)
preMSdf = preMSdf.groupBy('client_id').agg( first('profile_creation_date').alias('pre_profile_creation_date'), sqlSum('places_pages_count').alias('pre_places_pages_count'), sqlSum(expr('active_ticks/(3600.0/5)')).alias('pre_active_hours'), sqlSum('max_tabs').alias('pre_max_tabs'), sqlSum('unique_domains_count').alias('pre_unique_domains_count'), sqlSum('total_uri_count').alias('pre_total_uri_count'), sqlSum('search_count').alias('pre_search_count'))
# In[21]:
# Inner joins: keep only clients observed in BOTH the pre-period and the study window.
preVariant = dfVariant.join(preMSdf, dfVariant.client_id == preMSdf.client_id, "inner")
preControl = dfControl.join(preMSdf, dfControl.client_id == preMSdf.client_id, "inner")
# In[15]:
preVariant.count()
# In[16]:
preControl.count()
# In[22]:
def diffTest(i):
ControlPre = preControl.select('pre_'+i).toPandas()
VariantPre = preVariant.select('pre_'+i).toPandas()
ControlPost = preControl.select(i).toPandas()
VariantPost = preVariant.select(i).toPandas()
tControl = ControlPost[i].subtract(ControlPre['pre_'+i])
tVariant = VariantPost[i].subtract(VariantPre['pre_'+i])
print i+": "+str(stats.ttest_ind(tControl, tVariant, equal_var= False))+" Means: Control= "+str(np.mean(tControl))+" Variant= "+str(np.mean(tVariant))
Variant = pd.DataFrame({'diff': tVariant, 'branch': 'Variant'})
Control = pd.DataFrame({'diff': tControl, 'branch': 'Control'})
Control.append(Variant).hist(column= 'diff', by= "branch", bins= 50, sharey= True, sharex= True)
# Run the pre/post difference test for each int64 engagement metric.
# In[56]:
diffTest("max_tabs")
# In[57]:
diffTest('places_pages_count')
# In[99]:
diffTest('unique_domains_count')
# In[100]:
diffTest('total_uri_count')
# In[101]:
diffTest('search_count')
# In[39]:
# all the metrics above are int64 type coming from spark - active_hours is float64 and required manual treatment to do some type conversions
# (this is diffTest inlined for the active_hours metric, with explicit float casts)
i = 'active_hours'
ControlPre = preControl.select('pre_'+i).toPandas()
VariantPre = preVariant.select('pre_'+i).toPandas()
ControlPost = preControl.select(i).toPandas()
VariantPost = preVariant.select(i).toPandas()
# cast before subtracting on the variant side, after subtracting on the control side
VariantPre['pre_active_hours'] = VariantPre['pre_active_hours'].astype('float')
tControl = ControlPost[i].subtract(ControlPre['pre_'+i])
tVariant = VariantPost[i].subtract(VariantPre['pre_'+i])
tControl = tControl.astype("float")
print i+": "+str(stats.ttest_ind(tControl, tVariant, equal_var= False))+" Means: Control= "+str(np.mean(tControl))+" Variant= "+str(np.mean(tVariant))
Variant = pd.DataFrame({'diff': tVariant, 'branch': 'Variant'})
Control = pd.DataFrame({'diff': tControl, 'branch': 'Control'})
Control.append(Variant).hist(column= 'diff', by= "branch", bins= 50, sharey= True, sharex= True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment