-
-
Save acmiyaguchi/8fb19ea223e2e400439fa74ae1906afb to your computer and use it in GitHub Desktop.
1 Day Retention v2
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"cells": [ | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"This notebook does cleaning in preparation for the generic client count script. This uses a subset of the cleaning procedures that have been developed for churn dataset in `mozetl`. Unlike the previous version, this does not attempt to limit client counts to that of the `new_profile` ping. " | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [], | |
"source": [ | |
"from pyspark.sql import functions as F\n", | |
"\n", | |
"top_countries = {\n", | |
" \"US\", \"DE\", \"FR\", \"RU\", \"BR\", \"IN\", \"PL\", \"ID\", \"GB\", \"CN\",\n", | |
" \"IT\", \"JP\", \"CA\", \"ES\", \"UA\", \"MX\", \"AU\", \"VN\", \"EG\", \"AR\",\n", | |
" \"PH\", \"NL\", \"IR\", \"CZ\", \"HU\", \"TR\", \"RO\", \"GR\", \"AT\", \"CH\",\n", | |
" \"HK\", \"TW\", \"BE\", \"FI\", \"VE\", \"SE\", \"DZ\", \"MY\"\n", | |
"}\n", | |
"\n", | |
"def build_select_expr(mapping):\n", | |
" \"\"\"Build a select expression given a mapping from column to value.\n", | |
" A value of `None` will use the key as the column to map.\n", | |
" Returns a dictionary\n", | |
" \"\"\"\n", | |
" select_expr = {}\n", | |
"\n", | |
" for name, expr in mapping.iteritems():\n", | |
" column_expr = F.col(name) if expr is None else expr\n", | |
" select_expr[name] = column_expr.alias(name)\n", | |
"\n", | |
" return select_expr\n", | |
"\n", | |
"\n", | |
"def to_datetime(column, format='yyyyMMdd'):\n", | |
" # can use original column name for aliasing name\n", | |
" return (\n", | |
" F.from_unixtime(\n", | |
" F.unix_timestamp(column, format)))\n", | |
"\n", | |
"\n", | |
"def clean_columns(prepared_clients):\n", | |
" # Temporary column used for determining the validity of a row\n", | |
" is_valid = \"_is_valid\"\n", | |
"\n", | |
" pcd = F.from_unixtime(F.col(\"profile_creation_date\") * 24 * 60 * 60)\n", | |
" client_date = to_datetime('subsession_start_date', \"yyyy-MM-dd\")\n", | |
" days_since_creation = F.datediff(client_date, pcd)\n", | |
" is_funnelcake = F.col('distribution_id').rlike(\"^mozilla[0-9]+.*$\")\n", | |
"\n", | |
" attr_mapping = {\n", | |
" 'client_id': None,\n", | |
" 'distribution_id': None,\n", | |
" 'subsession_start': client_date,\n", | |
" 'channel': F.col((\"normalized_channel\")),\n", | |
" 'country': (\n", | |
" F.when(F.col(\"country\").isin(top_countries), F.col(\"country\"))\n", | |
" .otherwise(F.lit(\"ROW\"))),\n", | |
" # Bug 1289573: Support values like \"mozilla86\" and \"mozilla86-utility-existing\"\n", | |
" 'is_funnelcake': (\n", | |
" F.when(is_funnelcake, F.lit(\"yes\"))\n", | |
" .otherwise(F.lit(\"no\"))),\n", | |
" 'cohort_date': pcd, # TODO: overlaps with profile_creation\n", | |
" 'day_number': days_since_creation,\n", | |
" 'current_version': F.col(\"env_build_version\"),\n", | |
" }\n", | |
"\n", | |
" # Set the attributes to null if it's invalid\n", | |
" select_attr = [\n", | |
" F.when(F.col(is_valid), expr).otherwise(F.lit(None)).alias(attr)\n", | |
" for attr, expr in build_select_expr(attr_mapping).iteritems()\n", | |
" ]\n", | |
" select_expr = select_attr\n", | |
"\n", | |
" cleaned_data = (\n", | |
" # Compile per-client rows for the current retention period\n", | |
" prepared_clients\n", | |
" # Filter out seemingly impossible rows. One very obvious notion\n", | |
" # is to make sure that a profile is always created before a sub-session.\n", | |
" # Unlike `sane_date` in previous versions, this is idempotent and only\n", | |
" # depends on the data.\n", | |
" .withColumn(\"profile_creation\", F.date_format(pcd, 'yyyy-MM-dd'))\n", | |
" .withColumn(is_valid, (\n", | |
" F.col(\"profile_creation\").isNotNull() &\n", | |
" (F.col(\"profile_creation\") > \"2000-01-01\") &\n", | |
" (pcd <= client_date)\n", | |
" ))\n", | |
" .select(\"profile_creation\", *select_expr)\n", | |
" # Set default values for the rows\n", | |
" .fillna({\n", | |
" 'cohort_date': \"2000-01-01\",\n", | |
" 'is_funnelcake': \"no\",\n", | |
" \"day_number\": -1,\n", | |
" })\n", | |
" )\n", | |
" return cleaned_data" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [], | |
"source": [ | |
"main_summary = (\n", | |
" spark.read\n", | |
" .option(\"mergeSchema\", \"true\")\n", | |
" .parquet(\"s3://telemetry-parquet/main_summary/v4\")\n", | |
" .where(\"submission_date_s3 > '20170301'\")\n", | |
" .where(\"sample_id='57'\")\n", | |
")\n", | |
"\n", | |
"subset = main_summary.select(\n", | |
" \"client_id\",\n", | |
" \"profile_creation_date\",\n", | |
" \"subsession_start_date\",\n", | |
" \"distribution_id\",\n", | |
" \"normalized_channel\",\n", | |
" \"country\",\n", | |
" \"env_build_version\"\n", | |
")\n", | |
"cleaned_clients = clean_columns(subset)\n", | |
"cleaned_clients.show()" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": { | |
"collapsed": false | |
}, | |
"source": [ | |
"Copy into a new cell if you would like to persist the intermediate step.\n", | |
"```python\n", | |
"cleaned_clients_path = (\n", | |
" \"s3://net-mozaws-prod-us-west-2-pipeline-analysis/\"\n", | |
" \"amiyaguchi/retention_intermediate/main_summary/v1/\"\n", | |
")\n", | |
"cleaned_clients.write.parquet(cleaned_clients_path)\n", | |
"cleaned_clients = spark.read.parquet(cleaned_clients_path)\n", | |
"```" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": { | |
"collapsed": false, | |
"scrolled": true | |
}, | |
"outputs": [], | |
"source": [ | |
"new_profile = (\n", | |
" spark.read\n", | |
" .parquet(\"s3://net-mozaws-prod-us-west-2-pipeline-data/telemetry-new-profile-parquet/v1/\")\n", | |
")\n", | |
"\n", | |
"subset = (\n", | |
" new_profile\n", | |
" .selectExpr(\n", | |
" \"client_id\",\n", | |
" \"environment.profile.creation_date as profile_creation_date\",\n", | |
" \"from_unixtime(metadata.creation_timestamp/pow(10, 9)) as subsession_start_date\",\n", | |
" \"environment.partner.distribution_id\",\n", | |
" \"metadata.normalized_channel\",\n", | |
" \"metadata.geo_country as country\",\n", | |
" \"environment.build.version as env_build_version\",\n", | |
" # \"environment.settings.attribution\"\n", | |
" )\n", | |
" .where(\"crc32(encode(client_id, 'UTF-8')) % 100 = 57\")\n", | |
")\n", | |
"new_profile_cleaned = clean_columns(subset)\n", | |
"new_profile_cleaned.show()" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": { | |
"collapsed": true | |
}, | |
"outputs": [], | |
"source": [ | |
"processed = (\n", | |
" cleaned_clients\n", | |
" .union(new_profile_cleaned)\n", | |
" .where(\"client_id is not null\")\n", | |
")\n", | |
"\n", | |
"path = (\n", | |
" \"s3://net-mozaws-prod-us-west-2-pipeline-analysis/\"\n", | |
" \"amiyaguchi/retention_intermediate/cleaned/v2/\"\n", | |
")\n", | |
"processed.write.parquet(path, mode=\"overwrite\")" | |
] | |
} | |
], | |
"metadata": { | |
"anaconda-cloud": {}, | |
"kernelspec": { | |
"display_name": "Python 2", | |
"language": "python", | |
"name": "python2" | |
}, | |
"language_info": { | |
"codemirror_mode": { | |
"name": "ipython", | |
"version": 2 | |
}, | |
"file_extension": ".py", | |
"mimetype": "text/x-python", | |
"name": "python", | |
"nbconvert_exporter": "python", | |
"pygments_lexer": "ipython2", | |
"version": "2.7.13" | |
} | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 1 | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash | |
if [[ -z "$bucket" ]]; then | |
echo "Missing arguments!" 1>&2 | |
exit 1 | |
fi | |
cd /tmp | |
git clone https://github.com/mozilla/telemetry-batch-view.git | |
cd telemetry-batch-view | |
sbt assembly | |
group=$(cat << END | |
subsession_start,cohort_date,day_number,channel,current_version,country,distribution_id,is_funnelcake | |
END | |
) | |
echo $group | |
spark-submit --master yarn \ | |
--deploy-mode client \ | |
--class com.mozilla.telemetry.views.GenericCountView \ | |
target/scala-2.11/telemetry-batch-view-1.1.jar \ | |
--files "s3://net-mozaws-prod-us-west-2-pipeline-analysis/amiyaguchi/retention_intermediate/cleaned/v2" \ | |
--submission-date-col "submission_date" \ | |
--count-column "client_id" \ | |
--select "*, date_format(subsession_start, 'yyyyMMdd') as submission_date" \ | |
--grouping-columns "$group" \ | |
--where "client_id IS NOT NULL" \ | |
--output "$bucket/retention_dev" \ | |
--version "v2" |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment