# coding: utf-8
# # TestPilot Latency Investigation
#
# We observe a relatively high latency of data reporting via `testpilot` pings compared to both `testpilottest` pings and [Google Analytics](https://datastudio.google.com/u/0/#/reporting/0B6voOaUZL-jwZHNkLXlFdEFndWc) data (if this link doesn't work and you are logged into multiple Google accounts, you may need to change `/u/0` to `/u/1`, `/u/2`, or whichever index corresponds to your Mozilla account).
#
# This notebook intends to find out why. See [Bug 1272395](https://bugzilla.mozilla.org/show_bug.cgi?id=1272395).
#
# ### Method
#
# Compare the latency for `testpilot` data reporting to the background latency of `main` pings, and find out if the TestPilot population appears to have significantly higher `main` ping latency and lossiness.
#
# Specifically,
#
# 1. For each `testpilot` ping, grab testpilot install date, ping creation date, submission date, and clientid
# 2. Find the earliest install date (or creation date) per clientid
# 3. Compute the delta between the install/creation date and the submission date
# 4. Look at the distribution of submission latency for clientids we *did* see.
#
# Do the same for `testpilottest` pings to see how the latency distribution compares.
#
# We can't efficiently filter the entire Telemetry corpus for "has testpilot enabled", but we can take the union of the two sets of clientids above and, using the `main_summary` dataset, see what the latency looks like for `main` pings from those clientids (and compare it to the background latency for all `main` pings).
#
# Further, we should check how many `testpilottest` clientids were not found in the `main` pings during the same interval.
#
# Some predictions:
#
# If we find that `testpilottest` contains many clientids that did not report `main` pings or that the latency for `testpilottest` clientids in the `main` dataset is significantly higher than the background latency, we are probably running up against the throttling behaviour on the client. Follow-up: How many `testpilottest` pings are reported per clientid per day? Actions here would be to decrease the number of `testpilottest` pings or ease up on client throttling for `testpilottest` pings.
#
# If we find that significantly more `testpilottest` clientids are present in the `main` pings than the `testpilot` pings (and that the latency is not significantly worse than the background rate), it should be safe to increase the frequency of `testpilot` submission, and that should improve latency.
# In[1]:
# How many cores are we running on?
sc.defaultParallelism
# ### Read source data
#
# Read the `testpilot` and `testpilottest` data from the Telemetry datastore on S3, and read the `main` ping data from the `main_summary` parquet dataset.
# In[2]:
from moztelemetry import get_pings, get_pings_properties
testpilot_pings = get_pings(sc, doc_type="testpilot", app="Firefox")
testpilottest_pings = get_pings(sc, doc_type="testpilottest", app="Firefox")
# In[3]:
from pyspark.sql import SQLContext
from pyspark.sql.types import *
bucket = "telemetry-parquet"
prefix = "main_summary/v2"
start_date = '20160511'
sample_rate = 10 # percent
main_summaries = sqlContext.read.load("s3://{}/{}".format(bucket, prefix), "parquet")
get_ipython().magic(u"time main_summaries = main_summaries.filter(main_summaries.submission_date_s3 >= start_date).select('document_id', 'sample_id', 'client_id', 'subsession_start_date', 'submission_date')")
# ### Extract a subset of fields from Telemetry data
#
# We don't need all the fields from the `testpilot(test?)` pings, so fetch only the fields of interest.
# In[4]:
# We can get the TestPilot install date from addon information.
testpilot_fields = ["id", "clientId", "creationDate", "meta/submissionDate", "environment/addons"]
testpilot_subset = get_pings_properties(testpilot_pings, testpilot_fields)
testpilottest_subset = get_pings_properties(testpilottest_pings, testpilot_fields)
# ### What date range are we looking at?
#
# Find the date range of TestPilot submissions so we know where to look for `main` pings. This is used above to limit the amount of data we need to look at.
#
# Let's look at how many unique clientIds we observe each day.
# In[5]:
tp_days = testpilot_subset.map(lambda t: (t["meta/submissionDate"], t["clientId"])).groupByKey().mapValues(lambda s: len(set(s))).collect()
sorted(tp_days)
# In[6]:
tpt_days = testpilottest_subset.map(lambda t: (t["meta/submissionDate"], t["clientId"])).groupByKey().mapValues(lambda s: len(set(s))).collect()
sorted(tpt_days)
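# One of the follow-up questions in the predictions above is how many `testpilottest` pings are
# reported per clientId per day. A rough sketch of that count is below (not run as part of this
# analysis; the histogram bucket edges are arbitrary, chosen just for illustration). Uncomment to try it:
#tpt_per_client_day = testpilottest_subset.map(lambda p: ((p["clientId"], p["meta/submissionDate"]), 1)).reduceByKey(lambda a, b: a + b)
#tpt_per_client_day.values().histogram([1, 2, 5, 10, 50, 100, 1000])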
# ### Use a sample of the `main` data for this period
#
# The `main_summary` data has been filtered for a similar range. From the above, it appears that going back to `20160511` should suffice.
#
# We use a sample of the clientId space for checking the "background" latency.
# In[7]:
main_sample = main_summaries.filter(main_summaries.sample_id < sample_rate)
# In[8]:
# Uncomment this if you want to look at the "main_sample" counts per day.
#main_sample.registerAsTable("main_sample")
#main_days = sqlContext.sql("SELECT submission_date, count(distinct client_id) FROM main_sample GROUP BY submission_date ORDER BY submission_date")
#main_days.collect()
# ### How does the latency look?
#
# First, discard any submissions that have a duplicate document id.
# In[9]:
def dedupe_rdd(rdd, dupe_field="id"):
    return (rdd.map(lambda p: (p[dupe_field], p))
               .reduceByKey(lambda a, b: a)
               .map(lambda t: t[1]))

def dedupe_df(df, dupe_field="id"):
    return df.dropDuplicates([dupe_field])
count_before = testpilot_subset.count()
testpilot_dedup = dedupe_rdd(testpilot_subset)
count_after = testpilot_dedup.count()
print "Before deduping testpilot: {}, after: {}".format(count_before, count_after)
count_before = testpilottest_subset.count()
testpilottest_dedup = dedupe_rdd(testpilottest_subset)
count_after = testpilottest_dedup.count()
print "Before deduping testpilottest: {}, after: {}".format(count_before, count_after)
count_before = main_sample.count()
main_sample_dedup = dedupe_df(main_sample, dupe_field="document_id")
count_after = main_sample_dedup.count()
print "Before deduping main ({}% sample): {}, after: {}".format(sample_rate, count_before, count_after)
# ### Convert dates to integers
#
# Define some helper functions for converting all the date fields to integers (number of days since Jan 1, 1970). We pre-compute the numbers for a window of dates around "today" (back to the start of TestPilot submissions) to improve performance. This also lets us discard outliers caused by misconfigured client clocks.
# In[10]:
from datetime import datetime as dt, timedelta, date
date_map = {}
now = dt.utcnow()
then = dt(1970, 1, 1)
# Beginning of TestPilot submissions
cutoff = dt(2016, 4, 8)
i = 0
while True:
    # Future
    d = now + timedelta(i)
    n = (d - then).days
    date_map[dt.strftime(d, "%Y%m%d")] = n
    date_map[dt.strftime(d, "%Y-%m-%d")] = n
    # Past
    d = now - timedelta(i)
    n = (d - then).days
    date_map[dt.strftime(d, "%Y%m%d")] = n
    date_map[dt.strftime(d, "%Y-%m-%d")] = n
    i += 1
    if d < cutoff:
        break
# Set up a broadcast variable we can use later.
broadcast_dm = sc.broadcast(date_map)
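# Quick sanity check on the mapping (an illustrative example, not part of the original analysis):
# May 11, 2016 is 16932 days after Jan 1, 1970, so both date formats should map to that number.
assert (dt(2016, 5, 11) - then).days == 16932
assert date_map["20160511"] == date_map["2016-05-11"] == 16932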
# Convert testpilot/testpilottest pings.
def convert(p):
    dm = broadcast_dm.value
    creationDayNum = None
    cd = p.get("creationDate", "")[0:10]
    if cd in dm:
        creationDayNum = dm[cd]
    installDayNum = None
    if p['environment/addons'] is not None and u'activeAddons' in p['environment/addons']:
        actives = p['environment/addons'][u'activeAddons']
        if u'@testpilot-addon' in actives:
            installDayNum = actives[u'@testpilot-addon'].get('installDay')
    submissionDayNum = dm[p['meta/submissionDate']]
    return (p['clientId'], (p['clientId'], submissionDayNum, creationDayNum, installDayNum))
# In[11]:
# Convert the TestPilot data.
tp_mapped = testpilot_dedup.map(convert)
tpt_mapped = testpilottest_dedup.map(convert)
# In[12]:
tp_mapped.cache()
tpt_mapped.cache()
# In[13]:
import numpy as np
# Get the median difference between submission date and creation date.
def get_median_latency(items):
    latencies = [sd - cd for (i, sd, cd, x) in items if cd is not None]
    return np.median(np.array(latencies))
# Get the latency of the earliest ping we saw after install date.
def get_earliest_submission_latency(items):
    early = None
    early_sd = None
    for item in items:
        # Skip missing install dates (though what's up with these?)
        if item[3] is None:
            continue
        # We found an earlier submission (or the first one)
        if early_sd is None or early_sd > item[1]:
            early_sd = item[1]
            early = item[1] - item[3]
        # We found a lower-latency item from the same submission date
        elif early_sd == item[1]:
            if item[1] - item[3] < early:
                early = item[1] - item[3]
        # Else it's a later date or a higher-latency submission.
    return early
tp_median_cdate_latency = tp_mapped.groupByKey().mapValues(get_median_latency)
tpt_median_cdate_latency = tpt_mapped.groupByKey().mapValues(get_median_latency)
#tp_median_idate_latency = tp_mapped.groupByKey().mapValues(get_earliest_submission_latency)
# In[14]:
tp_latencies = tp_median_cdate_latency.collect()
tpt_latencies = tpt_median_cdate_latency.collect()
# In[15]:
# Convert main pings
def convert_main(r):
    dm = broadcast_dm.value
    creationDayNum = None
    if r.subsession_start_date is not None:
        cd = r.subsession_start_date[0:10]
        if cd in dm:
            creationDayNum = dm[cd]
    submissionDayNum = dm[r.submission_date]
    # Add a dummy tuple item for install date so we can reuse the get_median_latency function
    return (r.client_id, (r.client_id, submissionDayNum, creationDayNum, None))
# In[16]:
main_median_cdate_latency = main_sample_dedup.map(convert_main).groupByKey().mapValues(get_median_latency)
main_latencies = main_median_cdate_latency.collect()
# ### Compare TestPilot clients with the background latency
#
# Do the same for the set of clientids who submitted any `testpilot` or `testpilottest` pings as well as at least one `main` ping.
# In[17]:
from pyspark.sql import Row
from pyspark.sql.types import *
# Single field for client_id
schema = StructType([StructField("client_id", StringType(), True)])
tp_clients = sqlContext.createDataFrame(testpilot_dedup.map(lambda p: Row(p['clientId'])).distinct(), schema)
tpt_clients = sqlContext.createDataFrame(testpilottest_dedup.map(lambda p: Row(p['clientId'])).distinct(), schema)
all_tp_clients = tp_clients.unionAll(tpt_clients).distinct()
# In[18]:
# Join against the un-sampled dataset to get as many client_ids as possible.
# Remove duplicates after doing the join.
main_tp_clients = dedupe_df(main_summaries.join(all_tp_clients, 'client_id', 'inner'), dupe_field="document_id")
#main_tp_clients.first()
# In[19]:
main_tp_clients.cache()
# In[20]:
tp_main_median_cdate_latency = main_tp_clients.map(convert_main).groupByKey().mapValues(get_median_latency)
tp_main_latencies = tp_main_median_cdate_latency.collect()
# ### Plot the results
#
# Compare the resulting latencies with a box plot.
# In[21]:
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import plotly.plotly as py
get_ipython().magic(u'pylab inline')
serieses = [("testpilot", tp_latencies),
("testpilottest", tpt_latencies),
("main (background)", main_latencies),
("main (testpilot users)", tp_main_latencies)]
frame = pd.DataFrame({x: pd.Series(list([i[1] for i in y])) for x, y in serieses})
plt.figure(figsize=(17, 7))
frame.boxplot(return_type="axes")
plt.ylabel("latency (days)")
plt.show()
# ### Compare the distributions
#
# Use a Mann-Whitney U test to see whether the latencies are significantly different.
# In[22]:
# Check how many clientids were in our sample data set.
#len(main_latencies)
# In[23]:
from scipy import stats
import random
m = [i[1] for i in main_latencies]
tpm = [i[1] for i in tp_main_latencies]
for i in range(10):
    some_main_latencies = random.sample(m, 1000000)
    print stats.mannwhitneyu(some_main_latencies, tpm, alternative='greater')
# In[24]:
import csv
WRITE_CSV = False
if WRITE_CSV:
    with open("testpilot_latency.csv", "wb") as f:
        w = csv.writer(f, quoting=csv.QUOTE_MINIMAL)
        w.writerows(tp_latencies)
    with open("testpilottest_latency.csv", "wb") as f:
        w = csv.writer(f, quoting=csv.QUOTE_MINIMAL)
        w.writerows(tpt_latencies)
    with open("main_latency.csv", "wb") as f:
        w = csv.writer(f, quoting=csv.QUOTE_MINIMAL)
        w.writerows(main_latencies)
    with open("tp_main_latency.csv", "wb") as f:
        w = csv.writer(f, quoting=csv.QUOTE_MINIMAL)
        w.writerows(tp_main_latencies)
# ### Presence / Absence
#
# Of the clientIds we observed, how many are common to the three main datasets we've been looking at?
# In[25]:
# Single datasets:
main_client_ids = main_tp_clients.select('client_id').distinct()
print "testpilot: {}, testpilottest: {}, main: {}".format(tp_clients.count(), tpt_clients.count(), main_client_ids.count())
tp_tpt_clients = tp_clients.intersect(tpt_clients)
print "Common clients between testpilot and testpilottest: {}".format(tp_tpt_clients.count())
main_tpt_clients = main_client_ids.intersect(tpt_clients)
print "Common clients between main and testpilottest: {}".format(main_tpt_clients.count())
main_tp_clients = main_client_ids.intersect(tp_clients)
print "Common clients between main and testpilot: {}".format(main_tp_clients.count())
main_only = main_client_ids.subtract(tp_clients).subtract(tpt_clients)
tp_only = tp_clients.subtract(main_client_ids).subtract(tpt_clients)
tpt_only = tpt_clients.subtract(tp_clients).subtract(main_client_ids)
print "Clients only found in testpilot: {}, testpilottest: {}, main: {}".format(tp_only.count(), tpt_only.count(), main_only.count())
# In[26]:
tpt_not_main = tpt_clients.subtract(main_client_ids)
tpt_not_tp = tpt_clients.subtract(tp_clients)
print "Clients found in testpilottest but not main: {}, not testpilot: {}".format(tpt_not_main.count(), tpt_not_tp.count())
# ### Install to Initial Report latency
#
# Let's have a look at the typical latency between Test Pilot installation and reporting the first data point.
# In[27]:
tp_median_idate_latency = tp_mapped.groupByKey().mapValues(get_earliest_submission_latency)
tp_idate_latencies = tp_median_idate_latency.collect()
# In[28]:
bucketed = [ v[1] for v in tp_idate_latencies if v[1] is not None and v[1] <= 20 ]
overflow = len([ v for v in tp_idate_latencies if v[1] is not None and v[1] > 20 ])
missing = len([v for v in tp_idate_latencies if v[1] is None])
print "Dataset contained {} latencies > 20 days, and {} missing out of {} total.".format(overflow, missing, len(tp_idate_latencies))
plt.figure(figsize=(17, 7))
plt.hist(bucketed, bins=range(21))
plt.title("Latency from TP Install")
plt.xlabel("Latency (days)")
plt.ylabel("Count")
plt.show()
# ### Do the 'main' pings appear to have TestPilot installed?
#
# TODO: Of the `testpilottest` clientids found in `main` but not `testpilot`, how many of them appear to have the `@testpilot-addon` addon installed?
#
# We would need to filter the raw Telemetry data for this, which would take a long time.
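#
# A rough sketch of what that check could look like, kept commented out for reference rather
# than executed. It assumes `get_pings` accepts a (start, end) `submission_date` range, and the
# end date and `fraction` below are arbitrary assumptions made just to keep the scan small:
#main_raw = get_pings(sc, doc_type="main", app="Firefox",
#                     submission_date=(start_date, "20160601"),  # hypothetical end date
#                     fraction=0.01)
#main_addons = get_pings_properties(main_raw, ["clientId", "environment/addons/activeAddons"])
#def has_testpilot_addon(p):
#    actives = p.get("environment/addons/activeAddons") or {}
#    return u"@testpilot-addon" in actives
## clientIds from `main` pings that report the TestPilot addon as installed
#main_with_tp_addon = main_addons.filter(has_testpilot_addon).map(lambda p: p["clientId"]).distinct()
## Restrict to the testpilottest clientIds that were seen in `main` but not in `testpilot`
#candidates = set(main_tpt_clients.subtract(tp_clients).map(lambda r: r.client_id).collect())
#main_with_tp_addon.filter(lambda cid: cid in candidates).count()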