-
-
Save ilanasegall/175314b06a31fb2af776742828a69e16 to your computer and use it in GitHub Desktop.
TxP - Mau Dau main_summary
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding: utf-8 | |
# In[2]: | |
import boto3, botocore | |
from requests import get as fetch | |
from pprint import pprint | |
from collections import defaultdict, namedtuple | |
from pyspark.sql import Row | |
import csv, os.path, json | |
from datetime import datetime as dt, timedelta, date | |
from pyspark.sql import SQLContext | |
from pyspark.sql.types import * | |
from pyspark.sql.functions import * | |
from boto3.s3.transfer import S3Transfer | |
from moztelemetry import get_pings, get_pings_properties, standards | |
# Notebook bootstrap: pull pylab (numpy + matplotlib) into the interactive namespace.
get_ipython().magic(u'pylab inline')
# In[3]:
# Quick look at the cluster's default partition count for RDD operations.
sc.defaultParallelism
# In[4]: | |
## When we have stuff in main_summary... | |
# from pyspark.sql import SQLContext | |
# from pyspark.sql.types import * | |
# bucket = "telemetry-parquet" | |
# prefix = "main_summary/v3" | |
# %time dataset = sqlContext.read.load("s3://{}/{}".format(bucket, prefix), "parquet") | |
# ### Helper functions | |
# In[52]: | |
def fmt(the_date, date_format="%Y%m%d"):
    """Render a datetime as a string, defaulting to YYYYMMDD."""
    return the_date.strftime(date_format)
def should_be_updated(target_day, test, recalc_window):
    """Decide whether stats for (test, target_day) need recomputing.

    Relies on module globals: `yesterday` (datetime) and `test_data`
    (addon id -> launch day string). Day strings compare correctly as
    plain strings because they all share the fixed %Y%m%d format.
    """
    window_floor = fmt(yesterday - timedelta(recalc_window))
    if window_floor >= target_day:
        # Day is older than the recalculation window: leave it alone.
        return False
    if test != "@testpilot-addon" and test_data[test] > target_day:
        # The test had not launched yet on that day.
        return False
    return True
# ### Retrieve current datasets | |
# In[39]: | |
# Fetch the experiment catalog from the Test Pilot API and record each
# experiment's launch day (YYYYMMDD) keyed by its addon id.
r = fetch('https://testpilot.firefox.com/api/experiments?format=json')
# time math messy. fix later if important
test_data = {}
for experiment in dict(r.json())['results']:
    launch_day = dt.strptime(experiment['created'][:10], "%Y-%m-%d")
    test_data[experiment['addon_id']] = fmt(launch_day)
# test_data["@testpilot-addon"] = fmt(dt.strptime("20160311","%Y%m%d"))
active_tests = test_data.keys()
# In[ ]: | |
### FOR TESTING
# NOTE(review): this stub unconditionally clobbers the live `test_data` /
# `active_tests` pulled from the Test Pilot API above, pinning a single addon
# id to a fixed launch day so the pipeline runs deterministically. Remove or
# guard it before a production run.
test_data = {"jid1-F9UJ2thwoAm5gQ@jetpack": "20160903"}
active_tests = test_data.keys()
###
# In[5]: | |
## wait for main_summary... | |
# %time dataset = dataset.filter(dataset.submission_date_s3 >= '20160704').filter(dataset.submission_date_s3 <= '20160706') | |
# In[7]: | |
yesterday = dt.utcnow() - timedelta(1) #latest day we're retrieving data for
# In[40]:
# Number of trailing days to recompute each run; recent days can still change
# as late-arriving pings come in.
recalc_window=1
# In[41]:
# Inclusive submission-date window as YYYYMMDD strings.
start_date = fmt(yesterday - timedelta(recalc_window-1))
end_date = fmt(yesterday)
# Sampling fraction for get_pings; 1 means the full ping population.
frac = 1
# Pull pings for all four release channels over the target window and merge
# them into one RDD. Looping over channels and folding with .union() produces
# the same left-nested union order as the original chained calls.
pings = None
for channel in ("nightly", "aurora", "beta", "release"):
    channel_pings = get_pings(sc,
                              app="Firefox",
                              channel=channel,
                              submission_date=(start_date, end_date),
                              fraction=frac)
    pings = channel_pings if pings is None else pings.union(channel_pings)
# In[42]:
# prev_dataframe = sqlContext.read.parquet("s3n://net-mozaws-prod-us-west-2-pipeline-analysis/isegall/fxa_mau_dau_daily/v1/collection_date="+(last date))
# ### Munging and calculating
# In[43]:
# Project each raw ping down to just the three fields the DAU/MAU
# calculation needs: client id, submission day, and the addons environment.
subset = get_pings_properties(pings, ["clientId", "meta/submissionDate", "environment/addons"])
# In[55]: | |
def create_rows(ps):
    """Expand one (client, day) group of pings into activity Rows.

    `ps` is a ((clientId, submissionDate), [ping, ...]) pair as produced by
    the upstream map/reduceByKey stage. Emits one Row tagged
    "@testpilot-addon" when the client has any active test addon installed
    (overall TxP usage), plus one Row per individual test addon found, so
    DAU/MAU can be computed both overall and per test.
    """
    k, v = ps
    client_id, sub_date = k[0], k[1]

    # Union of active-addon ids across every ping in the group. The
    # "environment/addons" field can be present but None, so guard explicitly.
    addons = set()
    for ping in v:
        env_addons = ping.get("environment/addons", {})
        if env_addons is None:
            continue
        addons.update(env_addons.get("activeAddons", {}).keys())

    # Same timestamp for every row in this group; compute it once instead of
    # once per emitted row (the original recomputed it inside each branch).
    activity_ts = standards.unix_time_nanos(dt.strptime(sub_date, "%Y%m%d"))

    def _row(test_id):
        # One activity record for (client, day, test).
        return Row(clientId=client_id,
                   submission_date_s3=sub_date,
                   activityTimestamp=activity_ts,  # do we need both?? (kept from original)
                   test=test_id)

    rows = []
    if set(active_tests) & addons:
        rows.append(_row("@testpilot-addon"))
    for t in active_tests:
        if t in addons:
            rows.append(_row(t))
    return rows
# In[56]:
# Group pings by (client, submission day), concatenate each client/day's
# pings into one list, then expand every group into per-test activity Rows
# (see create_rows) and lift the result into a DataFrame.
rows = subset.map(lambda x: ((x["clientId"], x["meta/submissionDate"]), [x])). reduceByKey(lambda x,y: x+y). flatMap(create_rows)
client_dataset = sqlContext.createDataFrame(rows)
# In[57]: | |
#TODO: update based on first appearance | |
first_date = dt.strptime("20160311", "%Y%m%d") #first txp info recorded | |
date_format="%Y%m%d" | |
records = [] | |
# for test in active_tests: | |
for test in ["@testpilot-addon"]: | |
target_dt = yesterday #always calc previous day | |
while target_dt >= first_date: #check if test is active in should_be_updated | |
target_day = fmt(target_dt) | |
if should_be_updated(target_day, test, recalc_window): | |
print "We should update data for {t} on {d}".format(t=test, d=target_day) | |
record = {"test": test, "day": target_day, "generated_on": fmt(dt.utcnow(), date_format)} | |
record["dau"] = standards.dau(client_dataset.filter(client_dataset.test == '%s' % test), target_day) | |
print "dau: ", record["dau"] | |
record["mau"] = standards.mau(client_dataset.filter(client_dataset.test == '%s' % test), target_day) | |
print "mau: ", record["mau"] | |
records.append(record) | |
target_dt -= timedelta(1) | |
#append previous data | |
# records.extend(prev_dataframe.filter(prev_dataframe.day <= fmt(yesterday - timedelta(recalc_window)))\ | |
# .map(lambda x: x.asDict()).collect()) | |
# In[ ]: | |
# In[ ]: | |
# In[ ]: | |
# In[ ]: | |
###sanity check###
# In[8]:
# Re-pull just yesterday's pings across all four channels for the sanity
# check below. Folding with .union() in a loop reproduces the original
# left-nested chain of union calls exactly.
start_date = fmt(yesterday)
end_date = fmt(yesterday)
frac = 1
pings = None
for channel in ("nightly", "aurora", "beta", "release"):
    channel_pings = get_pings(sc,
                              app="Firefox",
                              channel=channel,
                              submission_date=(start_date, end_date),
                              fraction=frac)
    pings = channel_pings if pings is None else pings.union(channel_pings)
# In[34]:
# Re-fetch the live experiment catalog (independently of the testing stub
# above) for the sanity-check count.
r = fetch('https://testpilot.firefox.com/api/experiments?format=json')
# time math messy. fix later if important
# Map each experiment's addon id to its launch day (YYYYMMDD).
test_data = {el['addon_id']: fmt(dt.strptime(el['created'][:10],"%Y-%m-%d")) for el in dict(r.json())['results']}
test_data.keys()
def test_installed(p):
    """True iff the client's addon list contains any known test addon id.

    `p` is a (clientId, [addon_id, ...]) pair; membership is checked
    against the module-level `test_data` mapping.
    """
    _uid, addon_ids = p
    return len(set(test_data.keys()) & set(addon_ids)) >= 1
# In[38]:
# Sanity check: count distinct clients in yesterday's pings that report the
# Test Pilot addon active AND at least one known test addon installed.
pings.filter(lambda x: "@testpilot-addon" in x.get("environment", {}).get("addons",{}).get("activeAddons",{}).keys()) .map(lambda x: (x.get("clientId"), x.get("environment", {}).get("addons",{}).get("activeAddons",{}).keys())) .filter(test_installed) .map(lambda x:x[0]).distinct().count()
# In[ ]: | |
# In[ ]: | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment