TxP - Mau Dau
# coding: utf-8

# In[105]:

from datetime import datetime as dt, timedelta
from collections import OrderedDict
from requests import get as fetch

from pyspark.sql import Row
from pyspark.sql.functions import col
from pyspark.sql.functions import *
from moztelemetry import standards

# The functions * import shadows the builtin max (pyspark.sql.functions has
# its own max), so keep a reference to the builtin for plain string
# comparisons later.
import __builtin__
py_max = __builtin__.max

get_ipython().magic(u'pylab inline')
# In[106]:

sc.defaultParallelism
# ### Now using data from the addons parquet datastore!

# In[107]:

from pyspark.sql import SQLContext
from pyspark.sql.types import *

bucket = "telemetry-parquet"
prefix2 = "addons/v2"
get_ipython().magic(u'time dataset = sqlContext.read.load("s3://{}/{}".format(bucket, prefix2), "parquet").drop("subsession_start_date")')
# In[108]:

# Sanity check: confirm a recent day's partition is present and non-empty.
dataset.filter(dataset.submission_date_s3 == "20161221").count()
# ### Helper functions

# In[109]:

def fmt(the_date, date_format="%Y%m%d"):
    return dt.strftime(the_date, date_format)

def should_be_updated(target_day, test, recalc_window):
    # Skip days that have aged out of the recalculation window, and skip
    # tests that launched after the target day. Comparing the date strings
    # directly is safe because they share the fixed %Y%m%d format.
    if fmt(yesterday - timedelta(recalc_window)) >= target_day:
        return False
    elif (test != "@testpilot-addon") and (test_data[test] > target_day):
        return False
    return True

# http://stackoverflow.com/questions/37584077/convert-a-standard-python-key-value-dictionary-list-to-pyspark-data-frame
def convert_to_row(d):
    return Row(**OrderedDict(sorted(d.items())))
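# Illustrative example (hypothetical values) of what convert_to_row yields:
#   convert_to_row({"test": "example@addon", "dau": 10})
#     -> Row(dau=10, test='example@addon')
# Sorting the keys keeps the column order identical across records.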
def dau(df, target_date):
    # Distinct clients that submitted on the target day.
    df = df.filter(df.submission_date_s3 == target_date)
    return df.select("client_id").distinct().count()

def mau(df, target_date, test, past_days=28, date_format="%Y%m%d"):
    # Distinct clients over the trailing `past_days` window, clamped to the
    # test's launch date. `test` is passed in explicitly rather than read
    # from the enclosing loop; %Y%m%d strings compare correctly as strings.
    target_day_date = dt.strptime(target_date, date_format)
    min_submission_date = dt.strftime(target_day_date - timedelta(past_days), date_format)
    if test in test_data:
        min_submission_date = py_max(min_submission_date, test_data[test])
    max_submission_date = dt.strftime(target_day_date, date_format)
    df = df.filter(df.submission_date_s3 >= min_submission_date).filter(df.submission_date_s3 <= max_submission_date)
    return df.select("client_id").distinct().count()

# For error-checking pings that come in after the submission date
def record_tp_daily_count(df, target_date):
    df = df.filter(df.submission_date_s3 == target_date)
    s3_output = "s3n://net-mozaws-prod-us-west-2-pipeline-analysis/isegall/txp_dau_clients/v1/collection_date=" + fmt(dt.utcnow())
    df.select("client_id", "submission_date_s3").distinct().repartition(10).write.format("parquet").mode("overwrite").save(s3_output)
# ### Retrieve current datasets

# In[110]:

r = fetch('https://testpilot.firefox.com/api/experiments.json')
# Map each test's addon_id to its launch day as a %Y%m%d string, so launch
# dates can be compared against submission_date_s3 values directly. The ISO
# timestamp is truncated to its date part; finer time math is skipped.
test_data = {el['addon_id']: fmt(dt.strptime(el['created'][:10], "%Y-%m-%d")) for el in r.json()['results']}
active_tests = test_data.keys()
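# Assumed shape of each entry in experiments.json's "results" (illustrative):
#   {"addon_id": "example@addon", "created": "2016-12-01T00:00:00.000Z", ...}
# Only addon_id and the date prefix of created are used above.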
# In[111]: | |
recalc_window=30 | |
# In[120]: | |
today = dt.utcnow() - timedelta(1) | |
yesterday = today - timedelta(1) #latest day we're retrieving data for | |
start_date = fmt(yesterday - timedelta(recalc_window)) | |
end_date = fmt(yesterday) | |
print today | |
# In[121]: | |
dataset.printSchema() | |
# In[122]:

dataset = dataset.filter(dataset.submission_date_s3 >= start_date).filter(dataset.submission_date_s3 <= end_date)

# In[123]:

# Restrict to Test Pilot add-ons up front; this keeps the later joins and
# counts fast.
dataset = dataset.where(col("addon_id").isin(active_tests + ["@testpilot-addon"]))
# In[124]:

tp_installed = dataset.filter(dataset.addon_id == "@testpilot-addon").select("client_id", "addon_id").distinct()
test_installed = dataset.where(col("addon_id").isin(active_tests)).select("client_id", "addon_id", "submission_date_s3").distinct()
active_tp_users = tp_installed.join(test_installed, test_installed.client_id == tp_installed.client_id).drop(tp_installed.client_id).drop(tp_installed.addon_id)
active_tp_users.cache()
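# active_tp_users now has one row per (client_id, addon_id,
# submission_date_s3) for clients who reported both the Test Pilot add-on and
# at least one individual test, so the per-test filters below only see
# genuine Test Pilot participants.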
# In[125]:

# Read back yesterday's output so already-computed days can be carried
# forward into today's dataset.
prev_dataframe = sqlContext.read.parquet("s3://net-mozaws-prod-us-west-2-pipeline-analysis/isegall/txp_mau_dau_daily/v2/collection_date=" + fmt(yesterday))
# ### Munging and calculating

# In[126]:

first_date = dt.strptime(start_date, "%Y%m%d")
date_format = "%Y%m%d"
records = []

target_day = fmt(yesterday)
for test in active_tests:
    if should_be_updated(target_day, test, recalc_window):  # activity check lives in should_be_updated
        print "We should update data for {t} on {d}".format(t=test, d=target_day)
        record = {"test": test, "day": target_day, "generated_on": fmt(today, date_format)}
        record["dau"] = dau(active_tp_users.filter(active_tp_users.addon_id == test), target_day)
        print "dau: ", record["dau"]
        record["mau"] = mau(active_tp_users.filter(active_tp_users.addon_id == test), target_day, test)
        print "mau: ", record["mau"]
        records.append(record)

# The all-up Test Pilot numbers span every test, so they are computed outside
# the per-test loop.
print "We should update data for Test Pilot on {d}".format(d=target_day)
record = {"test": "testpilot", "day": target_day, "generated_on": fmt(today, date_format)}
record["dau"] = dau(active_tp_users, target_day)
print "dau: ", record["dau"]
record["mau"] = mau(active_tp_users, target_day, "@testpilot-addon")
print "mau: ", record["mau"]
records.append(record)
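# Each record is a plain dict; illustrative values:
#   {"test": "example@addon", "day": "20161221",
#    "generated_on": "20161222", "dau": 123, "mau": 456}
# convert_to_row() below sorts the keys so every Row has the same columns.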
# In[127]:

# Append the previously computed rows. For now this assumes the previous
# day's output exists; recomputing any missing days is left for later.
records.extend(prev_dataframe.rdd.map(lambda x: x.asDict()).collect())

# In[128]:

records
# In[130]:

df_updated = sc.parallelize(records).map(convert_to_row).toDF()
s3_output = "s3n://net-mozaws-prod-us-west-2-pipeline-analysis/isegall/txp_mau_dau_daily/v2/collection_date=" + fmt(today)
df_updated.repartition(10).write.format("parquet").mode("overwrite").save(s3_output)
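# To spot-check the freshly written partition (read-back only, assuming the
# same session still has s3_output in scope):
# sqlContext.read.parquet(s3_output).show(5)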
# In[ ]: