# coding: utf-8
# # Overall Firefox Engagement Ratio
#
# Compute the Engagement Ratio for the overall Firefox population as described in [Bug 1240849](https://bugzilla.mozilla.org/show_bug.cgi?id=1240849). The resulting data is shown on the [Firefox Dashboard](http://metrics.services.mozilla.com/firefox-dashboard/), and the more granular MAU and DAU values can be viewed via the [Diagnostic Data Viewer](https://metrics.services.mozilla.com/diagnostic-data-viewer).
#
# The actual Daily Active Users (DAU) and Monthly Active Users (MAU) computations are defined in [standards.py](https://github.com/mozilla/python_moztelemetry/blob/master/moztelemetry/standards.py) in the [python_moztelemetry](https://github.com/mozilla/python_moztelemetry) repo.
# In[1]:
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from datetime import datetime as _datetime, timedelta, date
import boto3
import botocore
import csv
import os.path
from moztelemetry.standards import dau, mau
bucket = "telemetry-parquet"
prefix = "executive_stream/v3"
get_ipython().magic(u'time dataset = sqlContext.read.load("s3://{}/{}".format(bucket, prefix), "parquet")')
# How many cores are we running on?
# In[2]:
sc.defaultParallelism
# And what do the underlying records look like?
# In[3]:
dataset.printSchema()
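# For orientation, here is a minimal sketch of the *shape* of those DAU/MAU
# helpers. This is an illustration only, not the actual standards.py
# implementation; the column names below ("client_id" and "activity_date" as a
# "%Y%m%d" string) are assumptions made for the sketch.
def dau_sketch(frame, target_day):
    # Distinct clients that reported activity on the target day.
    return (frame.filter(frame.activity_date == target_day)
                 .select("client_id").distinct().count())
def mau_sketch(frame, target_day, window_days=28, date_format="%Y%m%d"):
    # Distinct clients that reported activity in the trailing window ending on
    # the target day (28 days is the conventional MAU window).
    end = _datetime.strptime(target_day, date_format)
    start = _datetime.strftime(end - timedelta(window_days - 1), date_format)
    return (frame.filter((frame.activity_date >= start) &
                         (frame.activity_date <= target_day))
                 .select("client_id").distinct().count())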
# We want to incrementally update the data, re-computing any values that are missing or for which data is still arriving. Define that logic here.
# In[4]:
def fmt(the_date, date_format="%Y%m%d"):
    return _datetime.strftime(the_date, date_format)
# Our calculations look for activity dates reported within
# a certain time window. If that window has passed, we do
# not need to re-compute data for that period.
def should_be_updated(record,
                      target_col="day",
                      generated_col="generated_on",
                      date_format="%Y%m%d"):
    target = _datetime.strptime(record[target_col], date_format)
    generated = _datetime.strptime(record[generated_col], date_format)
    # Don't regenerate data that was already updated today.
    today = fmt(_datetime.utcnow(), date_format)
    if record[generated_col] >= today:
        return False
    diff = generated - target
    return diff.days <= 10
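# A quick illustration of the rule above, using a hypothetical record: the data
# was generated only three days after the target day, which is within the
# 10-day "still reporting in" threshold, so it would be re-computed.
example_record = {"day": "20160310", "generated_on": "20160313"}
should_be_updated(example_record)  # True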
# Identify all missing days, or days that have not yet passed
# the "still reporting in" threshold (as of 2016-03-17, that is
# defined as 10 days).
def update_engagement_csv(dataset, old_filename, new_filename,
                          cutoff_days=30, date_format="%Y%m%d"):
    cutoff_date = _datetime.utcnow() - timedelta(cutoff_days)
    cutoff = fmt(cutoff_date, date_format)
    print "Cutoff date: {}".format(cutoff)
    fields = ["day", "dau", "mau", "generated_on"]
    should_write_header = True
    potential_updates = {}
    # Carry over rows we won't touch
    if os.path.exists(old_filename):
        with open(old_filename) as csv_old:
            reader = csv.DictReader(csv_old)
            with open(new_filename, "w") as csv_new:
                writer = csv.DictWriter(csv_new, fields)
                writer.writeheader()
                should_write_header = False
                for row in reader:
                    if row['day'] < cutoff:
                        writer.writerow(row)
                    else:
                        potential_updates[row['day']] = row
    with open(new_filename, "a") as csv_new:
        writer = csv.DictWriter(csv_new, fields)
        if should_write_header:
            writer.writeheader()
        for i in range(cutoff_days, 0, -1):
            target_day = fmt(_datetime.utcnow() - timedelta(i), date_format)
            if target_day in potential_updates and not should_be_updated(potential_updates[target_day]):
                # It's fine as-is.
                writer.writerow(potential_updates[target_day])
            else:
                # Update it.
                print "We should update data for {}".format(target_day)
                record = {"day": target_day, "generated_on": fmt(_datetime.utcnow(), date_format)}
                print "Starting dau {} at {}".format(target_day, _datetime.utcnow())
                record["dau"] = dau(dataset, target_day)
                print "Finished dau {} at {}".format(target_day, _datetime.utcnow())
                print "Starting mau {} at {}".format(target_day, _datetime.utcnow())
                record["mau"] = mau(dataset, target_day)
                print "Finished mau {} at {}".format(target_day, _datetime.utcnow())
                writer.writerow(record)
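# The resulting CSV has one row per day with the columns listed above
# ("day", "dau", "mau", "generated_on"). For example (placeholder values, not
# real measurements):
#
#   day,dau,mau,generated_on
#   20160310,<dau count>,<mau count>,20160321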
# ### Fetch existing data from S3
# Attempt to fetch an existing data file from S3. If found, update it incrementally. Otherwise, re-compute the entire dataset.
# In[5]:
from boto3.s3.transfer import S3Transfer
data_bucket = "net-mozaws-prod-us-west-2-pipeline-analysis"
engagement_basename = "engagement_ratio.csv"
new_engagement_basename = "engagement_ratio.{}.csv".format(_datetime.strftime(_datetime.utcnow(), "%Y%m%d"))
s3path = "mreid/maudau"
engagement_key = "{}/{}".format(s3path, engagement_basename)
client = boto3.client('s3', 'us-west-2')
transfer = S3Transfer(client)
try:
    transfer.download_file(data_bucket, engagement_key, engagement_basename)
except botocore.exceptions.ClientError as e:
    # If the file wasn't there, that's ok. Otherwise, abort!
    if e.response['Error']['Code'] != "404":
        raise e
    else:
        print "Did not find an existing file at '{}'".format(engagement_key)
update_engagement_csv(dataset, engagement_basename, new_engagement_basename)
# ### Update data on S3
# Now we have an updated dataset on the local filesystem.
#
# Since it is so tiny, we keep a date-stamped backup of each dataset in addition to the "latest" file.
#
# Upload the updated file back to S3, and also copy it to the S3 bucket that is automatically synced to the dashboard server. This final upload appears in the [Firefox Dashboard data dir](http://metrics.services.mozilla.com/firefox-dashboard/data/) as [engagement_ratio.csv](http://metrics.services.mozilla.com/firefox-dashboard/data/engagement_ratio.csv).
# In[6]:
## Upload the updated csv file to S3
# Update the day-specific file:
new_s3_name = "{}/{}".format(s3path, new_engagement_basename)
transfer.upload_file(new_engagement_basename, data_bucket, new_s3_name)
# Update the "main" file
transfer.upload_file(new_engagement_basename, data_bucket, engagement_key)
# Update the dashboard file
dash_bucket = "net-mozaws-prod-metrics-data"
dash_s3_name = "firefox-dashboard/{}".format(engagement_basename)
transfer.upload_file(new_engagement_basename, dash_bucket, dash_s3_name,
                     extra_args={'ACL': 'bucket-owner-full-control'})
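# Optional sanity check (not part of the original flow): confirm that each
# uploaded object exists by issuing a HEAD request against its destination.
for check_bucket, check_key in [(data_bucket, new_s3_name),
                                (data_bucket, engagement_key),
                                (dash_bucket, dash_s3_name)]:
    response = client.head_object(Bucket=check_bucket, Key=check_key)
    print "{}/{}: {} bytes".format(check_bucket, check_key, response['ContentLength'])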