1 Day Retention v1
{"paragraphs":[{"user":"anonymous","config":{"colWidth":12,"enabled":true,"results":{},"editorSetting":{"language":"markdown","editOnDblClick":true},"editorMode":"ace/mode/markdown","editorHide":true,"tableHide":false},"settings":{"params":{},"forms":{}},"apps":[],"jobName":"paragraph_1505763413296_129190948","id":"20170918-123653_1361561743","dateCreated":"2017-09-18T12:36:53-0700","status":"FINISHED","progressUpdateIntervalMs":500,"focus":true,"$$hashKey":"object:6801","text":"%md\n\nThis notebook performs the cleaning of the `new_profile` and `main_summary` dataset for ingestion by the generic client count script. The most significant portions of this cleaning step are the following:\n\n- Keep clients that appear in major versions above or equal to 55 (starting revision of the `new_profile` ping)\n- Filter clients by the set of clients in `new_profile`\n- Filter significantly skewed clients","dateUpdated":"2017-09-18T12:42:59-0700","dateFinished":"2017-09-18T12:43:01-0700","dateStarted":"2017-09-18T12:42:59-0700","results":{"code":"SUCCESS","msg":[{"type":"HTML","data":"<div class=\"markdown-body\">\n<p>This notebook performs the cleaning of the <code>new_profile</code> and <code>main_summary</code> dataset for ingestion by the generic client count script. The most significant portions of this cleaning step are the following:</p>\n<ul>\n <li>Keep clients that appear in major versions above or equal to 55 (starting revision of the <code>new_profile</code> ping)</li>\n <li>Filter clients by the set of clients in <code>new_profile</code></li>\n <li>Filter significantly skewed clients</li>\n</ul>\n</div>"}]}},{"text":"val new_profile = (\n spark.read\n .parquet(\"s3://net-mozaws-prod-us-west-2-pipeline-data/telemetry-new-profile-parquet/v1/\")\n)\nnew_profile.createOrReplaceTempView(\"telemetry_new_profile_parquet\")","dateUpdated":"2017-09-15T13:28:37-0700","config":{"colWidth":6,"editorMode":"ace/mode/scala","results":[{"graph":{"mode":"table","height":300,"optionOpen":false,"keys":[],"values":[],"groups":[],"scatter":{}}}],"enabled":true,"editorSetting":{"language":"scala"}},"settings":{"params":{},"forms":{}},"apps":[],"jobName":"paragraph_1505507317215_-1289949095","id":"20170823-200521_630764727","dateCreated":"2017-09-15T13:28:37-0700","status":"READY","errorMessage":"","progressUpdateIntervalMs":500,"$$hashKey":"object:6802"},{"text":"val main_summary = (\n spark.read.option(\"mergeSchema\", \"true\")\n .parquet(\"s3://telemetry-parquet/main_summary/v4\")\n)\nmain_summary.createOrReplaceTempView(\"main_summary\")","dateUpdated":"2017-09-15T13:28:37-0700","config":{"colWidth":6,"editorMode":"ace/mode/scala","results":[{"graph":{"mode":"table","height":300,"optionOpen":true,"keys":[{"name":"diff","index":0,"aggr":"sum"}],"values":[{"name":"count(diff)","index":1,"aggr":"sum"}],"groups":[],"scatter":{"xAxis":{"name":"diff","index":0,"aggr":"sum"},"yAxis":{"name":"count(diff)","index":1,"aggr":"sum"}}}}],"enabled":true,"editorSetting":{"language":"scala"}},"settings":{"params":{},"forms":{}},"apps":[],"jobName":"paragraph_1505507317216_-1304184805","id":"20170823-202555_1428096625","dateCreated":"2017-09-15T13:28:37-0700","status":"READY","errorMessage":"","progressUpdateIntervalMs":500,"$$hashKey":"object:6804"},{"text":"%sql\nCREATE OR REPLACE TEMPORARY VIEW cleaned_new_profile AS\nSELECT client_id,\n date(from_unixtime(metadata.timestamp/pow(10, 9))) AS submission_date,\n date(from_unixtime(metadata.creation_timestamp/pow(10, 9))) AS subsession_start_date,\n 
--date(from_unixtime(environment.profile.creation_date * 3600 * 24)) AS profile_creation_date,\n metadata.normalized_channel AS channel,\n metadata.geo_country AS country,\n environment.system.os.name,\n split(environment.build.version, '\\\\.')[0] AS major_version\nFROM\n(SELECT *,\n crc32(encode(client_id, 'UTF-8'))%100 AS sample_id\n FROM telemetry_new_profile_parquet)\nWHERE sample_id = 57\n","dateUpdated":"2017-09-15T13:28:37-0700","config":{"colWidth":6,"results":[],"enabled":true,"editorSetting":{"language":"sql"},"editorMode":"ace/mode/sql"},"settings":{"params":{},"forms":{}},"apps":[],"jobName":"paragraph_1505507317217_-1304569554","id":"20170823-200927_1243213103","dateCreated":"2017-09-15T13:28:37-0700","status":"READY","errorMessage":"","progressUpdateIntervalMs":500,"$$hashKey":"object:6805"},{"text":"%sql\nCREATE OR REPLACE TEMPORARY VIEW cleaned_main_summary AS\nSELECT *\nFROM (\n SELECT client_id, \n date(from_unixtime(timestamp/pow(10, 9))) as submission_date,\n date(subsession_start_date), \n --date(from_unixtime(profile_creation_date * 3600 * 24)) as profile_creation_date,\n normalized_channel as channel,\n country,\n os,\n split(env_build_version, '\\\\.')[0] as major_version\n FROM main_summary\n WHERE sample_id='57' AND submission_date_s3 >= '20170626'\n)\nWHERE major_version >= '55' \n AND datediff(subsession_start_date, '2017-06-26') > 0","dateUpdated":"2017-09-15T13:28:37-0700","config":{"colWidth":6,"editorMode":"ace/mode/sql","results":[],"enabled":true,"editorSetting":{"language":"sql"}},"settings":{"params":{},"forms":{}},"apps":[],"jobName":"paragraph_1505507317217_-1304569554","id":"20170823-215238_1119872465","dateCreated":"2017-09-15T13:28:37-0700","status":"READY","errorMessage":"","progressUpdateIntervalMs":500,"$$hashKey":"object:6806"},{"text":"%md\n\n### Potential areas of data cleaning\n* Use the skew from `environment.profile.creation` and `metadata.creation.timestamp` to normalize reported client dates\n\n","dateUpdated":"2017-09-15T13:28:37-0700","config":{"colWidth":12,"editorMode":"ace/mode/markdown","results":[{"graph":{"mode":"table","height":300,"optionOpen":false,"keys":[],"values":[],"groups":[],"scatter":{}}}],"enabled":true,"editorSetting":{"language":"markdown","editOnDblClick":true}},"settings":{"params":{},"forms":{}},"apps":[],"jobName":"paragraph_1505507317219_-1303800056","id":"20170823-234051_577411471","dateCreated":"2017-09-15T13:28:37-0700","status":"READY","errorMessage":"","progressUpdateIntervalMs":500,"$$hashKey":"object:6807"},{"text":"spark.table(\"cleaned_new_profile\").cache()\nspark.table(\"cleaned_main_summary\").cache()","dateUpdated":"2017-09-15T13:28:37-0700","config":{"colWidth":12,"editorMode":"ace/mode/scala","results":[{"graph":{"mode":"table","height":300,"optionOpen":false,"keys":[],"values":[],"groups":[],"scatter":{}}}],"enabled":true,"editorSetting":{"language":"scala"}},"settings":{"params":{},"forms":{}},"apps":[],"jobName":"paragraph_1505507317221_-1306108549","id":"20170823-205016_1357003267","dateCreated":"2017-09-15T13:28:37-0700","status":"READY","errorMessage":"","progressUpdateIntervalMs":500,"$$hashKey":"object:6808"},{"text":"%pyspark\nfrom pyspark.sql import functions as F\n\n# ignore values that are significantly skewed\nfirst_sessions = (\n spark.table(\"cleaned_new_profile\")\n .where(\"abs(datediff(submission_date, subsession_start_date)) < 3\")\n .select(\"client_id\", F.col(\"subsession_start_date\").alias(\"profile_creation_date\"))\n)\n\n# perform a right join - only keep the clients 
that were originally seen as a new profile\n(\n spark.table(\"cleaned_new_profile\")\n .union(spark.table(\"cleaned_main_summary\"))\n .join(first_sessions, [\"client_id\"], \"right\")\n .withColumn(\"days_since_creation\", F.datediff(\"subsession_start_date\", \"profile_creation_date\"))\n .createOrReplaceTempView(\"retention_source\")\n)","dateUpdated":"2017-09-15T13:28:37-0700","config":{"colWidth":12,"editorMode":"ace/mode/python","results":[],"enabled":true,"editorSetting":{"language":"python"}},"settings":{"params":{},"forms":{}},"apps":[],"jobName":"paragraph_1505507317222_-1304954302","id":"20170823-224508_490988277","dateCreated":"2017-09-15T13:28:37-0700","status":"READY","errorMessage":"","progressUpdateIntervalMs":500,"$$hashKey":"object:6809"},{"text":"%md\n\nUse the following cell to determine the range of days since creation -- this is a mixture of latency and oddly set dates from the client. This could be improved by applying some sort of cleaning process to account for clock skew, in the order of days and weeks. ","user":"anonymous","dateUpdated":"2017-09-18T12:45:38-0700","config":{"colWidth":12,"enabled":true,"results":{},"editorSetting":{"language":"markdown","editOnDblClick":true},"editorMode":"ace/mode/markdown","editorHide":true,"tableHide":false},"settings":{"params":{},"forms":{}},"apps":[],"jobName":"paragraph_1505763803348_1503066853","id":"20170918-124323_1575818516","dateCreated":"2017-09-18T12:43:23-0700","status":"FINISHED","progressUpdateIntervalMs":500,"focus":true,"$$hashKey":"object:7522","dateFinished":"2017-09-18T12:45:38-0700","dateStarted":"2017-09-18T12:45:38-0700","results":{"code":"SUCCESS","msg":[{"type":"HTML","data":"<div class=\"markdown-body\">\n<p>Use the following cell to determine the range of days since creation &ndash; this is a mixture of latency and oddly set dates from the client. 
This could be improved by applying some sort of cleaning process to account for clock skew, in the order of days and weeks.</p>\n</div>"}]}},{"text":"%sql\n\nselect days_since_creation, count(days_since_creation) from retention_source group by days_since_creation order by days_since_creation\n","dateUpdated":"2017-09-18T12:35:19-0700","config":{"colWidth":12,"editorMode":"ace/mode/sql","results":[{"graph":{"mode":"multiBarChart","height":300,"optionOpen":true,"keys":[{"name":"days_since_creation","index":0,"aggr":"sum"}],"values":[{"name":"count(days_since_creation)","index":1,"aggr":"sum"}],"groups":[],"scatter":{"xAxis":{"name":"days_since_creation","index":0,"aggr":"sum"}},"forceY":false,"lineWithFocus":false},"helium":{}}],"enabled":true,"editorSetting":{"language":"sql","editOnDblClick":false}},"settings":{"params":{},"forms":{}},"apps":[],"jobName":"paragraph_1505507317223_-1305339051","id":"20170823-224612_1148213923","dateCreated":"2017-09-15T13:28:37-0700","status":"READY","errorMessage":"","progressUpdateIntervalMs":500,"$$hashKey":"object:6810","focus":true},{"text":"%pyspark\n\n(\n spark.table(\"retention_source\")\n .select(\n \"client_id\", \n F.col(\"subsession_start_date\").alias(\"activity_date\"), \n \"channel\", \n \"country\", \n F.col(\"name\").alias(\"os\"), \n \"major_version\", \n \"profile_creation_date\", \n \"days_since_creation\"\n )\n .write\n .partitionBy(\"activity_date\")\n .parquet(\"s3://net-mozaws-prod-us-west-2-pipeline-analysis/amiyaguchi/client_count_new_profile/v1/\")\n)","dateUpdated":"2017-09-15T13:28:37-0700","config":{"colWidth":12,"editorMode":"ace/mode/python","results":[{"graph":{"mode":"table","height":300,"optionOpen":false,"keys":[],"values":[],"groups":[],"scatter":{}}}],"enabled":true,"editorSetting":{"language":"python"}},"settings":{"params":{},"forms":{}},"apps":[],"jobName":"paragraph_1505507317224_-1307262796","id":"20170823-233902_1218485262","dateCreated":"2017-09-15T13:28:37-0700","status":"READY","errorMessage":"","progressUpdateIntervalMs":500,"$$hashKey":"object:6811"},{"text":"","dateUpdated":"2017-09-15T13:28:37-0700","config":{"colWidth":12,"graph":{"mode":"table","height":300,"optionOpen":false,"keys":[],"values":[],"groups":[],"scatter":{}},"enabled":true,"results":{},"editorSetting":{"language":"scala"},"editorMode":"ace/mode/scala"},"settings":{"params":{},"forms":{}},"apps":[],"jobName":"paragraph_1505507317226_-1306493298","id":"20170824-002613_915441723","dateCreated":"2017-09-15T13:28:37-0700","status":"READY","errorMessage":"","progressUpdateIntervalMs":500,"$$hashKey":"object:6812"}],"name":"retention_dev_v1","id":"2CUB6AD8Z","angularObjects":{"2CHU3FNGG:shared_process":[],"2CFXA5P4X:shared_process":[],"2CGBQ2A26:shared_process":[],"2CJKEWH8N:shared_process":[],"2CKCNXKWR:shared_process":[],"2CJ4QER2M:shared_process":[],"2CHBQU6NN:shared_process":[],"2CKR5RDFF:shared_process":[],"2CGJMN72K:shared_process":[],"2CJ63UZA5:shared_process":[],"2CHQFRJJE:shared_process":[],"2CH5CDT2V:shared_process":[],"2CG6XXF3C:shared_process":[],"2CJP23GXX:shared_process":[],"2CH4G5TXG:shared_process":[],"2CKCYWNRD:shared_process":[],"2CJXSCRAS:shared_process":[],"2CGP5M3QS:shared_process":[],"2CK25Z3MA:shared_process":[]},"config":{"looknfeel":"default","personalizedMode":"false"},"info":{}}
title: retention_dev_v1
author(s): anonymous
created_at: 2017-09-15 13:28:37 -0700
updated_at: 2017-09-18 12:45:38 -0700

This notebook cleans the new_profile and main_summary datasets for ingestion by the generic client count script. The most significant parts of this cleaning step are the following:

  • Keep clients that appear in major versions greater than or equal to 55 (the starting version of the new_profile ping)
  • Filter clients by the set of clients in new_profile
  • Filter out clients whose reported dates are significantly skewed
// Load the raw new_profile pings and register them as a temp view for SQL access.
val new_profile = (
    spark.read
    .parquet("s3://net-mozaws-prod-us-west-2-pipeline-data/telemetry-new-profile-parquet/v1/")
)
new_profile.createOrReplaceTempView("telemetry_new_profile_parquet")
// Load main_summary with schema merging and register it as a temp view.
val main_summary = (
    spark.read.option("mergeSchema", "true")
    .parquet("s3://telemetry-parquet/main_summary/v4")
)
main_summary.createOrReplaceTempView("main_summary")
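Nothing below depends on it, but a quick look at the registered views can catch schema or path issues early. A minimal PySpark sketch, not part of the original notebook, assuming the temp views created above:

# Not part of the original notebook: quick sanity check of the registered views.
for view in ["telemetry_new_profile_parquet", "main_summary"]:
    df = spark.table(view)
    print(view, "has", len(df.columns), "columns")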
CREATE OR REPLACE TEMPORARY VIEW cleaned_new_profile AS
SELECT client_id,
        date(from_unixtime(metadata.timestamp/pow(10, 9))) AS submission_date,
        date(from_unixtime(metadata.creation_timestamp/pow(10, 9))) AS subsession_start_date,
        --date(from_unixtime(environment.profile.creation_date * 3600 * 24)) AS profile_creation_date,
        metadata.normalized_channel AS channel,
        metadata.geo_country AS country,
        environment.system.os.name,
        split(environment.build.version, '\\.')[0] AS major_version
FROM
(SELECT *,
        crc32(encode(client_id, 'UTF-8'))%100 AS sample_id
 FROM telemetry_new_profile_parquet)
WHERE sample_id = 57
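A few rows are enough to verify the one-percent sample (sample_id = 57 out of 100) and the nanosecond-to-date conversions. A minimal sketch against the view just created:

# Hypothetical check: spot-check the cleaned new_profile view.
spark.sql("""
    SELECT submission_date, subsession_start_date, channel, country, major_version
    FROM cleaned_new_profile
    LIMIT 10
""").show(truncate=False)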
CREATE OR REPLACE TEMPORARY VIEW cleaned_main_summary AS
SELECT *
FROM (
    SELECT client_id, 
        date(from_unixtime(timestamp/pow(10, 9))) as submission_date,
        date(subsession_start_date) as subsession_start_date, 
        --date(from_unixtime(profile_creation_date * 3600 * 24)) as profile_creation_date,
        normalized_channel as channel,
        country,
        os,
        split(env_build_version, '\\.')[0] as major_version
    FROM main_summary
    WHERE sample_id='57' AND submission_date_s3 >= '20170626'
)
WHERE major_version >= '55' 
    AND datediff(subsession_start_date, '2017-06-26') > 0
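A similar spot check confirms that the version and date filters in cleaned_main_summary behave as intended. A sketch over the view above:

# Hypothetical check: row counts per major version after filtering.
spark.sql("""
    SELECT major_version, count(*) AS n
    FROM cleaned_main_summary
    GROUP BY major_version
    ORDER BY major_version
""").show()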

Potential areas of data cleaning

  • Use the skew between environment.profile.creation_date and metadata.creation_timestamp to normalize reported client dates (a rough sketch follows below)
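This normalization is not implemented in the notebook. A rough sketch of what it could look like, measuring the gap between the client-reported profile creation date and the ping creation timestamp, which should be small for a new_profile ping. It assumes creation_date is expressed in days since the Unix epoch, as the commented-out profile_creation_date conversion above suggests:

from pyspark.sql import functions as F

# Rough sketch, not implemented in this notebook: large gaps between the profile
# creation date and the ping creation timestamp point at skewed client clocks.
skew = (
    spark.table("telemetry_new_profile_parquet")
    .select(
        "client_id",
        F.datediff(
            F.from_unixtime(F.col("metadata.creation_timestamp") / F.lit(1e9)),
            F.from_unixtime(F.col("environment.profile.creation_date") * 3600 * 24),
        ).alias("skew_days"),
    )
)
skew.describe("skew_days").show()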
spark.table("cleaned_new_profile").cache()
spark.table("cleaned_main_summary").cache()
from pyspark.sql import functions as F

# ignore records whose submission and subsession dates differ by 3 or more days (significant skew)
first_sessions = (
    spark.table("cleaned_new_profile")
    .where("abs(datediff(submission_date, subsession_start_date)) < 3")
    .select("client_id", F.col("subsession_start_date").alias("profile_creation_date"))
)

# perform a right join - only keep the clients that were originally seen as a new profile
(
    spark.table("cleaned_new_profile")
    .union(spark.table("cleaned_main_summary"))
    .join(first_sessions, ["client_id"], "right")
    .withColumn("days_since_creation", F.datediff("subsession_start_date", "profile_creation_date"))
    .createOrReplaceTempView("retention_source")
)
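Before looking at the distribution below, it can help to confirm the shape of retention_source. A minimal sketch over the view created above:

# Hypothetical check: schema and distinct client count of the joined view.
retention = spark.table("retention_source")
retention.printSchema()
print(retention.select("client_id").distinct().count(), "distinct clients")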

Use the following cell to determine the range of days since creation -- this range reflects a mixture of submission latency and oddly set client clocks. It could be improved by a cleaning step that accounts for clock skew on the order of days and weeks.

SELECT days_since_creation, count(days_since_creation)
FROM retention_source
GROUP BY days_since_creation
ORDER BY days_since_creation
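If the distribution shows long tails, one option (not applied in this notebook) is to restrict days_since_creation to a plausible window before aggregating. A sketch with an assumed window of 0 to 28 days:

from pyspark.sql import functions as F

# Sketch only: keep rows whose days_since_creation falls inside an assumed sane window.
trimmed = spark.table("retention_source").where(
    F.col("days_since_creation").between(0, 28)  # the window itself is an assumption
)
trimmed.groupBy("days_since_creation").count().orderBy("days_since_creation").show(29)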
(
    spark.table("retention_source")
    .select(
        "client_id", 
        F.col("subsession_start_date").alias("activity_date"), 
        "channel", 
        "country", 
        F.col("name").alias("os"), 
        "major_version", 
        "profile_creation_date", 
        "days_since_creation"
    )
    .write
    .partitionBy("activity_date")
    .parquet("s3://net-mozaws-prod-us-west-2-pipeline-analysis/amiyaguchi/client_count_new_profile/v1/")
)
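To confirm the write succeeded, the partitioned output can be read back and summarized. A sketch using the same path as the cell above:

# Hypothetical validation: read back the partitioned output and count rows per activity_date.
out = spark.read.parquet(
    "s3://net-mozaws-prod-us-west-2-pipeline-analysis/amiyaguchi/client_count_new_profile/v1/"
)
out.groupBy("activity_date").count().orderBy("activity_date").show()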
#!/bin/bash
# Builds telemetry-batch-view and runs GenericCountView over the cleaned retention data.
# Requires the output location to be provided via the $bucket environment variable.
if [[ -z "$bucket" ]]; then
    echo "Missing arguments!" 1>&2
    exit 1
fi

cd /tmp
git clone https://github.com/mozilla/telemetry-batch-view.git
cd telemetry-batch-view
sbt assembly

group=$(cat << END
activity_date,channel,major_version,os,country,profile_creation_date,days_since_creation
END
)
echo "$group"

spark-submit --master yarn \
    --deploy-mode client \
    --class com.mozilla.telemetry.views.GenericCountView \
    target/scala-2.11/telemetry-batch-view-1.1.jar \
    --files "s3://net-mozaws-prod-us-west-2-pipeline-analysis/amiyaguchi/retention_intermediate/cleaned/v1" \
    --submission-date-col "submission_date" \
    --count-column "client_id" \
    --select "*, date_format(activity_date, 'yyyyMMdd') as submission_date" \
    --grouping-columns "$group" \
    --where "client_id IS NOT NULL" \
    --output "$bucket/retention" \
    --version "1"