@acmiyaguchi
Last active September 18, 2017 20:14
1 Day Retention v2
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This notebook does cleaning in preparation for the generic client count script. This uses a subset of the cleaning procedures that have been developed for churn dataset in `mozetl`. Unlike the previous version, this does not attempt to limit client counts to that of the `new_profile` ping. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from pyspark.sql import functions as F\n",
"\n",
"top_countries = {\n",
" \"US\", \"DE\", \"FR\", \"RU\", \"BR\", \"IN\", \"PL\", \"ID\", \"GB\", \"CN\",\n",
" \"IT\", \"JP\", \"CA\", \"ES\", \"UA\", \"MX\", \"AU\", \"VN\", \"EG\", \"AR\",\n",
" \"PH\", \"NL\", \"IR\", \"CZ\", \"HU\", \"TR\", \"RO\", \"GR\", \"AT\", \"CH\",\n",
" \"HK\", \"TW\", \"BE\", \"FI\", \"VE\", \"SE\", \"DZ\", \"MY\"\n",
"}\n",
"\n",
"def build_select_expr(mapping):\n",
" \"\"\"Build a select expression given a mapping from column to value.\n",
" A value of `None` will use the key as the column to map.\n",
" Returns a dictionary\n",
" \"\"\"\n",
" select_expr = {}\n",
"\n",
" for name, expr in mapping.iteritems():\n",
" column_expr = F.col(name) if expr is None else expr\n",
" select_expr[name] = column_expr.alias(name)\n",
"\n",
" return select_expr\n",
"\n",
"\n",
"def to_datetime(column, format='yyyyMMdd'):\n",
" # can use original column name for aliasing name\n",
" return (\n",
" F.from_unixtime(\n",
" F.unix_timestamp(column, format)))\n",
"\n",
"\n",
"def clean_columns(prepared_clients):\n",
" # Temporary column used for determining the validity of a row\n",
" is_valid = \"_is_valid\"\n",
"\n",
" pcd = F.from_unixtime(F.col(\"profile_creation_date\") * 24 * 60 * 60)\n",
" client_date = to_datetime('subsession_start_date', \"yyyy-MM-dd\")\n",
" days_since_creation = F.datediff(client_date, pcd)\n",
" is_funnelcake = F.col('distribution_id').rlike(\"^mozilla[0-9]+.*$\")\n",
"\n",
" attr_mapping = {\n",
" 'client_id': None,\n",
" 'distribution_id': None,\n",
" 'subsession_start': client_date,\n",
" 'channel': F.col((\"normalized_channel\")),\n",
" 'country': (\n",
" F.when(F.col(\"country\").isin(top_countries), F.col(\"country\"))\n",
" .otherwise(F.lit(\"ROW\"))),\n",
" # Bug 1289573: Support values like \"mozilla86\" and \"mozilla86-utility-existing\"\n",
" 'is_funnelcake': (\n",
" F.when(is_funnelcake, F.lit(\"yes\"))\n",
" .otherwise(F.lit(\"no\"))),\n",
" 'cohort_date': pcd, # TODO: overlaps with profile_creation\n",
" 'day_number': days_since_creation,\n",
" 'current_version': F.col(\"env_build_version\"),\n",
" }\n",
"\n",
" # Set the attributes to null if it's invalid\n",
" select_attr = [\n",
" F.when(F.col(is_valid), expr).otherwise(F.lit(None)).alias(attr)\n",
" for attr, expr in build_select_expr(attr_mapping).iteritems()\n",
" ]\n",
" select_expr = select_attr\n",
"\n",
" cleaned_data = (\n",
" # Compile per-client rows for the current retention period\n",
" prepared_clients\n",
" # Filter out seemingly impossible rows. One very obvious notion\n",
" # is to make sure that a profile is always created before a sub-session.\n",
" # Unlike `sane_date` in previous versions, this is idempotent and only\n",
" # depends on the data.\n",
" .withColumn(\"profile_creation\", F.date_format(pcd, 'yyyy-MM-dd'))\n",
" .withColumn(is_valid, (\n",
" F.col(\"profile_creation\").isNotNull() &\n",
" (F.col(\"profile_creation\") > \"2000-01-01\") &\n",
" (pcd <= client_date)\n",
" ))\n",
" .select(\"profile_creation\", *select_expr)\n",
" # Set default values for the rows\n",
" .fillna({\n",
" 'cohort_date': \"2000-01-01\",\n",
" 'is_funnelcake': \"no\",\n",
" \"day_number\": -1,\n",
" })\n",
" )\n",
" return cleaned_data"
]
},
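{
"cell_type": "markdown",
"metadata": {},
"source": [
"A minimal sketch of `clean_columns` on toy data (hypothetical rows; assumes the cell above has run and a `spark` session is active). The second row's profile creation date falls after its subsession start, so the validity gate nulls its attributes before the defaults are filled in.\n",
"```python\n",
"columns = [\n",
"    \"client_id\", \"profile_creation_date\", \"subsession_start_date\",\n",
"    \"distribution_id\", \"normalized_channel\", \"country\", \"env_build_version\",\n",
"]\n",
"rows = [\n",
"    # valid: profile created on day 17000 (2016-07-18), before the subsession\n",
"    (\"a\", 17000, \"2017-03-02\", \"mozilla86-utility-existing\", \"release\", \"US\", \"53.0\"),\n",
"    # invalid: profile created on day 17300 (2017-05-14), after the subsession\n",
"    (\"b\", 17300, \"2017-03-02\", None, \"release\", \"DE\", \"53.0\"),\n",
"]\n",
"clean_columns(spark.createDataFrame(rows, columns)).show()\n",
"```"
]
},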
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"main_summary = (\n",
" spark.read\n",
" .option(\"mergeSchema\", \"true\")\n",
" .parquet(\"s3://telemetry-parquet/main_summary/v4\")\n",
" .where(\"submission_date_s3 > '20170301'\")\n",
" .where(\"sample_id='57'\")\n",
")\n",
"\n",
"subset = main_summary.select(\n",
" \"client_id\",\n",
" \"profile_creation_date\",\n",
" \"subsession_start_date\",\n",
" \"distribution_id\",\n",
" \"normalized_channel\",\n",
" \"country\",\n",
" \"env_build_version\"\n",
")\n",
"cleaned_clients = clean_columns(subset)\n",
"cleaned_clients.show()"
]
},
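{
"cell_type": "markdown",
"metadata": {},
"source": [
"Because the validity gate nulls every attribute, including `client_id`, the share of rows that failed validation can be gauged directly (a sketch over the frame from the cell above):\n",
"```python\n",
"from pyspark.sql import functions as F\n",
"\n",
"(cleaned_clients\n",
"    .groupBy(F.col(\"client_id\").isNull().alias(\"failed_validation\"))\n",
"    .count()\n",
"    .show())\n",
"```"
]
},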
{
"cell_type": "markdown",
"metadata": {
"collapsed": false
},
"source": [
"Copy into a new cell if you would like to persist the intermediate step.\n",
"```python\n",
"cleaned_clients_path = (\n",
" \"s3://net-mozaws-prod-us-west-2-pipeline-analysis/\"\n",
" \"amiyaguchi/retention_intermediate/main_summary/v1/\"\n",
")\n",
"cleaned_clients.write.parquet(cleaned_clients_path)\n",
"cleaned_clients = spark.read.parquet(cleaned_clients_path)\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false,
"scrolled": true
},
"outputs": [],
"source": [
"new_profile = (\n",
" spark.read\n",
" .parquet(\"s3://net-mozaws-prod-us-west-2-pipeline-data/telemetry-new-profile-parquet/v1/\")\n",
")\n",
"\n",
"subset = (\n",
" new_profile\n",
" .selectExpr(\n",
" \"client_id\",\n",
" \"environment.profile.creation_date as profile_creation_date\",\n",
" \"from_unixtime(metadata.creation_timestamp/pow(10, 9)) as subsession_start_date\",\n",
" \"environment.partner.distribution_id\",\n",
" \"metadata.normalized_channel\",\n",
" \"metadata.geo_country as country\",\n",
" \"environment.build.version as env_build_version\",\n",
" # \"environment.settings.attribution\"\n",
" )\n",
" .where(\"crc32(encode(client_id, 'UTF-8')) % 100 = 57\")\n",
")\n",
"new_profile_cleaned = clean_columns(subset)\n",
"new_profile_cleaned.show()"
]
},
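{
"cell_type": "markdown",
"metadata": {},
"source": [
"The `crc32(...) % 100 = 57` predicate is meant to line up with the `sample_id = '57'` filter on `main_summary`, since `sample_id` is derived from the same CRC-32 of the client id. A rough cross-check (a sketch; new profiles with no `main_summary` activity in the window will legitimately be missing):\n",
"```python\n",
"main_ids = cleaned_clients.select(\"client_id\").distinct()\n",
"new_ids = new_profile_cleaned.select(\"client_id\").distinct()\n",
"\n",
"# sampled new-profile clients that never appear in the main_summary subset\n",
"new_ids.join(main_ids, \"client_id\", \"left_anti\").count()\n",
"```"
]
},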
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"processed = (\n",
" cleaned_clients\n",
" .union(new_profile_cleaned)\n",
" .where(\"client_id is not null\")\n",
")\n",
"\n",
"path = (\n",
" \"s3://net-mozaws-prod-us-west-2-pipeline-analysis/\"\n",
" \"amiyaguchi/retention_intermediate/cleaned/v2/\"\n",
")\n",
"processed.write.parquet(path, mode=\"overwrite\")"
]
},
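{
"cell_type": "markdown",
"metadata": {},
"source": [
"For reference, a rough PySpark equivalent of the aggregation that `GenericCountView` performs downstream (a sketch using the columns written above; the batch job approximates distinct clients with HyperLogLog rather than the exact counts shown here):\n",
"```python\n",
"from pyspark.sql import functions as F\n",
"\n",
"grouping_columns = [\n",
"    \"subsession_start\", \"cohort_date\", \"day_number\", \"channel\",\n",
"    \"current_version\", \"country\", \"distribution_id\", \"is_funnelcake\",\n",
"]\n",
"\n",
"(processed\n",
"    .withColumn(\"submission_date\", F.date_format(\"subsession_start\", \"yyyyMMdd\"))\n",
"    .groupBy(grouping_columns)\n",
"    .agg(F.countDistinct(\"client_id\").alias(\"client_count\"))\n",
"    .show())\n",
"```"
]
}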
],
"metadata": {
"anaconda-cloud": {},
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.13"
}
},
"nbformat": 4,
"nbformat_minor": 1
}
#!/bin/bash
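# Usage (hypothetical invocation): bucket="s3://example-bucket" ./run_retention.sh
# Expects $bucket to name the destination S3 location for the job output.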
if [[ -z "$bucket" ]]; then
echo "Missing arguments!" 1>&2
exit 1
fi
cd /tmp
git clone https://github.com/mozilla/telemetry-batch-view.git
cd telemetry-batch-view
sbt assembly
group=$(cat << END
subsession_start,cohort_date,day_number,channel,current_version,country,distribution_id,is_funnelcake
END
)
echo "$group"
spark-submit --master yarn \
--deploy-mode client \
--class com.mozilla.telemetry.views.GenericCountView \
target/scala-2.11/telemetry-batch-view-1.1.jar \
--files "s3://net-mozaws-prod-us-west-2-pipeline-analysis/amiyaguchi/retention_intermediate/cleaned/v2" \
--submission-date-col "submission_date" \
--count-column "client_id" \
--select "*, date_format(subsession_start, 'yyyyMMdd') as submission_date" \
--grouping-columns "$group" \
--where "client_id IS NOT NULL" \
--output "$bucket/retention_dev" \
--version "v2"