# coding: utf-8

# ## Stub Attribution Dataset

# This notebook generates a stub attribution subset of main_summary. It is for
# exploratory purposes only; the logic will later be merged into the churn dataset.

# In[1]:
import logging
import pyspark.sql.functions as F
from datetime import datetime as dt, timedelta
from moztelemetry.standards import snap_to_beginning_of_week
# Date when attribution field was added to main_summary
# https://github.com/mozilla/telemetry-batch-view/commit/71ad6ab7b5efac4f5fd84f15c42f27285d9e8c53
COLLECT_START_DATE = "20170118"
# Date when :stephend started collecting ping data
FIRST_TEST_PING_DATE = "20161222"
S3_ATTRIBUTION_BUCKET = "net-mozaws-prod-us-west-2-pipeline-analysis"
S3_ATTRIBUTION_PREFIX = "amiyaguchi/stub_subset/v1"

STUB_ATTRIBUTION_FIELDS = [
    "client_id",
    "timestamp",
    "submission_date_s3",
    "subsession_start_date",
    "profile_creation_date",
    "source",
    "medium",
    "campaign",
    "content",
]

def attribution_for_day(main_summary_df, ds_yyyymmdd):
    """Select the fields for stub attribution from main_summary."""
    df = (main_summary_df
          .where(F.col('submission_date_s3') == ds_yyyymmdd)
          .where(F.col('attribution').isNotNull())
          .select(
              F.col('client_id'),
              F.col('timestamp'),
              F.col('submission_date_s3'),
              F.col('subsession_start_date'),
              F.col('profile_creation_date'),
              F.col('attribution.source').alias('source'),
              F.col('attribution.medium').alias('medium'),
              F.col('attribution.campaign').alias('campaign'),
              F.col('attribution.content').alias('content')
          ))
    return df

def attribution_for_day_raw(ping_rdd, ds_yyyymmdd):
    """Placeholder: build the attribution subset from raw main pings (not implemented)."""
    return None
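
# A possible sketch (not part of the original notebook) of the unimplemented
# raw-ping path, assuming the moztelemetry Dataset API and that attribution
# lives under environment/settings/attribution in main pings; the field paths
# are assumptions and should be checked against the main ping schema.
def attribution_rdd_for_day_sketch(sc, ds_yyyymmdd):
    """Hypothetical helper: fetch raw main pings and pull out attribution fields."""
    from moztelemetry.dataset import Dataset
    from moztelemetry import get_pings_properties

    pings = (Dataset.from_source("telemetry")
             .where(docType="main")
             .where(submissionDate=ds_yyyymmdd)
             .records(sc))
    # Extract a flat dict of the assumed attribution-related fields per ping.
    return get_pings_properties(pings, [
        "clientId",
        "meta/submissionDate",
        "environment/settings/attribution/source",
        "environment/settings/attribution/medium",
        "environment/settings/attribution/campaign",
        "environment/settings/attribution/content",
    ])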

def write_attribution_s3(df, bucket, prefix, day, mode='day'):
    """Write a dataframe to an s3 location, keyed by submission_date_s3."""
    if df.rdd.isEmpty():
        logging.warning("Dataframe is empty for %s, skipping writing", day)
        return
    mode_dict = {'day': 'submission_date_s3', 'week': 'week_start'}
    partition_key = mode_dict.get(mode)
    if not partition_key:
        raise KeyError("Invalid mode: {}".format(mode))
    s3_path = "s3://{}/{}/{}={}".format(bucket, prefix, partition_key, day)
    logging.info("Writing dataframe to %s", s3_path)
    df.write.parquet(s3_path, mode="overwrite")

def daterange(start_date, end_date):
    """Generate a range of datestrings, inclusive of both endpoints.

    start_date: datestring in yyyymmdd format
    end_date: datestring in yyyymmdd format
    """
    start_day = dt.strptime(start_date, "%Y%m%d")
    end_day = dt.strptime(end_date, "%Y%m%d")
    for n in range(int((end_day - start_day).days) + 1):
        yield (start_day + timedelta(n)).strftime("%Y%m%d")

def backfill_attribution(df, start_date, end_date):
    """Backfill the attribution data from a start date to an end date.

    df: main summary dataframe
    start_date: start datestring in yyyymmdd format
    end_date: end datestring in yyyymmdd format
    """
    for day in daterange(start_date, end_date):
        attribution_df = None
        if day >= COLLECT_START_DATE:
            attribution_df = attribution_for_day(df, day)
        elif day >= FIRST_TEST_PING_DATE:
            # needs to read from the dataset api
            ping_rdd = None
            attribution_df = attribution_for_day_raw(ping_rdd, day)
            logging.warning("Not Implemented: data from before 20170118")
            continue
        else:
            logging.warning("%s - data is impractically sparse before 20161222", day)
            continue
        write_attribution_s3(attribution_df,
                             S3_ATTRIBUTION_BUCKET,
                             S3_ATTRIBUTION_PREFIX,
                             day)

def get_main_summary():
    """Returns the main summary dataframe."""
    return (
        spark.read.option("mergeSchema", "true")
        .parquet("s3://telemetry-parquet/main_summary/v3"))

def main(args):
    """Run the stub attribution dataset builder.

    args: dictionary with flags for running this program
    """
    yesterday = (dt.now() - timedelta(1)).strftime("%Y%m%d")
    # --backfill --start_date=yyyymmdd --end_date=yyyymmdd
    if args.get('backfill') == "true":
        start = args.get('start_date')
        end = args.get('end_date', yesterday)  # optional
        if not start:
            logging.error("The backfill option requires a start_date")
            return
        logging.info("Running backfill for %s to %s", start, end)
        backfill_attribution(get_main_summary(), start, end)
    # --date=yyyymmdd
    else:
        target_date = args.get('date', yesterday)
        logging.info("Running stub attribution for %s", target_date)
        main_summary_df = get_main_summary()
        attribution_df = attribution_for_day(main_summary_df, target_date)
        write_attribution_s3(attribution_df,
                             S3_ATTRIBUTION_BUCKET,
                             S3_ATTRIBUTION_PREFIX,
                             target_date)

# In[2]:

import os

# Set up logging; reload resets any handlers already configured by the notebook
# kernel so that basicConfig takes effect.
reload(logging)
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(name)-4s %(levelname)-8s %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S')

# If the date environment variable is set, assume this run is automated and
# driven entirely by environment variables; otherwise supply manual options.
if not os.environ.get('date'):
    options = {
        "backfill": "true",
        "start_date": "20170118",  # only used when backfill == "true"
        "date": "20170124"
    }
    os.environ.update(options)

main(os.environ)
logging.info("Completed stub attribution job")