-
-
Save mnoorenberghe/938da8c64dfa3a2ce7e42a1ebe14669b to your computer and use it in GitHub Desktop.
Bug1398930
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding: utf-8 | |
--- | |
title: "Bug 1398930" | |
authors: | |
- mnoorenberghe | |
tags: | |
- telemetry | |
- spark | |
created_at: 2017-09-18 | |
updated_at: 2017-09-18 | |
tldr: Migration | |
--- | |
# ### Bug 1398930 | |
# https://bugzilla.mozilla.org/show_bug.cgi?id=1398930 | |
# In[1]: | |
import ujson as json | |
import matplotlib.pyplot as plt | |
import pandas as pd | |
import numpy as np | |
import plotly.plotly as py | |
from plotly.graph_objs import * | |
from moztelemetry import get_pings_properties, get_one_ping_per_client | |
from moztelemetry.dataset import Dataset | |
from operator import add | |
get_ipython().magic(u'matplotlib inline') | |
# ### Basics | |
# This notebook looks at Chrome history-migration telemetry (the FX_MIGRATION_* keyed histograms) for bug 1398930. Let's see how many parallel workers we have at our disposal:
# In[2]: | |
# Cancel any Spark jobs that are still scheduled or running, then display
# the context's default parallelism.
sc.cancelAllJobs()
sc.defaultParallelism
# Let's fetch a 4% sample of "main" pings submitted by release-channel Firefox 55.0.3...
# In[3]: | |
# Display the schema of the "telemetry" source.
Dataset.from_source("telemetry").schema
# In[4]:
# Build the query one filter per line; only the final .records() call
# takes the Spark context and the sampling fraction.
pings = (
    Dataset.from_source("telemetry")
    .where(docType="main")
    .where(appUpdateChannel="release")
    .where(appVersion="55.0.3")
    .records(sc, sample=0.04)
)
# ... and extract only the attributes we need from the Telemetry submissions: | |
# In[5]: | |
# Project each ping down to the properties used below ("clientId" is
# currently commented out of the list).
subset = get_pings_properties(
    pings,
    [
        # "clientId",
        "environment/system/os/name",
        "payload/keyedHistograms/FX_MIGRATION_HISTORY_IMPORT_MS/chrome",
        "payload/keyedHistograms/FX_MIGRATION_USAGE/chrome",
    ],
)
# Let's filter out submissions that do not carry a Chrome FX_MIGRATION_USAGE histogram:
# In[6]: | |
# Drop pings where the Chrome FX_MIGRATION_USAGE entry is absent or None.
subset = subset.filter(
    lambda ping: ping.get("payload/keyedHistograms/FX_MIGRATION_USAGE/chrome") is not None
)
# To prevent pseudoreplication, one would normally consider only a single submission for each client. As this step requires a distributed shuffle, it should always be run only after extracting the attributes of interest with *get_pings_properties*. Note that the call is commented out in the next cell, so every sampled ping is counted.
# In[7]: | |
#subset = get_one_ping_per_client(subset) | |
# Caching is fundamental as it allows for an iterative, real-time development workflow: | |
# In[8]: | |
#subset.count() | |
# In[9]: | |
# Mark the filtered pings for caching so the cells below can reuse them.
# For quick experiments on a handful of pings, swap in instead:
#     cached = sc.parallelize(subset.take(10))
cached = subset.cache()
# In[10]: | |
#cached.collect() | |
# How many pings are we looking at? | |
# In[11]: | |
#cached.count() | |
# In[12]: | |
# Keep pings whose Chrome FX_MIGRATION_USAGE histogram has a positive count
# in bucket 2 (used throughout this notebook as the "history import started"
# count).
# .get(2) yields None when that bucket is absent; coalescing to 0 keeps the
# comparison valid on Python 3 (None > 0 raises TypeError there) while
# giving the same answer Python 2 did (None > 0 is False).
with_history = cached.filter(
    lambda p: (p["payload/keyedHistograms/FX_MIGRATION_USAGE/chrome"].get(2) or 0) > 0
)
with_history.count()
# In[13]: | |
# Peek at one ping that passed the filter.
with_history.first()
# In[14]:
# Inspect the non-zero entries of that ping's history-import timing histogram.
with_history.first()[
    "payload/keyedHistograms/FX_MIGRATION_HISTORY_IMPORT_MS/chrome"
].nonzero()
# In[15]:
# Sum of the same histogram's values.
with_history.first()[
    "payload/keyedHistograms/FX_MIGRATION_HISTORY_IMPORT_MS/chrome"
].sum()
# ## TODO: handle users who do a manual import in the same session as an auto-migrate | |
# In[37]: | |
def summarize(p):
    """Reduce one ping to its OS name and Chrome history-import counts.

    Args:
        p: Mapping from property path to value for a single ping. It must
            contain "environment/system/os/name" and
            "payload/keyedHistograms/FX_MIGRATION_USAGE/chrome" (an object
            exposing ``get``). The FX_MIGRATION_HISTORY_IMPORT_MS entry may
            be missing or ``None``.

    Returns:
        dict with three keys:
        "environment/system/os/name" -- copied from the ping;
        "history_started_count" -- bucket 2 of FX_MIGRATION_USAGE, passed
        through unchanged (so it may be ``None``);
        "history_completed_count" -- sum of the FX_MIGRATION_HISTORY_IMPORT_MS
        histogram, or 0 when it is absent, not a Series, or empty.
    """
    completed = p.get("payload/keyedHistograms/FX_MIGRATION_HISTORY_IMPORT_MS/chrome")
    # isinstance rather than an exact type comparison, so pd.Series
    # subclasses are summed too instead of silently counting as 0.
    if isinstance(completed, pd.Series) and not completed.empty:
        completed_count = completed.sum()
    else:
        completed_count = 0
    return {
        "environment/system/os/name": p["environment/system/os/name"],
        "history_started_count": p["payload/keyedHistograms/FX_MIGRATION_USAGE/chrome"].get(2),
        "history_completed_count": completed_count,
    }
# Turn every history-importing ping into its summary record.
mapped = with_history.map(summarize)
# mapped.collect()
# In[38]:
# Spot-check a single summary record.
mapped.take(1)
# In[39]: | |
# Total "history import started" count over all summaries; a missing (None)
# bucket contributes 0.
num_started = mapped.map(lambda p: p["history_started_count"] or 0.0).sum()
num_started
# In[40]:
# Total "history import completed" count over all summaries.
num_completed = mapped.map(lambda p: p["history_completed_count"]).sum()
num_completed
# In[41]:
# Completion rate in percent. Multiplying by 100.0 first forces float
# arithmetic, so the ratio is not floor-divided under Python 2 when both
# totals are integers. With no started imports the rate is undefined, so
# NaN is shown instead of raising a division-by-zero error.
100.0 * num_completed / num_started if num_started else float("nan")
# Caveat: the completed count also includes automatic migrations, but only for sessions that also had a manual history import.
# The started count only covers manual migrations.
# In[43]: | |
def had_less_completions(p):
    """Flag a ping whose history-import started count is below its completed count.

    Despite the name, the comparison is ``started < completed``: the flag
    is true when MORE completions than starts were recorded.

    Args:
        p: Mapping from property path to value for a single ping. It must
            contain "payload/keyedHistograms/FX_MIGRATION_USAGE/chrome" (an
            object exposing ``get``). The FX_MIGRATION_HISTORY_IMPORT_MS
            entry may be missing or ``None``.

    Returns:
        A one-element tuple ``(flag,)``, where ``flag`` is the truth value
        of ``started < completed``.
    """
    completed = p.get("payload/keyedHistograms/FX_MIGRATION_HISTORY_IMPORT_MS/chrome")
    # isinstance rather than an exact type comparison, so pd.Series
    # subclasses are summed too.
    if isinstance(completed, pd.Series) and not completed.empty:
        completed_count = completed.sum()
    else:
        completed_count = 0
    # A missing bucket (None) counts as zero starts; comparing None with a
    # number raises TypeError on Python 3.
    started_count = p["payload/keyedHistograms/FX_MIGRATION_USAGE/chrome"].get(2) or 0
    # NOTE(review): the original returned a 1-tuple via a trailing comma,
    # which looks accidental. The shape is kept so existing countByValue
    # keys stay the same -- confirm before changing it to a bare bool.
    return (started_count < completed_count,)
# Tally how many pings produced each had_less_completions result.
with_history.map(had_less_completions).countByValue().items()
# In[ ]: | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment