Skip to content

Instantly share code, notes, and snippets.

@mnoorenberghe
Created September 19, 2017 08:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mnoorenberghe/938da8c64dfa3a2ce7e42a1ebe14669b to your computer and use it in GitHub Desktop.
Save mnoorenberghe/938da8c64dfa3a2ce7e42a1ebe14669b to your computer and use it in GitHub Desktop.
Bug1398930
Display the source blob
Display the rendered blob
Raw
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
# coding: utf-8
---
title: "Bug 1398930"
authors:
- mnoorenberghe
tags:
- telemetry
- spark
created_at: 2017-09-18
updated_at: 2017-09-18
tldr: Migration
---
# ### Bug 1398930
# https://bugzilla.mozilla.org/show_bug.cgi?id=1398930
# In[1]:
import ujson as json
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import plotly.plotly as py
from plotly.graph_objs import *
from moztelemetry import get_pings_properties, get_one_ping_per_client
from moztelemetry.dataset import Dataset
from operator import add
# Notebook magic: render matplotlib figures inline.
get_ipython().magic(u'matplotlib inline')

# ### Basics
# Cancel any leftover jobs from a previous run, then check how many
# parallel workers are available (value displayed by the notebook cell).
sc.cancelAllJobs()
sc.defaultParallelism

# Inspect the available telemetry schema dimensions.
Dataset.from_source("telemetry").schema

# Pull a 4% sample of release-channel 55.0.3 "main" pings.
pings = (Dataset.from_source("telemetry")
         .where(docType="main")
         .where(appUpdateChannel="release")
         .where(appVersion="55.0.3")
         .records(sc, sample=0.04))
# ... and extract only the attributes we need from the Telemetry submissions:
# In[5]:
# Project each ping down to just the fields this analysis needs.
subset = get_pings_properties(pings, [
    # "clientId",
    "environment/system/os/name",
    "payload/keyedHistograms/FX_MIGRATION_HISTORY_IMPORT_MS/chrome",
    "payload/keyedHistograms/FX_MIGRATION_USAGE/chrome",
])

# Keep only pings that actually carry the keyed FX_MIGRATION_USAGE
# histogram (dict.get covers both "key missing" and "value is None").
subset = subset.filter(
    lambda p: p.get("payload/keyedHistograms/FX_MIGRATION_USAGE/chrome") is not None
)
# To prevent pseudoreplication, let's consider only a single submission for each client. As this step requires a distributed shuffle, it should always be run only after extracting the attributes of interest with *get_pings_properties*.
# In[7]:
#subset = get_one_ping_per_client(subset)
#subset.count()

# Cache the projected RDD so the iterative queries below don't refetch.
cached = subset.cache()
#cached = sc.parallelize(subset.take(10))
#cached.collect()
#cached.count()

# Keep clients that started at least one manual history migration.
# NOTE(review): bucket key 2 of FX_MIGRATION_USAGE is presumably the
# "history" resource type — confirm against the histogram definition.
# `.get(2)` returns None when the bucket is absent; `or 0` makes the
# comparison explicit (and avoids a TypeError under Python 3) while
# matching the `or 0.0` idiom used in the aggregation below.
with_history = cached.filter(
    lambda p: (p["payload/keyedHistograms/FX_MIGRATION_USAGE/chrome"].get(2) or 0) > 0
)
with_history.count()

# Spot-check one matching ping and its import-duration histogram.
with_history.first()
with_history.first()["payload/keyedHistograms/FX_MIGRATION_HISTORY_IMPORT_MS/chrome"].nonzero()
with_history.first()["payload/keyedHistograms/FX_MIGRATION_HISTORY_IMPORT_MS/chrome"].sum()
# ## TODO: handle users who do a manual import in the same session as an auto-migrate
# In[37]:
def summarize(p):
    """Reduce one projected ping to the OS name plus migration counts.

    Parameters: p -- dict produced by get_pings_properties; histogram
    values are pandas Series (or None when the probe was absent).
    Returns a plain dict with:
      - "environment/system/os/name": OS name, passed through,
      - "history_started_count": value of FX_MIGRATION_USAGE bucket 2
        (may be None when that bucket is absent),
      - "history_completed_count": sum of the FX_MIGRATION_HISTORY_IMPORT_MS
        histogram, or 0 when it is missing/empty.
    """
    completed = p.get("payload/keyedHistograms/FX_MIGRATION_HISTORY_IMPORT_MS/chrome")
    # isinstance instead of `type(...) ==`: robust and idiomatic; the
    # guard also rejects None and empty Series before summing.
    if isinstance(completed, pd.Series) and not completed.empty:
        completed_count = completed.sum()
    else:
        completed_count = 0
    return {
        "environment/system/os/name": p["environment/system/os/name"],
        "history_started_count": p["payload/keyedHistograms/FX_MIGRATION_USAGE/chrome"].get(2),
        "history_completed_count": completed_count,
    }
# Summarize every matching ping on the cluster.
mapped = with_history.map(summarize)
# mapped.collect()

# Peek at one summarized record.
mapped.take(1)

# Total manual history-migration starts; a missing/None bucket counts as 0.
num_started = mapped.map(lambda summary: summary["history_started_count"] or 0.0).sum()
num_started

# Total completed history imports (summarize always supplies this key).
num_completed = mapped.map(lambda summary: summary["history_completed_count"]).sum()
num_completed

# Completion rate as a percentage of starts.
num_completed / num_started * 100
# completed includes auto but only in sessions with manual history
# started is only for manual migration
# In[43]:
def had_less_completions(p):
    """Return True when this ping started more history migrations than it
    completed (i.e. starts from FX_MIGRATION_USAGE bucket 2 are fewer than
    the summed FX_MIGRATION_HISTORY_IMPORT_MS entries is False; see below).

    Bug fix: the original `return ...,` ended with a stray trailing comma,
    so every result was wrapped in a 1-tuple and countByValue() keyed on
    (True,)/(False,) instead of plain booleans.
    """
    completed = p.get("payload/keyedHistograms/FX_MIGRATION_HISTORY_IMPORT_MS/chrome")
    if isinstance(completed, pd.Series) and not completed.empty:
        completed_count = completed.sum()
    else:
        completed_count = 0
    # `.get(2)` can return None when the bucket is absent; treat that as 0
    # starts rather than relying on Python 2's None-compares-low behavior.
    started_count = p["payload/keyedHistograms/FX_MIGRATION_USAGE/chrome"].get(2) or 0
    return started_count < completed_count
# Tally, across all pings with history migrations, how many reported
# fewer starts than completions vs. not (result displayed by the cell).
with_history.map(had_less_completions).countByValue().items()
# In[ ]:
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment