# coding: utf-8

# ## Generate Stub Attribution Dataset

# In[18]:

# Portions of this code are taken from [1].
#
# [1] https://github.com/mozilla/mozilla-reports/blob/master/etl/churn.kp/orig_src/Churn.ipynb

import random
import datetime
import logging
from collections import OrderedDict

from pyspark.sql import Row, Window
import pyspark.sql.functions as F
from pyspark.sql.types import *

from moztelemetry.standards import snap_to_beginning_of_week
STUB_ATTRIBUTION_FIELDS = [
    "client_id",
    "timestamp",
    "submission_date_s3",
    "subsession_start_date",
    "profile_creation_date",
    "source",
    "medium",
    "campaign",
    "content",
]
def generate_test_df(start_date, n_clients, n_weeks):
    # this data should be deterministic
    random.seed(42)

    # These labels are applied to the clients using a triangular distribution. This
    # should suffice for demonstrating the different campaigns. A future improvement
    # is to use a Gaussian distribution, which would do a better job of simulating a
    # long tail of rare permutations of dimensions.
    #
    # Labels are chosen to be somewhat representative of the production labels. However,
    # if a broader set of labels is needed, it might be useful to generate the label
    # category and append a numerical id, as is done for the clients.
    source_labels = ["google", "homepage", "yahoo", "bing"]
    medium_labels = ["referral", "organic", "cpc"]
    campaign_labels = [None, "campaign_1", "campaign_2"]
    content_labels = [None, "content_1", "content_2", None, "content_3"]

    attrib_dict = {
        "source": source_labels,
        "medium": medium_labels,
        "campaign": campaign_labels,
        "content": content_labels,
    }

    # Generate our fake clients
    clients = [{'client_id': 'client_{:02d}'.format(x)} for x in range(n_clients)]

    # assign the attributes to each client
    for client in clients:
        for attrib, labels in attrib_dict.items():
            idx = int(random.triangular(0, len(labels)))
            client[attrib] = labels[idx]

    # Assign each client to a week start, distributed evenly across the weeks
    # profile_creation_date: days since epoch
    new_per_week = n_clients // n_weeks
    days_per_week = 7
    loss_rate = 0.25
    loss_delta = 0.1

    date_start = datetime.datetime.strptime(start_date, "%Y%m%d")
    epoch = datetime.datetime.utcfromtimestamp(0)
    date_offset = (date_start - epoch).days

    data = []
    for week_start in range(n_weeks):
        days_since_epoch = date_offset + (week_start * days_per_week)
        id_offset = week_start * new_per_week

        # create a new cohort of users
        for relative_id in range(new_per_week):
            uid = id_offset + relative_id
            clients[uid]['profile_creation_date'] = days_since_epoch

        # Provide client activity per week. A certain percentage of users will
        # drop off every week, dropped at random from all users. This should allow
        # us to observe a general trend in usage. Each user should also send a
        # significant number of duplicate requests, to test that the process is
        # resilient against overcounting.
        # subsession_start_date == submission_date_s3 == timestamp (ns)
        cohort_size = new_per_week
        for future_week in range(week_start, n_weeks):
            # cohort_size models user drop-off
            future_days = date_offset + (future_week * days_per_week)
            for relative_id in range(cohort_size):
                # randomly ping 1-3 times for duplicates
                for _ in range(random.randint(1, 3)):
                    uid = id_offset + relative_id
                    arrival = (
                        epoch +
                        datetime.timedelta(
                            future_days + random.randint(0, days_per_week - 1))
                    )
                    ping = clients[uid].copy()
                    submission_dict = {
                        "subsession_start_date": arrival.strftime("%Y-%m-%d"),
                        "submission_date_s3": arrival.strftime("%Y%m%d"),
                        "timestamp": (arrival - epoch).total_seconds() * (10 ** 9),  # nanoseconds
                    }
                    ping.update(submission_dict)
                    data.append(ping)
            # loss rate increases with cohort start date
            cohort_size -= int(cohort_size * (loss_rate + (loss_delta * week_start)))

    # `sc` is the SparkContext provided by the notebook environment
    return (
        sc.parallelize(data)
        .map(lambda d: Row(**d))
        .toDF()
        .select(STUB_ATTRIBUTION_FIELDS)
    )
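

# Illustrative usage only: generate a small DataFrame and peek at a few rows to
# sanity-check the schema and the label assignment. The parameter values here are
# arbitrary, and the notebook SparkContext `sc` is assumed as above.
sample_df = generate_test_df("20160103", n_clients=10, n_weeks=2)
sample_df.show(5)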
# In[57]:

def get_newest_per_client(df):
    window_spec = Window.partitionBy(df['client_id']).orderBy(df['timestamp'].desc())
    rownum_by_timestamp = F.row_number().over(window_spec)
    selectable_by_client = df.select(
        rownum_by_timestamp.alias('row_number'),
        *df.columns
    )
    return (selectable_by_client
            .filter(selectable_by_client['row_number'] == 1)
            .select(STUB_ATTRIBUTION_FIELDS))
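

# Illustrative check only: the window keeps exactly one row per client, the one with
# the largest `timestamp`, so duplicate pings collapse to the newest record. This
# reuses the `sample_df` toy DataFrame built above for demonstration.
deduped = get_newest_per_client(sample_df)
print("{} raw pings -> {} rows, one per distinct client".format(
    sample_df.count(), deduped.count()))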
def fmt(d, date_format="%Y%m%d"):
    return datetime.datetime.strftime(d, date_format)


def get_week_num(creation, today):
    if creation is None or today is None:
        return None
    diff = (today.date() - creation).days
    if diff < 0:
        # Creation date is in the future. Bad data :(
        return -1
    # The initial week is week zero.
    return int(diff / 7)


def daynum_to_date(daynum):
    """Convert a number of days to a date. If it's out of range, default to a max date.

    :param daynum: A number of days since Jan 1, 1970
    """
    if daynum is None:
        return None
    if daynum < 0:
        return None
    daycount = int(daynum)
    if daycount > 1000000:
        # Some time in the 48th century, clearly bogus.
        daycount = 1000000
    return datetime.date(1970, 1, 1) + datetime.timedelta(daycount)
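

# Worked example (illustrative only): 2016-01-03 is 16803 days after the Unix epoch,
# and a ping nine days later lands in week 1 of that profile's life (the first seven
# days are week zero).
assert daynum_to_date(16803) == datetime.date(2016, 1, 3)
assert get_week_num(daynum_to_date(16803), datetime.datetime(2016, 1, 12)) == 1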
def get_current_week(profile_creation_date, subsession_start_date, submission_date_s3):
    pcd = daynum_to_date(profile_creation_date)
    client_date = None
    if subsession_start_date is not None:
        try:
            client_date = (datetime
                           .datetime
                           .strptime(subsession_start_date[0:10], "%Y-%m-%d"))
        except ValueError:
            # Bogus format
            return 'unknown'
        except TypeError:
            # String contains null bytes or other weirdness. Example:
            # TypeError: must be string without null bytes, not unicode
            return 'unknown'
    if client_date is None:
        # Fall back to submission date
        client_date = datetime.datetime.strptime(submission_date_s3, "%Y%m%d")
    return get_week_num(pcd, client_date)


current_week_udf = F.udf(get_current_week, StringType())
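

# Illustrative check only, calling the plain function rather than the UDF: a profile
# created on day 16803 (2016-01-03) with a subsession on 2016-01-12 falls in week 1,
# while a malformed subsession date falls back to 'unknown'.
assert get_current_week(16803, "2016-01-12", "20160112") == 1
assert get_current_week(16803, "not-a-date", "20160112") == 'unknown'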
# Note: defined in a functional way, but perhaps not the most pythonic (or readable)
pcd_to_acquisition_udf = (
    F.udf(
        lambda pcd: datetime.datetime.strftime(
            snap_to_beginning_of_week(daynum_to_date(pcd), "Sunday"),
            "%Y-%m-%d"),
        StringType()))
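
# For illustration: the acquisition period is the Sunday-aligned week containing the
# profile creation date, formatted as YYYY-MM-DD. For example, a profile created on
# 2016-01-05 (a Tuesday, day 16805 since the epoch) should map to "2016-01-03",
# assuming snap_to_beginning_of_week snaps back to the preceding Sunday.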
def compute_week(df, week_start):
    week_start_date = datetime.datetime.strptime(week_start, "%Y%m%d")
    week_end_date = week_start_date + datetime.timedelta(6)
    week_start = fmt(week_start_date)
    week_end = fmt(week_end_date)

    # Verify that the start date is a Sunday
    if week_start_date.weekday() != 6:
        print("Week start date {} is not a Sunday".format(week_start))
        return

    print("Starting week from {} to {} at {}"
          .format(week_start, week_end, datetime.datetime.utcnow()))

    # The subsession_start_date field has a different format than submission_date_s3,
    # so it needs to be formatted with hyphens.
    week_end_slop = fmt(week_end_date + datetime.timedelta(10))
    week_end_excl = fmt(week_end_date + datetime.timedelta(1), date_format="%Y-%m-%d")
    week_start_hyphenated = fmt(week_start_date, date_format="%Y-%m-%d")

    current_week = (
        df.filter(df['submission_date_s3'] >= week_start)
        .filter(df['submission_date_s3'] <= week_end_slop)
        .filter(df['subsession_start_date'] >= week_start_hyphenated)
        .filter(df['subsession_start_date'] < week_end_excl)
    )

    newest_per_client = (
        get_newest_per_client(current_week)
        .select(
            F.col('*'),
            pcd_to_acquisition_udf(
                F.col('profile_creation_date')
            ).alias('acquisition_period'),
            current_week_udf(
                F.col('profile_creation_date'),
                F.col('subsession_start_date'),
                F.col('submission_date_s3')
            ).alias('current_week')))

    return newest_per_client
# In[59]:

S3_ATTRIBUTION_BUCKET = 'net-mozaws-prod-us-west-2-pipeline-analysis'
S3_ATTRIBUTION_PREFIX = 'amiyaguchi/stub/v1'

n_weeks = 5
start_date = "20160103"  # a Sunday
df = generate_test_df(start_date, 100, n_weeks)

start_date_dt = datetime.datetime.strptime(start_date, "%Y%m%d")
for week in range(n_weeks):
    delta = datetime.timedelta(7) * week
    week_start = fmt(start_date_dt + delta)
    week_df = compute_week(df, week_start)
    if week == 0:
        week_df.printSchema()
    s3_path = "s3://{}/{}/{}={}".format(S3_ATTRIBUTION_BUCKET,
                                        S3_ATTRIBUTION_PREFIX,
                                        'week_start', week_start)
    logging.info("Writing dataframe to %s", s3_path)
    week_df.write.parquet(s3_path, mode="overwrite")
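

# Optional spot check (illustrative only; assumes the notebook also provides a
# `sqlContext` bound to the same cluster): read the partitioned output back and
# confirm that one week_start partition exists per processed week.
check_df = sqlContext.read.parquet(
    "s3://{}/{}".format(S3_ATTRIBUTION_BUCKET, S3_ATTRIBUTION_PREFIX))
check_df.select("week_start").distinct().orderBy("week_start").show()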