stub_attribution
# coding: utf-8

# ## Stub Attribution Dataset

# This notebook generates a stub attribution subset of main_summary. This
# notebook is for exploratory purposes only; it will later be merged into
# the churn dataset.
# In[1]:

import logging

import pyspark.sql.functions as F
from datetime import datetime as dt, timedelta
from moztelemetry.standards import snap_to_beginning_of_week

# Date when the attribution field was added to main_summary
# https://github.com/mozilla/telemetry-batch-view/commit/71ad6ab7b5efac4f5fd84f15c42f27285d9e8c53
COLLECT_START_DATE = "20170118"

# Date when :stephend started collecting ping data
FIRST_TEST_PING_DATE = "20161222"

S3_ATTRIBUTION_BUCKET = "net-mozaws-prod-us-west-2-pipeline-analysis"
S3_ATTRIBUTION_PREFIX = "amiyaguchi/stub_subset/v1"

STUB_ATTRIBUTION_FIELDS = [
    "client_id",
    "timestamp",
    "submission_date_s3",
    "subsession_start_date",
    "profile_creation_date",
    "source",
    "medium",
    "campaign",
    "content",
]
def attribution_for_day(main_summary_df, ds_yyyymmdd):
    """Select the fields for stub attribution from main_summary."""
    df = (main_summary_df
          .where(F.col('submission_date_s3') == ds_yyyymmdd)
          .where(F.col('attribution').isNotNull())
          .select(
              F.col('client_id'),
              F.col('timestamp'),
              F.col('submission_date_s3'),
              F.col('subsession_start_date'),
              F.col('profile_creation_date'),
              F.col('attribution.source').alias('source'),
              F.col('attribution.medium').alias('medium'),
              F.col('attribution.campaign').alias('campaign'),
              F.col('attribution.content').alias('content')))
    return df
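
# Example usage (a sketch, not part of the original notebook): select a
# single day's attribution rows; get_main_summary is defined further down.
#   df = attribution_for_day(get_main_summary(), "20170124")
#   df.show(5)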
def attribution_for_day_raw(ping_rdd, ds_yyyymmdd):
    """Select the stub attribution fields from raw pings. Not implemented."""
    return None
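
# A hypothetical sketch of what attribution_for_day_raw could look like,
# using the moztelemetry Dataset API. The ping field paths (in particular
# environment/settings/attribution/*) are assumptions and would need to be
# verified against real main pings; the result is an RDD of dicts that would
# still have to be converted into a dataframe before writing.
def attribution_for_day_raw_sketch(sc, ds_yyyymmdd):
    from moztelemetry.dataset import Dataset
    from moztelemetry import get_pings_properties
    # Fetch the main pings submitted on the given day.
    pings = (Dataset.from_source("telemetry")
             .where(docType="main", submissionDate=ds_yyyymmdd)
             .records(sc))
    # Project the pings down to the same fields selected from main_summary.
    return get_pings_properties(pings, [
        "clientId",
        "meta/Timestamp",
        "meta/submissionDate",
        "payload/info/subsessionStartDate",
        "environment/profile/creationDate",
        "environment/settings/attribution/source",
        "environment/settings/attribution/medium",
        "environment/settings/attribution/campaign",
        "environment/settings/attribution/content",
    ])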
def write_attribution_s3(df, bucket, prefix, day, mode='day'):
    """Write a dataframe to an s3 location, keyed by submission_date_s3."""
    if df.rdd.isEmpty():
        logging.warning("Dataframe is empty for %s, skipping writing", day)
        return
    mode_dict = {'day': 'submission_date_s3', 'week': 'week_start'}
    partition_key = mode_dict.get(mode)
    if not partition_key:
        raise KeyError("Invalid mode: {}".format(mode))
    s3_path = "s3://{}/{}/{}={}".format(bucket, prefix, partition_key, day)
    logging.info("Writing dataframe to %s", s3_path)
    df.write.parquet(s3_path, mode="overwrite")
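
# For example, with the bucket and prefix constants above, mode='day' and
# day="20170124" writes to:
#   s3://net-mozaws-prod-us-west-2-pipeline-analysis/amiyaguchi/stub_subset/v1/submission_date_s3=20170124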
def daterange(start_date, end_date):
    """Generate a range of datestrings, inclusive of both endpoints.

    start_date: datestring in yyyymmdd
    end_date: datestring in yyyymmdd
    """
    start_day = dt.strptime(start_date, "%Y%m%d")
    end_day = dt.strptime(end_date, "%Y%m%d")
    for n in range(int((end_day - start_day).days) + 1):
        yield (start_day + timedelta(n)).strftime("%Y%m%d")
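
# For example, both endpoints are included in the range:
#   list(daterange("20170122", "20170124"))
#   => ['20170122', '20170123', '20170124']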
def backfill_attribution(df, start_date, end_date):
    """Backfill the attribution data from a start date to an end date.

    df: main summary dataframe
    start_date: start datestring in yyyymmdd format
    end_date: end datestring in yyyymmdd format
    """
    for day in daterange(start_date, end_date):
        if day >= COLLECT_START_DATE:
            attribution_df = attribution_for_day(df, day)
        elif day >= FIRST_TEST_PING_DATE:
            # These days predate the attribution field in main_summary, so
            # the data needs to be read from the Dataset API;
            # attribution_for_day_raw is still a stub.
            logging.warning("Not implemented: data from before %s",
                            COLLECT_START_DATE)
            continue
        else:
            logging.warning("%s - data is impractically sparse before %s",
                            day, FIRST_TEST_PING_DATE)
            continue
        write_attribution_s3(attribution_df,
                             S3_ATTRIBUTION_BUCKET,
                             S3_ATTRIBUTION_PREFIX,
                             day)
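
# Note: get_main_summary below reads with mergeSchema because partitions
# written before COLLECT_START_DATE lack the attribution struct; merging the
# per-partition parquet schemas should keep the attribution column visible
# regardless of which days are read.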
def get_main_summary():
    """Returns the main summary dataframe."""
    return (
        spark.read.option("mergeSchema", "true")
        .parquet("s3://telemetry-parquet/main_summary/v3"))
def main(args):
    """Run the stub attribution dataset builder.

    args: dictionary with flags for running this program
    """
    yesterday = (dt.now() - timedelta(1)).strftime("%Y%m%d")

    # --backfill --start_date=yyyymmdd --end_date=yyyymmdd
    if args.get('backfill') == "true":
        start = args.get('start_date')
        end = args.get('end_date', yesterday)  # optional
        if not start:
            logging.error("The backfill option requires a start_date")
            return
        logging.info("Running backfill for %s to %s", start, end)
        backfill_attribution(get_main_summary(), start, end)
    # --date=yyyymmdd
    else:
        target_date = args.get('date', yesterday)
        logging.info("Running stub attribution for %s", target_date)
        main_summary_df = get_main_summary()
        attribution_df = attribution_for_day(main_summary_df, target_date)
        write_attribution_s3(attribution_df,
                             S3_ATTRIBUTION_BUCKET,
                             S3_ATTRIBUTION_PREFIX,
                             target_date)
# In[2]:

import os

# Set up logging
reload(logging)
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(name)-4s %(levelname)-8s %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S')

# If the date environment variable exists, assume this run is automated;
# otherwise set default options for an interactive run.
if not os.environ.get('date'):
    options = {
        "backfill": "true",
        "start_date": "20170118",  # used when backfill == "true"
        "date": "20170124"
    }
    os.environ.update(options)

main(os.environ)
logging.info("Completed stub attribution job")