Stub - Test Data
# coding: utf-8

# ## Generate Stub Attribution Dataset

# In[18]:

# Portions of code are taken from [1].
# [1] https://github.com/mozilla/mozilla-reports/blob/master/etl/churn.kp/orig_src/Churn.ipynb

import random
import datetime
import logging

from pyspark.sql import Row, Window
import pyspark.sql.functions as F
from pyspark.sql.types import *

from collections import OrderedDict
from moztelemetry.standards import snap_to_beginning_of_week
STUB_ATTRIBUTION_FIELDS = [
    "client_id",
    "timestamp",
    "submission_date_s3",
    "subsession_start_date",
    "profile_creation_date",
    "source",
    "medium",
    "campaign",
    "content",
]
def generate_test_df(start_date, n_clients, n_weeks):
    # this data should be deterministic
    random.seed(42)

    # These labels are applied to the clients using a triangular distribution.
    # This should suffice for demonstrating the different campaigns. A future
    # improvement is to use a Gaussian distribution, which would be better at
    # simulating a long tail of rare permutations of dimensions.
    #
    # Labels are chosen to be somewhat representative of the production labels.
    # However, if broader labels are needed, it might be useful to generate the
    # label category and append a numerical id, as is done for the clients.
    source_labels = ["google", "homepage", "yahoo", "bing"]
    medium_labels = ["referral", "organic", "cpc"]
    campaign_labels = [None, "campaign_1", "campaign_2"]
    content_labels = [None, "content_1", "content_2", None, "content_3"]

    attrib_dict = {
        "source": source_labels,
        "medium": medium_labels,
        "campaign": campaign_labels,
        "content": content_labels,
    }

    # Generate our fake clients
    clients = [{'client_id': 'client_{:02d}'.format(x)} for x in range(n_clients)]

    # assign the attributes to each client
    for client in clients:
        for attrib, labels in attrib_dict.items():
            idx = int(random.triangular(0, len(labels)))
            client[attrib] = labels[idx]
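
    # A rough sketch of the resulting skew (hypothetical REPL session, stdlib
    # only): int(random.triangular(0, 4)) peaks at the middle of the range, so
    # for source_labels the middle labels are drawn most often.
    # >>> from collections import Counter
    # >>> random.seed(42)
    # >>> Counter(int(random.triangular(0, 4)) for _ in range(1000))
    # Indices 1 and 2 dominate the counts, with 0 and 3 forming the tails.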
    # Assign each client to a week start, distributed evenly through the week.
    # profile_creation_date: days since epoch
    new_per_week = n_clients // n_weeks
    days_per_week = 7
    loss_rate = 0.25
    loss_delta = 0.1

    date_start = datetime.datetime.strptime(start_date, "%Y%m%d")
    epoch = datetime.datetime.utcfromtimestamp(0)
    date_offset = (date_start - epoch).days
    data = []
    for week_start in range(n_weeks):
        days_since_epoch = date_offset + (week_start * days_per_week)
        id_offset = week_start * new_per_week

        # create a new cohort of users
        for relative_id in range(new_per_week):
            uid = id_offset + relative_id
            clients[uid]['profile_creation_date'] = days_since_epoch

        # Provide client activity per week. A certain percentage of users will
        # drop off every week, which should let us observe a general downward
        # trend in usage. Each user also sends a significant number of
        # duplicate requests, to test that the process is resilient against
        # overcounting.
        # subsession_start_date == submission_date_s3 == timestamp (ns)
        cohort_size = new_per_week
        for future_week in range(week_start, n_weeks):
            # cohort_size models user drop-off
            future_days = date_offset + (future_week * days_per_week)
            for relative_id in range(cohort_size):
                # randomly ping 1-3 times for duplicates
                for _ in range(random.randint(1, 3)):
                    uid = id_offset + relative_id
                    arrival = (
                        epoch +
                        datetime.timedelta(
                            future_days + random.randint(0, days_per_week - 1))
                    )
                    ping = clients[uid].copy()
                    submission_dict = {
                        "subsession_start_date": arrival.strftime("%Y-%m-%d"),
                        "submission_date_s3": arrival.strftime("%Y%m%d"),
                        "timestamp": (arrival - epoch).total_seconds() * (10 ** 9),  # nanoseconds
                    }
                    ping.update(submission_dict)
                    data.append(ping)

            # loss rate increases with cohort start date
            cohort_size -= int(cohort_size * (loss_rate + (loss_delta * week_start)))
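
            # Worked example (hedged, using the driver's 100 clients over 5
            # weeks, so new_per_week == 20): the week-0 cohort loses
            # int(size * 0.25) clients each subsequent week, shrinking
            # 20 -> 15 -> 12 -> 9 -> 7, while later cohorts shed a growing
            # fraction as loss_delta accumulates.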

    # `sc` is the SparkContext provided by the notebook environment.
    return (
        sc.parallelize(data)
        .map(lambda d: Row(**d))
        .toDF()
        .select(STUB_ATTRIBUTION_FIELDS)
    )
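
# A hedged usage sketch (assumes the notebook's live SparkContext `sc`):
# >>> test_df = generate_test_df("20160103", 100, 5)
# >>> test_df.groupBy("submission_date_s3").count().show()
# Counts per submission date reflect both the weekly cohort arrivals and the
# simulated drop-off.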

# In[57]:

def get_newest_per_client(df):
    window_spec = Window.partitionBy(df['client_id']).orderBy(df['timestamp'].desc())
    rownum_by_timestamp = F.row_number().over(window_spec)
    selectable_by_client = df.select(
        rownum_by_timestamp.alias('row_number'),
        *df.columns
    )
    return (selectable_by_client
            .filter(selectable_by_client['row_number'] == 1)
            .select(STUB_ATTRIBUTION_FIELDS))
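
# Dedup sanity check (sketch): each client_id should survive exactly once,
# since row_number() == 1 keeps only the newest ping in every client_id
# partition.
# >>> deduped = get_newest_per_client(df)
# >>> deduped.groupBy('client_id').count().filter(F.col('count') > 1).count()
# 0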

def fmt(d, date_format="%Y%m%d"):
    return datetime.datetime.strftime(d, date_format)


def get_week_num(creation, today):
    if creation is None or today is None:
        return None
    diff = (today.date() - creation).days
    if diff < 0:
        # Creation date is in the future. Bad data :(
        return -1
    # The initial week is week zero.
    return int(diff / 7)
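
# Worked example: a profile created on 2016-01-03 that is active on
# 2016-01-12 is 9 days old, and int(9 / 7) == 1, i.e. its second week.
# >>> get_week_num(datetime.date(2016, 1, 3), datetime.datetime(2016, 1, 12))
# 1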

def daynum_to_date(daynum):
    """Convert a number of days to a date. If it's out of range, default to a max date.

    :param daynum: A number of days since Jan 1, 1970
    """
    if daynum is None:
        return None
    if daynum < 0:
        return None
    daycount = int(daynum)
    if daycount > 1000000:
        # Some time in the 48th century, clearly bogus.
        daycount = 1000000
    return datetime.date(1970, 1, 1) + datetime.timedelta(daycount)
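
# Example: the driver's start date below, 2016-01-03, is day 16803.
# >>> daynum_to_date(16803)
# datetime.date(2016, 1, 3)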

def get_current_week(profile_creation_date, subsession_start_date, submission_date_s3):
    pcd = daynum_to_date(profile_creation_date)
    client_date = None
    if subsession_start_date is not None:
        try:
            client_date = (datetime
                           .datetime
                           .strptime(subsession_start_date[0:10], "%Y-%m-%d"))
        except ValueError:
            # Bogus format
            return 'unknown'
        except TypeError:
            # String contains null bytes or other weirdness. Example:
            # TypeError: must be string without null bytes, not unicode
            return 'unknown'
    if client_date is None:
        # Fall back to the submission date
        client_date = datetime.datetime.strptime(submission_date_s3, "%Y%m%d")
    return get_week_num(pcd, client_date)
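
# Example tying the helpers together (same dates as above):
# >>> get_current_week(16803, "2016-01-12", "20160112")
# 1
# A malformed subsession_start_date short-circuits to 'unknown' rather than
# raising, so bad pings degrade gracefully.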

current_week_udf = F.udf(get_current_week, StringType())

# Note: defined in a functional way, but perhaps not the most pythonic (or readable)
pcd_to_acquisition_udf = (
    F.udf(lambda pcd: (
        datetime.datetime.strftime(
            snap_to_beginning_of_week(
                daynum_to_date(pcd),
                "Sunday"),
            "%Y-%m-%d")),
        StringType()))
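
# Sketch of the snap (assuming snap_to_beginning_of_week rolls a date back to
# the preceding Sunday, as its name suggests): a profile created on Tuesday
# 2016-01-05 would map to the acquisition period "2016-01-03".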

def compute_week(df, week_start):
    week_start_date = datetime.datetime.strptime(week_start, "%Y%m%d")
    week_end_date = week_start_date + datetime.timedelta(6)
    week_start = fmt(week_start_date)
    week_end = fmt(week_end_date)

    # Verify that the start date is a Sunday
    if week_start_date.weekday() != 6:
        print("Week start date {} is not a Sunday".format(week_start))
        return

    print("Starting week from {} to {} at {}"
          .format(week_start, week_end, datetime.datetime.utcnow()))

    # The subsession_start_date field has a different form than
    # submission_date_s3, so it needs to be formatted with hyphens.
    week_end_slop = fmt(week_end_date + datetime.timedelta(10))
    week_end_excl = fmt(week_end_date + datetime.timedelta(1), date_format="%Y-%m-%d")
    week_start_hyphenated = fmt(week_start_date, date_format="%Y-%m-%d")

    current_week = (
        df.filter(df['submission_date_s3'] >= week_start)
          .filter(df['submission_date_s3'] <= week_end_slop)
          .filter(df['subsession_start_date'] >= week_start_hyphenated)
          .filter(df['subsession_start_date'] < week_end_excl)
    )
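
    # Worked example for week_start = "20160103": pings are kept when they
    # were submitted between 20160103 and 20160119 (the week end 20160109 plus
    # ten days of slop for late arrivals) and their subsession_start_date
    # falls in ["2016-01-03", "2016-01-10").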
    newest_per_client = (
        get_newest_per_client(current_week)
        .select(
            F.col('*'),
            pcd_to_acquisition_udf(
                F.col('profile_creation_date')
            ).alias('acquisition_period'),
            current_week_udf(
                F.col('profile_creation_date'),
                F.col('subsession_start_date'),
                F.col('submission_date_s3')
            ).alias('current_week')))
    return newest_per_client

# In[59]:

S3_ATTRIBUTION_BUCKET = 'net-mozaws-prod-us-west-2-pipeline-analysis'
S3_ATTRIBUTION_PREFIX = 'amiyaguchi/stub/v1'

n_weeks = 5
df = generate_test_df("20160103", 100, n_weeks)
start_date = datetime.datetime.strptime("20160103", "%Y%m%d")

for week in range(n_weeks):
    delta = datetime.timedelta(7) * week
    week_start = fmt(start_date + delta)
    week_df = compute_week(df, week_start)
    if week == 0:
        week_df.printSchema()
    s3_path = "s3://{}/{}/{}={}".format(S3_ATTRIBUTION_BUCKET,
                                        S3_ATTRIBUTION_PREFIX,
                                        'week_start', week_start)
    logging.info("Writing dataframe to %s", s3_path)
    week_df.write.parquet(s3_path, mode="overwrite")
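
# Read-back sketch (hedged; assumes a SQLContext named `sqlContext`, as in
# notebooks of this vintage). Because each week lands under a
# week_start=<date> suffix, reading the common prefix recovers week_start as
# a partition column.
# >>> stub = sqlContext.read.parquet(
# ...     "s3://{}/{}".format(S3_ATTRIBUTION_BUCKET, S3_ATTRIBUTION_PREFIX))
# >>> stub.select('week_start').distinct().show()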