Skip to content

Instantly share code, notes, and snippets.

@ilanasegall
Last active September 9, 2016 19:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ilanasegall/175314b06a31fb2af776742828a69e16 to your computer and use it in GitHub Desktop.
Save ilanasegall/175314b06a31fb2af776742828a69e16 to your computer and use it in GitHub Desktop.
TxP - Mau Dau main_summary
Display the source blob
Display the rendered blob
Raw
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
# coding: utf-8
# In[2]:
import boto3, botocore
from requests import get as fetch
from pprint import pprint
from collections import defaultdict, namedtuple
from pyspark.sql import Row
import csv, os.path, json
from datetime import datetime as dt, timedelta, date
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from boto3.s3.transfer import S3Transfer
from moztelemetry import get_pings, get_pings_properties, standards
get_ipython().magic(u'pylab inline')
# In[3]:
# Notebook cell: evaluates the default parallelism of `sc` so the notebook can
# display it; the value is not stored or used anywhere in the visible code.
# NOTE(review): `sc` is not defined in this file -- presumably the SparkContext
# injected by the notebook environment; confirm.
sc.defaultParallelism
# In[4]:
## When we have stuff in main_summary...
# from pyspark.sql import SQLContext
# from pyspark.sql.types import *
# bucket = "telemetry-parquet"
# prefix = "main_summary/v3"
# %time dataset = sqlContext.read.load("s3://{}/{}".format(bucket, prefix), "parquet")
# ### Helper functions
# In[52]:
def fmt(the_date, date_format="%Y%m%d"):
    """Format a date or datetime as a string.

    Args:
        the_date: A ``datetime.datetime`` or ``datetime.date`` instance.
        date_format: ``strftime`` pattern; defaults to compact ``YYYYMMDD``.

    Returns:
        The formatted string, e.g. ``"20160311"``.
    """
    # Use the bound method instead of dt.strftime(the_date, ...): the latter
    # only accepts datetime instances and raises TypeError for plain dates.
    return the_date.strftime(date_format)
def should_be_updated(target_day, test, recalc_window):
    """Tell whether the metrics for *test* on *target_day* must be recomputed.

    Days are compared as ``YYYYMMDD`` strings, which order the same way as the
    dates they encode. Reads the module-level ``yesterday`` and ``test_data``.

    Args:
        target_day: Day under consideration, as a ``YYYYMMDD`` string.
        test: Add-on id of the test, or ``"@testpilot-addon"``.
        recalc_window: Number of days before ``yesterday`` that marks the
            cut-off; days at or before that cut-off are never recomputed.

    Returns:
        ``False`` when the day is at or before the cut-off, or when *test* is
        not ``"@testpilot-addon"`` and its ``test_data`` date is later than
        *target_day*; ``True`` otherwise.
    """
    cutoff_day = fmt(yesterday - timedelta(recalc_window))
    if target_day <= cutoff_day:
        return False
    if test != "@testpilot-addon" and test_data[test] > target_day:  # string compare on YYYYMMDD
        return False
    return True
# ### Retrieve current datasets
# In[39]:
# Download the experiment list from the Test Pilot API.
r = fetch('https://testpilot.firefox.com/api/experiments?format=json')
experiments = dict(r.json())['results']
# time math messy. fix later if important
# Map each experiment's add-on id to its creation day as a YYYYMMDD string;
# only the first ten characters ("YYYY-MM-DD") of 'created' are parsed.
test_data = {
    experiment['addon_id']: fmt(dt.strptime(experiment['created'][:10], "%Y-%m-%d"))
    for experiment in experiments
}
# test_data["@testpilot-addon"] = fmt(dt.strptime("20160311","%Y%m%d"))
active_tests = test_data.keys()
# In[ ]:
### FOR TESTING
# Replaces the experiment map built from the API response with a single
# hard-coded add-on id and date, so every later cell that reads `test_data` or
# `active_tests` only sees this one test.
# NOTE(review): presumably meant to be skipped or removed for a real run -- confirm.
test_data = {"jid1-F9UJ2thwoAm5gQ@jetpack": "20160903"}
active_tests = test_data.keys()
###
# In[5]:
## wait for main_summary...
# %time dataset = dataset.filter(dataset.submission_date_s3 >= '20160704').filter(dataset.submission_date_s3 <= '20160706')
# In[7]:
# Latest day we're retrieving data for: the current UTC time minus one day.
yesterday = dt.utcnow() - timedelta(days=1)
# In[40]:
# Size, in days, of the window ending at `yesterday` that gets recalculated.
recalc_window = 1
# In[41]:
# Inclusive submission-date bounds of that window, as YYYYMMDD strings.
end_date = fmt(yesterday)
start_date = fmt(yesterday - timedelta(days=recalc_window - 1))
# Fraction argument handed to every get_pings call below.
frac = 1
# Start with the nightly pings, then union in the remaining channels one by
# one, keeping the order nightly, aurora, beta, release.
pings = get_pings(sc,
                  app="Firefox",
                  channel="nightly",
                  submission_date=(start_date, end_date),
                  fraction=frac)
for channel in ("aurora", "beta", "release"):
    pings = pings.union(
        get_pings(sc,
                  app="Firefox",
                  channel=channel,
                  submission_date=(start_date, end_date),
                  fraction=frac))
# In[42]:
# prev_dataframe = sqlContext.read.parquet("s3n://net-mozaws-prod-us-west-2-pipeline-analysis/isegall/fxa_mau_dau_daily/v1/collection_date="+(last date))
# ### Munging and calculating
# In[43]:
# Reduce each ping to the three properties used by the cells below: the client
# id, the submission date and the add-ons section of the environment.
# NOTE(review): later code indexes the elements with these same path strings
# ("clientId", "meta/submissionDate", "environment/addons"), so each element is
# presumably a dict keyed by path -- confirm against moztelemetry.
subset = get_pings_properties(pings, ["clientId", "meta/submissionDate", "environment/addons"])
# In[55]:
def create_rows(ps):
    """Build the per-test activity rows for one client on one submission day.

    Reads the module-level ``active_tests``.

    Args:
        ps: A ``(key, pings)`` pair where ``key`` is
            ``(clientId, submission date as YYYYMMDD)`` and ``pings`` is the
            list of ping-property dicts collected for that key.

    Returns:
        A list of ``Row`` objects with the fields ``clientId``,
        ``submission_date_s3``, ``activityTimestamp`` and ``test``. It is empty
        when none of the active tests is among the client's active add-ons.
        Otherwise it holds one row for ``"@testpilot-addon"`` followed by one
        row per installed active test, in ``active_tests`` order.
    """
    k, v = ps
    # Tried to be pythonic with one big set().union(*...), but None values in
    # the pings broke it, so collect the add-on ids ping by ping instead.
    addons = set()
    for e in v:
        # Both the add-ons section and its "activeAddons" entry may be missing
        # or None; treat either case as "no add-ons" instead of crashing.
        addon_section = e.get("environment/addons") or {}
        addons.update((addon_section.get("activeAddons") or {}).keys())

    installed_tests = [t for t in active_tests if t in addons]
    if not installed_tests:
        return []

    client_id, submission_date = k[0], k[1]
    # Only parsed once we know rows will be emitted, as in the original flow.
    # (Open question from the author: are both date fields really needed?)
    activity_timestamp = standards.unix_time_nanos(dt.strptime(submission_date, "%Y%m%d"))

    # Any installed test also counts as activity for Test Pilot as a whole.
    tests_to_report = ["@testpilot-addon"] + installed_tests
    return [
        Row(clientId=client_id,
            submission_date_s3=submission_date,
            activityTimestamp=activity_timestamp,
            test=t)
        for t in tests_to_report
    ]
# In[56]:
# Key every ping by (clientId, submission date) and wrap it in a list so the
# lists can be concatenated per key.
pings_by_client_day = subset.map(lambda x: ((x["clientId"], x["meta/submissionDate"]), [x]))
# Gather all pings of one client-day into a single list.
grouped_pings = pings_by_client_day.reduceByKey(lambda x, y: x + y)
# Expand each client-day into its Row objects and load them into a DataFrame.
rows = grouped_pings.flatMap(create_rows)
client_dataset = sqlContext.createDataFrame(rows)
# In[57]:
#TODO: update based on first appearance
first_date = dt.strptime("20160311", "%Y%m%d") #first txp info recorded
date_format="%Y%m%d"
records = []
# Walk backwards from `yesterday` to `first_date` and, for every day that
# should_be_updated() accepts, compute DAU and MAU for the test and append a
# record dict (test, day, generated_on, dau, mau) to `records`.
# for test in active_tests:
for test in ["@testpilot-addon"]:
    # The per-test filter does not depend on the day, so build it once here
    # instead of twice per day inside the loop.
    test_dataset = client_dataset.filter(client_dataset.test == test)
    target_dt = yesterday #always calc previous day
    while target_dt >= first_date: #check if test is active in should_be_updated
        target_day = fmt(target_dt)
        if should_be_updated(target_day, test, recalc_window):
            # Single-argument print(...) behaves the same under Python 2 and 3.
            print("We should update data for {t} on {d}".format(t=test, d=target_day))
            record = {"test": test, "day": target_day, "generated_on": fmt(dt.utcnow(), date_format)}
            record["dau"] = standards.dau(test_dataset, target_day)
            # Two spaces after the colon reproduce the output of the former
            # Python 2 statement `print "dau: ", value`.
            print("dau:  {0}".format(record["dau"]))
            record["mau"] = standards.mau(test_dataset, target_day)
            print("mau:  {0}".format(record["mau"]))
            records.append(record)
        target_dt -= timedelta(1)
#append previous data
# records.extend(prev_dataframe.filter(prev_dataframe.day <= fmt(yesterday - timedelta(recalc_window)))\
# .map(lambda x: x.asDict()).collect())
# In[ ]:
# In[ ]:
# In[ ]:
# In[ ]:
###sanity check###
# In[8]:
# Sanity check: look at a single day only, so both bounds are `yesterday`.
start_date = end_date = fmt(yesterday)
# Fraction argument handed to every get_pings call below.
frac = 1
# Start with the nightly pings, then union in the remaining channels one by
# one, keeping the order nightly, aurora, beta, release.
pings = get_pings(sc,
                  app="Firefox",
                  channel="nightly",
                  submission_date=(start_date, end_date),
                  fraction=frac)
for channel in ("aurora", "beta", "release"):
    pings = pings.union(
        get_pings(sc,
                  app="Firefox",
                  channel=channel,
                  submission_date=(start_date, end_date),
                  fraction=frac))
# In[34]:
# Download the experiment list from the Test Pilot API again.
r = fetch('https://testpilot.firefox.com/api/experiments?format=json')
experiments = dict(r.json())['results']
# time math messy. fix later if important
# Map each experiment's add-on id to its creation day as a YYYYMMDD string;
# only the first ten characters ("YYYY-MM-DD") of 'created' are parsed.
test_data = {
    experiment['addon_id']: fmt(dt.strptime(experiment['created'][:10], "%Y-%m-%d"))
    for experiment in experiments
}
# Bare expression: shows the add-on ids as the notebook cell's output.
test_data.keys()
def test_installed(p):
    """Tell whether a client has at least one known test add-on installed.

    Reads the module-level ``test_data``, whose keys are the test add-on ids.

    Args:
        p: A two-element pair; the first element is ignored and the second is
            an iterable of add-on ids.

    Returns:
        ``True`` if any add-on id in the pair is a key of ``test_data``,
        ``False`` otherwise.
    """
    _, addon_ids = p
    known_tests = set(test_data.keys())
    return len(known_tests & set(addon_ids)) >= 1
# In[38]:
def _active_addon_ids(ping):
    """Return the keys of the ping's environment -> addons -> activeAddons map."""
    return ping.get("environment", {}).get("addons", {}).get("activeAddons", {}).keys()


# Count the distinct clients that have the Test Pilot add-on active and, per
# test_installed, at least one of the known test add-ons as well.
(pings
 .filter(lambda x: "@testpilot-addon" in _active_addon_ids(x))
 .map(lambda x: (x.get("clientId"), _active_addon_ids(x)))
 .filter(test_installed)
 .map(lambda x: x[0])
 .distinct()
 .count())
# In[ ]:
# In[ ]:
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment