Skip to content

Instantly share code, notes, and snippets.

Last active February 3, 2017 01:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save acmiyaguchi/d3cc2b2f9e8441a3c49a97ff09c12d53 to your computer and use it in GitHub Desktop.
Save acmiyaguchi/d3cc2b2f9e8441a3c49a97ff09c12d53 to your computer and use it in GitHub Desktop.
Stub - Test Data
Display the source blob
Display the rendered blob
"cells": [
"cell_type": "markdown",
"metadata": {},
"source": [
"## Generate Stub Attribution Dataset"
"cell_type": "code",
"execution_count": 18,
"metadata": {
"collapsed": false
"outputs": [],
"source": [
"# Portions of code are taken from [1].\n",
"# [1]\n",
"import random\n",
"import datetime\n",
"import logging\n",
"from pyspark.sql import Row, Window\n",
"import pyspark.sql.functions as F\n",
"from pyspark.sql.types import *\n",
"from collections import OrderedDict\n",
"from moztelemetry.standards import snap_to_beginning_of_week\n",
" \"client_id\",\n",
" \"timestamp\",\n",
" \"submission_date_s3\",\n",
" \"subsession_start_date\",\n",
" \"profile_creation_date\",\n",
" \"source\",\n",
" \"medium\",\n",
" \"campaign\",\n",
" \"content\",\n",
"def generate_test_df(start_date, n_clients, n_weeks):\n",
" # this data should be deterministic\n",
" random.seed(42)\n",
" \n",
" # These labels are applied to the clients using a triangular distribution. This\n",
" # should suffice for demonstrating the different campaigns. A future improvement\n",
" # is to use a gaussian distribution, which would be better at\n",
" # simulating a long tail of rare permutation of dimensions. \n",
" #\n",
" # Labels are chosen to be somewhat representative of the production labels. However,\n",
" # if labels are to be chosen to be more broad, it might be useful to generate the \n",
" # label category and append a numerical id like the clients.\n",
" source_labels = [\"google\", \"homepage\", \"yahoo\", \"bing\"]\n",
" medium_labels = [\"referral\", \"organic\", \"cpc\"]\n",
" campaign_labels = [None, \"campaign_1\", \"campaign_2\"]\n",
" content_labels = [None, \"content_1\", \"content_2\", None, \"content_3\"]\n",
" \n",
" attrib_dict = {\n",
" \"source\": source_labels,\n",
" \"medium\": medium_labels, \n",
" \"campaign\": campaign_labels, \n",
" \"content\": content_labels\n",
" }\n",
" \n",
" # Generate our fake clients\n",
" clients = [{'client_id': 'client_{:02d}'.format(x)} for x in range(n_clients)]\n",
" \n",
" # assign the attributes to each client\n",
" for client in clients:\n",
" for attrib, labels in attrib_dict.iteritems():\n",
" idx = int(random.triangular(0, len(labels)))\n",
" client[attrib] = labels[idx]\n",
" \n",
" # Assign each client to a week start, distributed evenly through the week\n",
" # profile_creation_date: days since epoch\n",
" new_per_week = n_clients // n_weeks\n",
" days_per_week = 7\n",
" \n",
" loss_rate = 0.25\n",
" loss_delta = 0.1\n",
" \n",
" date_start = datetime.datetime.strptime(start_date, \"%Y%M%d\")\n",
" epoch = datetime.datetime.utcfromtimestamp(0)\n",
" date_offset = (date_start - epoch).days\n",
" data = []\n",
" \n",
" for week_start in range(n_weeks):\n",
" days_since_epoch = date_offset + (week_start * days_per_week)\n",
" id_offset = week_start * new_per_week\n",
" \n",
" # create a new cohort of users\n",
" for relative_id in range(new_per_week):\n",
" uid = id_offset + relative_id\n",
" clients[uid]['profile_creation_date'] = days_since_epoch\n",
" \n",
" # Provide client activity per week. A certain percentage of users will\n",
" # drop off every week, randomly dropped from all user. This should allow \n",
" # us to see if we can observe a general trend in user usage. Each user \n",
" # should also send a signficant number of duplicate requests, to test \n",
" # that the process is resiliant against overcounting.\n",
" # subsession_start_date == submission_date_s3 == timestamp (ns)\n",
" cohort_size = new_per_week\n",
" for future_week in range(week_start, n_weeks):\n",
" # cohort_size models user drop-off\n",
" future_days = date_offset + (future_week * days_per_week)\n",
" for relative_id in range(cohort_size):\n",
" # randomly ping 1-3 times for duplicates\n",
" for _ in range(random.randint(1, 3)):\n",
" uid = id_offset + relative_id\n",
" arrival = (\n",
" epoch + \n",
" datetime.timedelta(\n",
" future_days + random.randint(0, days_per_week - 1))\n",
" )\n",
" ping = clients[uid].copy()\n",
" submission_dict = {\n",
" \"subsession_start_date\": arrival.strftime(\"%Y-%m-%d\"),\n",
" \"submission_date_s3\": arrival.strftime(\"%Y%m%d\"),\n",
" \"timestamp\": (arrival-epoch).total_seconds() * (10 ** 9) # nanoseconds\n",
" }\n",
" ping.update(submission_dict)\n",
" data.append(ping)\n",
" # loss rate increases with cohort start date\n",
" cohort_size -= int(cohort_size * (loss_rate + (loss_delta * week_start)))\n",
" \n",
" return (\n",
" sc.parallelize(data)\n",
" .map(lambda d: Row(**d))\n",
" .toDF()\n",
" )"
"cell_type": "code",
"execution_count": 57,
"metadata": {
"collapsed": false
"outputs": [],
"source": [
"def get_newest_per_client(df):\n",
" window_spec = Window.partitionBy(df['client_id']).orderBy(df['timestamp'].desc())\n",
" rownum_by_timestamp = F.row_number().over(window_spec)\n",
" selectable_by_client =\n",
" rownum_by_timestamp.alias('row_number'),\n",
" *df.columns\n",
" )\n",
" return (selectable_by_client\n",
" .filter(selectable_by_client['row_number'] == 1)\n",
"def fmt(d, date_format=\"%Y%m%d\"):\n",
" return datetime.datetime.strftime(d, date_format)\n",
"def get_week_num(creation, today):\n",
" if creation is None or today is None:\n",
" return None\n",
" diff = ( - creation).days\n",
" if diff < 0:\n",
" # Creation date is in the future. Bad data :(\n",
" return -1\n",
" # The initial week is week zero.\n",
" return int(diff / 7)\n",
"def daynum_to_date(daynum):\n",
" \"\"\" Convert a number of days to a date. If it's out of range, default to a max date.\n",
" :param daynum: A number of days since Jan 1, 1970\n",
" \"\"\"\n",
" if daynum is None:\n",
" return None\n",
" if daynum < 0:\n",
" return None\n",
" daycount = int(daynum)\n",
" if daycount > 1000000:\n",
" # Some time in the 48th century, clearly bogus.\n",
" daycount = 1000000\n",
" return, 1, 1) + datetime.timedelta(daycount)\n",
"def get_current_week(profile_creation_date, subsession_start_date, submission_date_s3):\n",
" pcd = daynum_to_date(profile_creation_date)\n",
" client_date = None\n",
" if subsession_start_date is not None:\n",
" try:\n",
" client_date = (datetime\n",
" .datetime\n",
" .strptime(subsession_start_date[0:10], \"%Y-%m-%d\"))\n",
" except ValueError as e1:\n",
" # Bogus format\n",
" return 'unknown'\n",
" except TypeError as e2:\n",
" # String contains null bytes or other weirdness. Example:\n",
" # TypeError: must be string without null bytes, not unicode\n",
" return 'unknown'\n",
" if client_date is None:\n",
" # Fall back to submission date\n",
" client_date = datetime.datetime.strptime(submission_date_s3, \"%Y%m%d\")\n",
" return get_week_num(pcd, client_date)\n",
"current_week_udf = F.udf(get_current_week, StringType())\n",
"# Note: defined in a functional way, but perhaps not the most pythonic (or readable)\n",
"pcd_to_aquisition_udf = (\n",
" F.udf(lambda pcd: (\n",
" datetime.datetime.strftime(\n",
" snap_to_beginning_of_week(\n",
" daynum_to_date(pcd), \n",
" \"Sunday\"),\n",
" \"%Y-%m-%d\")),\n",
" StringType()))\n",
"def compute_week(df, week_start):\n",
" week_start_date = datetime.datetime.strptime(week_start, \"%Y%m%d\")\n",
" week_end_date = week_start_date + datetime.timedelta(6)\n",
" week_start = fmt(week_start_date)\n",
" week_end = fmt(week_end_date)\n",
" \n",
" # Verify that the start date is a Sunday\n",
" if week_start_date.weekday() != 6:\n",
" print(\"Week start date {} is not a Sunday\".format(week_start))\n",
" return\n",
" \n",
" print(\"Starting week from {} to {} at {}\"\n",
" .format(week_start, week_end, datetime.datetime.utcnow()))\n",
" # the subsession_start_date field has a different form than submission_date_s3,\n",
" # so needs to be formatted with hyphens.\n",
" week_end_slop = fmt(week_end_date + datetime.timedelta(10))\n",
" week_end_excl = fmt(week_end_date + datetime.timedelta(1), date_format=\"%Y-%m-%d\")\n",
" week_start_hyphenated = fmt(week_start_date, date_format=\"%Y-%m-%d\")\n",
" current_week = (\n",
" df.filter(df['submission_date_s3'] >= week_start)\n",
" .filter(df['submission_date_s3'] <= week_end_slop)\n",
" .filter(df['subsession_start_date'] >= week_start_hyphenated)\n",
" .filter(df['subsession_start_date'] < week_end_excl)\n",
" )\n",
" newest_per_client = (\n",
" get_newest_per_client(current_week)\n",
" .select(\n",
" F.col('*'),\n",
" pcd_to_aquisition_udf(\n",
" F.col('profile_creation_date')\n",
" ).alias('aquisition_period'),\n",
" current_week_udf(\n",
" F.col('profile_creation_date'), \n",
" F.col('subsession_start_date'),\n",
" F.col('submission_date_s3')\n",
" ).alias('current_week')))\n",
" \n",
" return newest_per_client "
"cell_type": "code",
"execution_count": 59,
"metadata": {
"collapsed": false
"outputs": [
"name": "stdout",
"output_type": "stream",
"text": [
"Starting week from 20160103 to 20160109 at 2017-02-02 23:02:23.750227\n",
" |-- client_id: string (nullable = true)\n",
" |-- timestamp: double (nullable = true)\n",
" |-- submission_date_s3: string (nullable = true)\n",
" |-- subsession_start_date: string (nullable = true)\n",
" |-- profile_creation_date: long (nullable = true)\n",
" |-- source: string (nullable = true)\n",
" |-- medium: string (nullable = true)\n",
" |-- campaign: string (nullable = true)\n",
" |-- content: string (nullable = true)\n",
" |-- aquisition_period: string (nullable = true)\n",
" |-- current_week: string (nullable = true)\n",
"Starting week from 20160110 to 20160116 at 2017-02-02 23:03:25.074850\n",
"Starting week from 20160117 to 20160123 at 2017-02-02 23:04:29.309601\n",
"Starting week from 20160124 to 20160130 at 2017-02-02 23:05:29.744713\n",
"Starting week from 20160131 to 20160206 at 2017-02-02 23:06:28.580993\n"
"source": [
"S3_ATTRIBUTION_BUCKET = 'net-mozaws-prod-us-west-2-pipeline-analysis'\n",
"S3_ATTRIBUTION_PREFIX = 'amiyaguchi/stub/v1'\n",
"n_weeks = 5\n",
"df = generate_test_df(100, n_weeks)\n",
"start_date = datetime.datetime.strptime(\"20160103\", \"%Y%m%d\")\n",
"for week in range(n_weeks):\n",
" delta = datetime.timedelta(7) * week\n",
" week_start = fmt(start_date + delta)\n",
" week_df = compute_week(df, week_start)\n",
" if week == 0:\n",
" week_df.printSchema()\n",
" s3_path = \"s3://{}/{}/{}={}\".format(S3_ATTRIBUTION_BUCKET, \n",
" 'week_start', week_start)\n",
"\"Writing dataframe to %s\", s3_path)\n",
" week_df.write.parquet(s3_path, mode=\"overwrite\")"
"metadata": {
"anaconda-cloud": {},
"kernelspec": {
"display_name": "Python [conda root]",
"language": "python",
"name": "conda-root-py"
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.12"
"nbformat": 4,
"nbformat_minor": 1
# coding: utf-8
# ## Generate Stub Attribution Dataset
# In[18]:
# Portions of code are taken from [1].
# [1]
import random
import datetime
import logging
from pyspark.sql import Row, Window
import pyspark.sql.functions as F
from pyspark.sql.types import *
from collections import OrderedDict
from moztelemetry.standards import snap_to_beginning_of_week
def generate_test_df(start_date, n_clients, n_weeks):
    """Generate a deterministic stub-attribution test DataFrame.

    Fake clients are split into ``n_weeks`` equally sized weekly cohorts
    (``n_clients // n_weeks`` clients each; any remainder is never assigned
    to a cohort and never pings).  Every cohort pings in its creation week
    and in each following week, shrinking week over week to model churn.
    Each active client sends 1-3 pings per week so that consumers can be
    tested against duplicate submissions.

    Relies on the SparkContext ``sc`` being available as a global, as it is
    in the notebook environment this code was written for.

    :param start_date: first cohort day as a ``YYYYMMDD`` string
    :param n_clients: number of fake clients to create
    :param n_weeks: number of weekly cohorts, and weeks of activity
    :return: a Spark DataFrame with one row per generated ping
    """
    # This data should be deterministic.  A private, fixed-seed generator
    # yields the same sequence as ``random.seed(42)`` would, without
    # clobbering the module-level random state for other code.
    rng = random.Random(42)

    # These labels are applied to the clients using a triangular distribution.
    # This should suffice for demonstrating the different campaigns. A future
    # improvement is to use a gaussian distribution, which would be better at
    # simulating a long tail of rare permutations of dimensions.
    #
    # Labels are chosen to be somewhat representative of the production
    # labels. However, if labels are to be chosen to be more broad, it might
    # be useful to generate the label category and append a numerical id like
    # the clients.
    source_labels = ["google", "homepage", "yahoo", "bing"]
    medium_labels = ["referral", "organic", "cpc"]
    campaign_labels = [None, "campaign_1", "campaign_2"]
    content_labels = [None, "content_1", "content_2", None, "content_3"]

    attrib_dict = {
        "source": source_labels,
        "medium": medium_labels,
        "campaign": campaign_labels,
        "content": content_labels,
    }

    # Generate our fake clients
    clients = [{'client_id': 'client_{:02d}'.format(x)} for x in range(n_clients)]

    # Assign the attributes to each client.  Attributes are visited in sorted
    # order so the sequence of random draws does not depend on the
    # interpreter's dict ordering.
    for client in clients:
        for attrib, labels in sorted(attrib_dict.items()):
            # triangular() may return the upper bound itself, so clamp the
            # index to stay inside the list.
            idx = min(int(rng.triangular(0, len(labels))), len(labels) - 1)
            client[attrib] = labels[idx]

    # Assign each client to a week start, distributed evenly through the weeks
    # profile_creation_date: days since epoch
    new_per_week = n_clients // n_weeks
    days_per_week = 7

    loss_rate = 0.25
    loss_delta = 0.1

    # "%m" is the month directive; "%M" would parse the digits as minutes and
    # silently leave the month at January.
    date_start = datetime.datetime.strptime(start_date, "%Y%m%d")
    epoch = datetime.datetime(1970, 1, 1)
    date_offset = (date_start - epoch).days
    data = []

    for week_start in range(n_weeks):
        days_since_epoch = date_offset + (week_start * days_per_week)
        id_offset = week_start * new_per_week

        # create a new cohort of users
        for relative_id in range(new_per_week):
            uid = id_offset + relative_id
            clients[uid]['profile_creation_date'] = days_since_epoch

        # Provide client activity per week. A certain percentage of users
        # will drop off every week. This should allow us to see if we can
        # observe a general trend in user usage. Each user should also send
        # a significant number of duplicate requests, to test that the
        # process is resilient against overcounting.
        # subsession_start_date == submission_date_s3 == timestamp (ns)
        cohort_size = new_per_week
        for future_week in range(week_start, n_weeks):
            # cohort_size models user drop-off: only the first cohort_size
            # clients of the cohort are still active in this week.
            future_days = date_offset + (future_week * days_per_week)
            for relative_id in range(cohort_size):
                uid = id_offset + relative_id
                # randomly ping 1-3 times for duplicates
                for _ in range(rng.randint(1, 3)):
                    arrival = epoch + datetime.timedelta(
                        future_days + rng.randint(0, days_per_week - 1))
                    ping = clients[uid].copy()
                    ping.update({
                        "subsession_start_date": arrival.strftime("%Y-%m-%d"),
                        "submission_date_s3": arrival.strftime("%Y%m%d"),
                        # nanoseconds since the epoch
                        "timestamp": (arrival - epoch).total_seconds() * (10 ** 9),
                    })
                    data.append(ping)
            # loss rate increases with cohort start date
            cohort_size -= int(cohort_size * (loss_rate + (loss_delta * week_start)))

    return (
        sc.parallelize(data)
        .map(lambda d: Row(**d))
        .toDF()
    )
# In[57]:
def get_newest_per_client(df):
    """Keep only the most recent row for every client.

    Rows are numbered per ``client_id`` in descending ``timestamp`` order and
    only the first row of each partition is kept.

    :param df: DataFrame with at least ``client_id`` and ``timestamp`` columns
    :return: DataFrame with one row per ``client_id``
    """
    window_spec = Window.partitionBy(df['client_id']).orderBy(df['timestamp'].desc())
    rownum_by_timestamp = F.row_number().over(window_spec)
    selectable_by_client = df.select(
        rownum_by_timestamp.alias('row_number'),
        *df.columns
    )
    # NOTE(review): the extracted source is truncated after the filter.  The
    # helper column is dropped here so the result has the same columns as the
    # input -- confirm against the original notebook.
    return (
        selectable_by_client
        .filter(selectable_by_client['row_number'] == 1)
        .drop('row_number')
    )
def fmt(d, date_format="%Y%m%d"):
    """Format a date or datetime as a string.

    :param d: a ``datetime.date`` or ``datetime.datetime``
    :param date_format: ``strftime`` pattern, ``YYYYMMDD`` by default
    :return: the formatted string
    """
    return d.strftime(date_format)
def get_week_num(creation, today):
    """Return the number of whole weeks between ``creation`` and ``today``.

    :param creation: profile creation day (``date`` or ``datetime``), or None
    :param today: day of activity (``date`` or ``datetime``), or None
    :return: None if either argument is None, -1 if ``creation`` lies after
        ``today``, otherwise the zero-based week number
    """
    if creation is None or today is None:
        return None

    def _to_date(value):
        # date and datetime cannot be subtracted from each other, so reduce
        # both operands to plain dates first.
        return value.date() if isinstance(value, datetime.datetime) else value

    diff = (_to_date(today) - _to_date(creation)).days
    if diff < 0:
        # Creation date is in the future. Bad data :(
        return -1
    # The initial week is week zero.
    return diff // 7
def daynum_to_date(daynum):
    """ Convert a number of days to a date. If it's out of range, default to a max date.

    :param daynum: A number of days since Jan 1, 1970
    :return: the matching ``datetime.date``, or None when ``daynum`` is None
        or negative
    """
    if daynum is None:
        return None
    if daynum < 0:
        return None
    daycount = int(daynum)
    if daycount > 1000000:
        # Some time in the 48th century, clearly bogus.
        daycount = 1000000
    return datetime.date(1970, 1, 1) + datetime.timedelta(daycount)
def get_current_week(profile_creation_date, subsession_start_date, submission_date_s3):
    """Compute how many weeks after profile creation a ping was sent.

    The client-reported ``subsession_start_date`` is preferred; when it is
    None the server-side ``submission_date_s3`` is used instead.

    :param profile_creation_date: days since Jan 1, 1970, or None
    :param subsession_start_date: string starting with ``YYYY-MM-DD``, or None
    :param submission_date_s3: ``YYYYMMDD`` string
    :return: ``'unknown'`` when ``subsession_start_date`` cannot be parsed,
        otherwise the result of ``get_week_num``
    """
    pcd = daynum_to_date(profile_creation_date)
    client_date = None
    if subsession_start_date is not None:
        try:
            client_date = datetime.datetime.strptime(
                subsession_start_date[0:10], "%Y-%m-%d")
        except (ValueError, TypeError):
            # ValueError: bogus format.
            # TypeError: string contains null bytes or other weirdness. Example:
            # TypeError: must be string without null bytes, not unicode
            return 'unknown'
    if client_date is None:
        # Fall back to submission date
        client_date = datetime.datetime.strptime(submission_date_s3, "%Y%m%d")
    return get_week_num(pcd, client_date)
current_week_udf = F.udf(get_current_week, StringType())


def _pcd_to_acquisition_period(pcd):
    """Map a profile creation day number to the Sunday that starts its week.

    :param pcd: days since Jan 1, 1970, or None
    :return: the week start as a ``YYYY-MM-DD`` string, or None when the day
        number cannot be converted to a date
    """
    creation_date = daynum_to_date(pcd)
    if creation_date is None:
        # Null or negative day numbers have no acquisition week; return None
        # instead of failing inside the week-snapping helper.
        return None
    week_start = snap_to_beginning_of_week(creation_date, "Sunday")
    return week_start.strftime("%Y-%m-%d")


pcd_to_aquisition_udf = F.udf(_pcd_to_acquisition_period, StringType())
def compute_week(df, week_start):
    """Select the newest ping per client for the week starting at ``week_start``.

    Pings are kept when their ``subsession_start_date`` falls inside the
    week and their ``submission_date_s3`` is no later than 10 days after the
    end of the week.  The result carries two extra columns,
    ``aquisition_period`` and ``current_week``.

    :param df: DataFrame of pings
    :param week_start: first day of the week as a ``YYYYMMDD`` string; must
        be a Sunday
    :return: DataFrame with one row per client, or None (after printing a
        message) when ``week_start`` is not a Sunday
    """
    week_start_date = datetime.datetime.strptime(week_start, "%Y%m%d")
    week_end_date = week_start_date + datetime.timedelta(6)
    week_start = fmt(week_start_date)
    week_end = fmt(week_end_date)

    # Verify that the start date is a Sunday
    if week_start_date.weekday() != 6:
        print("Week start date {} is not a Sunday".format(week_start))
        return None

    print("Starting week from {} to {} at {}"
          .format(week_start, week_end, datetime.datetime.utcnow()))

    # the subsession_start_date field has a different form than submission_date_s3,
    # so needs to be formatted with hyphens.
    week_end_slop = fmt(week_end_date + datetime.timedelta(10))
    week_end_excl = fmt(week_end_date + datetime.timedelta(1), date_format="%Y-%m-%d")
    week_start_hyphenated = fmt(week_start_date, date_format="%Y-%m-%d")

    current_week = (
        df.filter(df['submission_date_s3'] >= week_start)
        .filter(df['submission_date_s3'] <= week_end_slop)
        .filter(df['subsession_start_date'] >= week_start_hyphenated)
        .filter(df['subsession_start_date'] < week_end_excl)
    )

    newest_per_client = (
        get_newest_per_client(current_week)
        .select(
            F.col('*'),
            pcd_to_aquisition_udf(
                F.col('profile_creation_date')
            ).alias('aquisition_period'),
            current_week_udf(
                F.col('profile_creation_date'),
                F.col('subsession_start_date'),
                F.col('submission_date_s3')
            ).alias('current_week')))

    return newest_per_client
# In[59]:
# Generate the stub dataset and write one parquet partition per week to S3.
S3_ATTRIBUTION_BUCKET = 'net-mozaws-prod-us-west-2-pipeline-analysis'
S3_ATTRIBUTION_PREFIX = 'amiyaguchi/stub/v1'

n_weeks = 5
# First week of generated data; must be a Sunday for compute_week to accept it.
first_week = "20160103"
# generate_test_df takes the start date as its first argument.
df = generate_test_df(first_week, 100, n_weeks)

start_date = datetime.datetime.strptime(first_week, "%Y%m%d")
for week in range(n_weeks):
    delta = datetime.timedelta(7) * week
    week_start = fmt(start_date + delta)
    week_df = compute_week(df, week_start)
    if week_df is None:
        # compute_week returns None for a week that does not start on a Sunday.
        continue
    if week == 0:
        week_df.printSchema()
    # The path has four slots: bucket, prefix, partition name, partition value.
    s3_path = "s3://{}/{}/{}={}".format(S3_ATTRIBUTION_BUCKET,
                                        S3_ATTRIBUTION_PREFIX,
                                        'week_start', week_start)
    logging.info("Writing dataframe to %s", s3_path)
    week_df.write.parquet(s3_path, mode="overwrite")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment