# TxP - Mau Dau
# coding: utf-8
# In[105]:
from collections import OrderedDict
from datetime import datetime as dt, timedelta

from requests import get as fetch

from pyspark.sql import Row
from pyspark.sql.functions import col
from moztelemetry import standards

# %pylab (below) pulls numpy's max into the namespace, shadowing the
# builtin; keep an explicit handle on the builtin for string comparisons.
import __builtin__
py_max = __builtin__.max

get_ipython().magic(u'pylab inline')
# In[106]:
# Sanity check: how many partitions Spark uses by default.
sc.defaultParallelism
# ### Now using data from addons parquet datastore!
# In[107]:
from pyspark.sql import SQLContext
from pyspark.sql.types import *
bucket = "telemetry-parquet"
prefix2 = "addons/v2"
get_ipython().magic(u'time dataset = sqlContext.read.load("s3://{}/{}".format(bucket, prefix2), "parquet").drop("subsession_start_date")')
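# Columns this notebook relies on downstream (per the filters and selects
# below): client_id, addon_id, and submission_date_s3, a YYYYMMDD string
# partition key.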
# In[108]:
# Spot check: ping volume for a single recent day.
dataset.filter(dataset.submission_date_s3 == "20161221").count()
# ### Helper functions
# In[109]:
def fmt(the_date, date_format="%Y%m%d"):
    return dt.strftime(the_date, date_format)

def should_be_updated(target_day, test, recalc_window):
    """Decide whether counts for `test` on `target_day` need (re)computing.

    Relies on the globals `yesterday` and `test_data` defined below.
    TODO: also check here whether the test is still active.
    """
    if fmt(yesterday - timedelta(recalc_window)) >= target_day:
        # target_day falls outside the recalculation window.
        return False
    elif (test != "@testpilot-addon") and (test_data[test] > target_day):
        # The test didn't exist yet on target_day.
        return False
    return True

# http://stackoverflow.com/questions/37584077/convert-a-standard-python-key-value-dictionary-list-to-pyspark-data-frame
def convert_to_row(d):
    # Sorting the keys gives every Row the same field order, which toDF()
    # needs to infer a consistent schema.
    return Row(**OrderedDict(sorted(d.items())))

def dau(df, target_date):
    df = df.filter(df.submission_date_s3 == target_date)
    return df.select("client_id").distinct().count()

def mau(df, target_date, min_creation_date=None, past_days=28, date_format="%Y%m%d"):
    target_day_date = dt.strptime(target_date, date_format)
    min_submission_date = dt.strftime(target_day_date - timedelta(past_days), date_format)
    if min_creation_date is not None:
        # Don't count activity from before the test was created.
        min_submission_date = py_max(min_submission_date, min_creation_date)
    df = (df.filter(df.submission_date_s3 >= min_submission_date)
            .filter(df.submission_date_s3 <= target_date))
    return df.select("client_id").distinct().count()

# For error-checking pings that come in after the submission date.
def record_tp_daily_count(df, target_date):
    df = df.filter(df.submission_date_s3 == target_date)
    s3_output = "s3n://net-mozaws-prod-us-west-2-pipeline-analysis/isegall/txp_dau_clients/v1/collection_date=" + fmt(dt.utcnow())
    df.select("client_id", "submission_date_s3").distinct().repartition(10).write.format("parquet").mode("overwrite").save(s3_output)
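# Quick illustration (not from the original notebook) of why plain string
# comparisons on dates are safe here: zero-padded YYYYMMDD strings sort
# chronologically, so lexicographic order matches date order.
assert "20161130" < "20161201" < "20170101"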
# ### Retrieve current datasets
# In[110]:
r = fetch('https://testpilot.firefox.com/api/experiments.json')
# Map each test's addon_id to its creation date as a YYYYMMDD string.
# (The time math is messy; fix later if it becomes important.)
test_data = {el['addon_id']: fmt(dt.strptime(el['created'][:10], "%Y-%m-%d"))
             for el in r.json()['results']}
active_tests = test_data.keys()
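# For reference, each entry of r.json()['results'] is assumed to carry at
# least these fields (the addon_id value here is a made-up example):
#   {"addon_id": "@example-test", "created": "2016-11-01T00:00:00.000Z", ...}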
# In[111]:
recalc_window = 30
# In[120]:
# Note: "today" lags the actual UTC date by one day, presumably to give
# late-arriving pings time to land.
today = dt.utcnow() - timedelta(1)
yesterday = today - timedelta(1)  # latest day we're retrieving data for
start_date = fmt(yesterday - timedelta(recalc_window))
end_date = fmt(yesterday)
print today
# In[121]:
dataset.printSchema()
# In[122]:
dataset = (dataset.filter(dataset.submission_date_s3 >= start_date)
                  .filter(dataset.submission_date_s3 <= end_date))
# In[123]:
# Only keep Test Pilot addons; this narrows the data early for speed.
dataset = dataset.where(col("addon_id").isin(active_tests + ["@testpilot-addon"]))
# In[124]:
tp_installed = dataset.filter(dataset.addon_id == "@testpilot-addon").select("client_id", "addon_id").distinct()
test_installed = dataset.where(col("addon_id").isin(active_tests)).select("client_id", "addon_id", "submission_date_s3").distinct()
# Inner join: keep only clients that have both the core Test Pilot addon
# and at least one test installed.
active_tp_users = (tp_installed.join(test_installed, test_installed.client_id == tp_installed.client_id)
                   .drop(tp_installed.client_id).drop(tp_installed.addon_id))
active_tp_users.cache()
# In[125]:
# Load the records computed by the previous day's run so they can be carried forward.
prev_dataframe = sqlContext.read.parquet("s3://net-mozaws-prod-us-west-2-pipeline-analysis/isegall/txp_mau_dau_daily/v2/collection_date=" + fmt(yesterday))
# ### Munging and calculating
# In[126]:
first_date = dt.strptime(start_date, "%Y%m%d")
date_format = "%Y%m%d"
records = []
target_day = fmt(yesterday)
# One DAU/MAU record per active test.
for test in active_tests:
    if should_be_updated(target_day, test, recalc_window):
        print "We should update data for {t} on {d}".format(t=test, d=target_day)
        record = {"test": test, "day": target_day, "generated_on": fmt(today, date_format)}
        test_users = active_tp_users.filter(active_tp_users.addon_id == test)
        record["dau"] = dau(test_users, target_day)
        print "dau: ", record["dau"]
        record["mau"] = mau(test_users, target_day, test_data[test])
        print "mau: ", record["mau"]
        records.append(record)
# One aggregate record across all of Test Pilot.
print "We should update data for Test Pilot on {d}".format(d=target_day)
record = {"test": "testpilot", "day": target_day, "generated_on": fmt(today, date_format)}
record["dau"] = dau(active_tp_users, target_day)
print "dau: ", record["dau"]
record["mau"] = mau(active_tp_users, target_day)
print "mau: ", record["mau"]
records.append(record)
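# Each record is a plain dict; with made-up numbers, one looks like:
#   {"test": "@example-test", "day": "20161220",
#    "generated_on": "20161221", "dau": 1234, "mau": 5678}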
# In[127]:
# Append the previously computed records.
# For now, assume we have the previous day's data; update later to recompute anything missing.
records.extend(prev_dataframe.rdd.map(lambda x: x.asDict()).collect())
# In[128]:
records
# In[130]:
df_updated = sc.parallelize(records).map(convert_to_row).toDF()
s3_output = "s3n://net-mozaws-prod-us-west-2-pipeline-analysis/isegall/txp_mau_dau_daily/v2/collection_date=" + fmt(today)
df_updated.repartition(10).write.format("parquet").mode("overwrite").save(s3_output)
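# Note that tomorrow's run reads this output back through the same
# collection_date=<date> path (see the prev_dataframe load above), so each
# daily run carries the full history forward.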
# In[ ]: