Last active
April 18, 2018 19:41
-
-
Save jtg567/6e029abc2f4e61b7e5a6b319e8d34d2f to your computer and use it in GitHub Desktop.
Track down Tab Center pings
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding: utf-8 | |
# In[1]: | |
import ujson as json | |
import matplotlib.pyplot as plt | |
import pandas as pd | |
import numpy as np | |
import plotly.plotly as py | |
import datetime as DT | |
import itertools | |
from pprint import pprint as pp | |
from scipy import stats | |
from pyspark.sql import Row, SQLContext | |
from pyspark.sql.types import LongType, BooleanType | |
from pyspark.sql.functions import explode, when, udf, col, first, expr, broadcast | |
from pyspark.sql.functions import sum as sqlSum | |
from moztelemetry.standards import sampler | |
# Notebook cell setup: inline matplotlib plots and a wider pandas display.
get_ipython().magic(u'matplotlib inline')
pd.set_option('display.width', 160)  # default is 80
# how many cores active on this cluster?
# Parenthesized print works identically in Python 2 (single expression)
# and keeps the line valid under Python 3.
print("\n" + str(sc.defaultParallelism) + "\n")
def sumArray(x):
    """Sum an array of search counts, treating nulls as zero.

    Spark passes the UDF None instead of a list when a ping has no
    search_counts, and individual entries can also be null; either
    would raise a TypeError if summed directly.

    Args:
        x: list of ints possibly containing None, or None.

    Returns:
        int: the sum, with a missing array or missing elements counted as 0.
    """
    if x is None:
        # whole array is null (e.g. a client with a single ping)
        return 0
    # replace per-element nulls with 0 so sum() never sees None
    return sum(0 if v is None else v for v in x)
sumArray_udf = udf(sumArray, LongType()) | |
def inStudy(activeAddons):
    """Return True if any active addon is the Tab Center test addon.

    Bug fix: the original returned False as soon as the FIRST addon row was
    not Tab Center, so it effectively checked only one element and
    misclassified any client whose addon list did not start with the study
    addon. Every row is now examined.

    Args:
        activeAddons: list of Rows exposing an addon_id field, or None/empty.

    Returns:
        bool: True if Tab Center is present, else False. (The original
        returned None for a missing list; False is equivalent downstream,
        since the `inStudy_udf(...) == True` comparison treats null and
        False the same way.)
    """
    if not activeAddons:
        return False
    return any(row.addon_id == "tabcentertest1@mozilla.com" for row in activeAddons)
inStudy_udf = udf(inStudy, BooleanType()) | |
def eachTest(i):
    """Welch t-test plus side-by-side histograms for metric column `i`.

    Pulls column `i` from the global dfControl / dfVariant Spark frames into
    pandas, prints the t-statistic/p-value and the per-branch means, then
    plots per-branch histograms on shared axes.

    Args:
        i (str): metric column name present in both dataframes.
    """
    tControl = dfControl.select(i).toPandas()
    tVariant = dfVariant.select(i).toPandas()
    # equal_var=False -> Welch's t-test (branch sizes/variances differ);
    # parenthesized print is valid in both Python 2 and 3.
    print(i + ": " + str(stats.ttest_ind(tControl, tVariant, equal_var=False))
          + " Means: Control= " + str(np.mean(tControl)[0])
          + " Variant= " + str(np.mean(tVariant)[0]))
    tVariant['branch'] = 'Variant'
    tControl['branch'] = 'Control'
    tControl.append(tVariant).hist(column=i, by="branch", bins=50, sharey=True, sharex=True)
def diffTest(i):
    """Pre/post difference test for metric `i` between Control and Variant.

    Bug fix: the original body referenced undefined names tControl/tVariant
    (NameError at call time) and never used the pre/post frames it fetched.
    It now computes each client's post-window minus pre-window difference
    per branch, runs Welch's t-test on the diffs, and plots per-branch
    histograms — matching the corrected version defined later in the file.

    Args:
        i (str): metric column name; 'pre_'+i must exist in the joined
            preControl / preVariant frames.
    """
    ControlPre = preControl.select('pre_' + i).toPandas()
    VariantPre = preVariant.select('pre_' + i).toPandas()
    ControlPost = preControl.select(i).toPandas()
    VariantPost = preVariant.select(i).toPandas()
    # per-client change: post-window value minus pre-window value
    tControl = ControlPost[i].subtract(ControlPre['pre_' + i])
    tVariant = VariantPost[i].subtract(VariantPre['pre_' + i])
    print(i + ": " + str(stats.ttest_ind(tControl, tVariant, equal_var=False))
          + " Means: Control= " + str(np.mean(tControl))
          + " Variant= " + str(np.mean(tVariant)))
    Variant = pd.DataFrame({'diff': tVariant, 'branch': 'Variant'})
    Control = pd.DataFrame({'diff': tControl, 'branch': 'Control'})
    Control.append(Variant).hist(column='diff', by="branch", bins=50, sharey=True, sharex=True)
# ---
# ### only run these to build list of distinct clients that ever have a tabcenter addonId in a main ping -- once it's stored via parquet load it at the next step
# In[ ]:
# read addons table from parquet
addons = spark.read.option("mergeSchema", "true").parquet('s3://telemetry-parquet/addons/v2/')
# In[ ]:
# get distinct clients that ever have a tabcenter addonId in their main ping during the study and write to parquet (takes hours)
# Bug fix: the original selected client_id (and applied distinct) FIRST, then
# filtered on submission_date_s3/addon_id -- columns no longer present in the
# projection, which fails analysis. Filter before projecting instead.
addons.where('submission_date_s3 between "20171012" and "20171112"').where('addon_id="tabcentertest1@mozilla.com"').select('client_id').distinct().write.parquet("s3://telemetry-private-analysis-2/jgaunt/tab-center-1", mode="overwrite")
# ---
# ### only run these if main summary df not stored via parquet; otherwise skip ahead to analysis
# In[2]:
# read main summary from parquet
MS = spark.read.option("mergeSchema", "true").parquet('s3://telemetry-parquet/main_summary/v4/')
# this should be all of the fields you need (during dev limit by adding to the select: # sample_id = 1 AND)
# One row per ping within the study window: rename the long engagement scalar
# columns, tag study membership with the inStudy UDF (1/0), flatten
# search_counts.count into a single integer via sumArray_udf, and null-fill
# everything with 0. The trailing commented-out fragment is a leftover
# alternative fillna dict plus a client_id null filter that is NOT applied.
MSdf = MS.where('submission_date_s3 between "20171012" and "20171112"').select( 'client_id', 'profile_creation_date', 'submission_date_s3', 'document_id', 'places_pages_count', 'active_ticks', col('scalar_parent_browser_engagement_max_concurrent_tab_count').alias("max_tabs"), col('scalar_parent_browser_engagement_unique_domains_count').alias('unique_domains_count'), col('scalar_parent_browser_engagement_total_uri_count').alias('total_uri_count'), when(inStudy_udf("active_addons") == True, 1).otherwise(0).alias("inStudy"), sumArray_udf(col("search_counts.count")).alias("search_count") ).fillna(0) #{'max_tabs': 0, 'unique_domains_count': 0, 'total_uri_count': 0, 'places_pages_count': 0} .where(col('client_id').isNotNull())
# next aggregate by clientId, date to get final rows per day with metrics for each
# active_ticks are 5-second intervals, so ticks / (3600/5) converts to hours.
MSdf = MSdf.groupBy('client_id','submission_date_s3').agg( first('profile_creation_date').alias('profile_creation_date'), sqlSum('places_pages_count').alias('places_pages_count'), sqlSum(expr('active_ticks/(3600.0/5)')).alias('active_hours'), sqlSum('max_tabs').alias('max_tabs'), sqlSum('unique_domains_count').alias('unique_domains_count'), sqlSum('total_uri_count').alias('total_uri_count'), sqlSum('search_count').alias('search_count'), sqlSum('inStudy').alias('inStudy'))
# In[ ]:
# load tabcenter distinct client df from parquet
clients = sqlContext.read.parquet("s3://telemetry-private-analysis-2/jgaunt/tab-center-1")#.where(col('client_id').isNotNull())
# sunah warned that I should do a broadcast join here to avoid unnecessary data shuffling https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.functions.broadcast
# broadcast() hints Spark to ship the small clients frame to every executor
# instead of shuffling the large MSdf.
clients = broadcast(clients)
# anthony m suggested modifying this parameter for "big joins"
spark.conf.set("spark.sql.shuffle.partitions", spark.sparkContext.defaultParallelism * 4)
# main pings limited by client_id known to have ever used tab center during the study
# right join keeps every known tab-center client even if they have no rows in
# MSdf's window; the duplicate join key column is dropped afterwards.
MSdfJ = MSdf.join(clients, clients.client_id == MSdf.client_id, "right").drop(clients.client_id)
# write to s3 (takes hours)
MSdfJ.write.parquet("s3://telemetry-private-analysis-2/jgaunt/tab-center-2", mode="overwrite")
# In[ ]:
# also use MSdf to generate a random/balanced sample of non-tabcenter clients to compare to and write to s3 (takes hours)
# (I verified same number of distinct tabcenter clients pre/post join) # print dfVariant.select('client_id').distinct().where('inStudy = 1').count()
# ended up not going this way...
# NOTE(review): isin(...collect()) materializes every study client_id on the
# driver; an anti-join would avoid that -- confirm driver memory headroom.
MSdf.groupBy("client_id").agg( first('profile_creation_date').alias('profile_creation_date'), sqlSum('places_pages_count').alias('places_pages_count'), sqlSum('active_hours').alias('active_hours'), sqlSum('max_tabs').alias('max_tabs'), sqlSum('unique_domains_count').alias('unique_domains_count'), sqlSum('total_uri_count').alias('total_uri_count'), sqlSum('search_count').alias('search_count') ).filter(col('client_id').isin([str(c.client_id) for c in clients.collect()]) == False).limit(38059).write.parquet("s3://telemetry-private-analysis-2/jgaunt/tab-center-3", mode="overwrite")
# ---
# ### actual analysis
# In[18]:
# load munged df from parquet (START HERE IF ALREADY ON S3)
#dfControl = sqlContext.read.parquet("s3://telemetry-private-analysis-2/jgaunt/tab-center-3")
# aggregate over days so rows are same granuarity as Control
# inStudy > 0 keeps only client-days where at least one ping carried the
# Tab Center addon; then roll per-day rows up to one row per client.
dfVariant = sqlContext.read.parquet("s3://telemetry-private-analysis-2/jgaunt/tab-center-2") .where('inStudy > 0').groupBy("client_id").agg( first('profile_creation_date').alias('profile_creation_date'), sqlSum('places_pages_count').alias('places_pages_count'), sqlSum('active_hours').alias('active_hours'), sqlSum('max_tabs').alias('max_tabs'), sqlSum('unique_domains_count').alias('unique_domains_count'), sqlSum('total_uri_count').alias('total_uri_count'), sqlSum('search_count').alias('search_count'))
# comparing to randomly drawn control is problematic without controlling in some way for the quantity of pings being aggregated within the date window - right now control metrics tower over variant
dfVariant.count()
# In[19]:
clients = sqlContext.read.parquet("s3://telemetry-private-analysis-2/jgaunt/tab-center-1")#.where(col('client_id').isNotNull())
# also use MSdf to generate a random/balanced sample of non-tabcenter clients to compare to and write to s3 (takes hours)
# (I verified same number of distinct tabcenter clients pre/post join) # print dfVariant.select('client_id').distinct().where('inStudy = 1').count()
# MSdf2: per-client aggregates for everyone NOT in the study (candidate controls).
MSdf2 = MSdf.groupBy("client_id").agg( first('profile_creation_date').alias('profile_creation_date'), sqlSum('places_pages_count').alias('places_pages_count'), sqlSum('active_hours').alias('active_hours'), sqlSum('max_tabs').alias('max_tabs'), sqlSum('unique_domains_count').alias('unique_domains_count'), sqlSum('total_uri_count').alias('total_uri_count'), sqlSum('search_count').alias('search_count') ).filter(col('client_id').isin([str(c.client_id) for c in clients.collect()]) == False)
## build the control sample https://github.com/mozilla/python_moztelemetry/blob/master/moztelemetry/standards.py#L205
# I literally got the modulo argument by varying it and checking the count until it matched dfVariant.count() as closely as possible
dfControl = sampler(MSdf2, 6400) # 7000 gave 44799, 6500 gave 48359, 6450 gave 48613, 6420 gave 48682, 6410 gave 48409, 6400 gave 49209, 6300 gave 49862
dfControl.count()
# In[20]:
# read main summary from parquet
MS = spark.read.option("mergeSchema", "true").parquet('s3://telemetry-parquet/main_summary/v4/')
# this should be all of the fields you need (during dev limit by adding to the select: # sample_id = 1 AND)
# Same munge as the study window, but for the month BEFORE the study
# (2017-09-11 .. 2017-10-11) and without the inStudy tag; used as the
# per-client baseline for the pre/post difference tests below.
preMSdf = MS.where('submission_date_s3 between "20170911" and "20171011"').select( 'client_id', 'profile_creation_date', 'submission_date_s3', 'document_id', 'places_pages_count', 'active_ticks', col('scalar_parent_browser_engagement_max_concurrent_tab_count').alias("max_tabs"), col('scalar_parent_browser_engagement_unique_domains_count').alias('unique_domains_count'), col('scalar_parent_browser_engagement_total_uri_count').alias('total_uri_count'), sumArray_udf(col("search_counts.count")).alias("search_count") ).fillna(0) #{'max_tabs': 0, 'unique_domains_count': 0, 'total_uri_count': 0, 'places_pages_count': 0} .where(col('client_id').isNotNull())
# next aggregate by clientId, date to get final rows per day with metrics for each
# baseline columns are prefixed pre_ so they can sit beside the study-window
# columns after the joins below.
preMSdf = preMSdf.groupBy('client_id').agg( first('profile_creation_date').alias('pre_profile_creation_date'), sqlSum('places_pages_count').alias('pre_places_pages_count'), sqlSum(expr('active_ticks/(3600.0/5)')).alias('pre_active_hours'), sqlSum('max_tabs').alias('pre_max_tabs'), sqlSum('unique_domains_count').alias('pre_unique_domains_count'), sqlSum('total_uri_count').alias('pre_total_uri_count'), sqlSum('search_count').alias('pre_search_count'))
# In[21]:
# inner joins keep only clients observed in BOTH the pre and study windows.
preVariant = dfVariant.join(preMSdf, dfVariant.client_id == preMSdf.client_id, "inner")
preControl = dfControl.join(preMSdf, dfControl.client_id == preMSdf.client_id, "inner")
# In[15]:
preVariant.count()
# In[16]:
preControl.count()
# In[22]: | |
def diffTest(i):
    """Pre/post difference test for metric `i` between Control and Variant.

    Computes each client's post-window minus pre-window difference per
    branch from the global preControl / preVariant frames, runs Welch's
    t-test on the diffs, prints the result and per-branch means, and plots
    per-branch histograms of the diffs on shared axes.

    Args:
        i (str): metric column name; 'pre_'+i must exist in both frames.
    """
    ControlPre = preControl.select('pre_'+i).toPandas()
    VariantPre = preVariant.select('pre_'+i).toPandas()
    ControlPost = preControl.select(i).toPandas()
    VariantPost = preVariant.select(i).toPandas()
    # per-client change: post-window value minus pre-window value
    tControl = ControlPost[i].subtract(ControlPre['pre_'+i])
    tVariant = VariantPost[i].subtract(VariantPre['pre_'+i])
    # parenthesized print is valid in both Python 2 and 3
    print(i + ": " + str(stats.ttest_ind(tControl, tVariant, equal_var=False))
          + " Means: Control= " + str(np.mean(tControl))
          + " Variant= " + str(np.mean(tVariant)))
    Variant = pd.DataFrame({'diff': tVariant, 'branch': 'Variant'})
    Control = pd.DataFrame({'diff': tControl, 'branch': 'Control'})
    Control.append(Variant).hist(column='diff', by="branch", bins=50, sharey=True, sharex=True)
# In[56]:
# Run the pre/post difference test for each int64 engagement metric.
diffTest("max_tabs")
# In[57]:
diffTest('places_pages_count')
# In[99]:
diffTest('unique_domains_count')
# In[100]:
diffTest('total_uri_count')
# In[101]:
diffTest('search_count')
# In[39]:
# all the metrics above are int64 type coming from spark - active_hours is float64 and required manual treatment to do some type conversions
# This cell is diffTest inlined for active_hours with explicit float casts.
i = 'active_hours'
ControlPre = preControl.select('pre_'+i).toPandas()
VariantPre = preVariant.select('pre_'+i).toPandas()
ControlPost = preControl.select(i).toPandas()
VariantPost = preVariant.select(i).toPandas()
# NOTE(review): Variant is cast before subtracting but Control only after;
# presumably both end up float64 either way -- confirm dtypes match.
VariantPre['pre_active_hours'] = VariantPre['pre_active_hours'].astype('float')
tControl = ControlPost[i].subtract(ControlPre['pre_'+i])
tVariant = VariantPost[i].subtract(VariantPre['pre_'+i])
tControl = tControl.astype("float")
print i+": "+str(stats.ttest_ind(tControl, tVariant, equal_var= False))+" Means: Control= "+str(np.mean(tControl))+" Variant= "+str(np.mean(tVariant))
Variant = pd.DataFrame({'diff': tVariant, 'branch': 'Variant'})
Control = pd.DataFrame({'diff': tControl, 'branch': 'Control'})
Control.append(Variant).hist(column= 'diff', by= "branch", bins= 50, sharey= True, sharex= True)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment