Skip to content

Instantly share code, notes, and snippets.

Last active February 7, 2017 17:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save harterrt/2a052f653c50df10920cfdb19c362438 to your computer and use it in GitHub Desktop.
Save harterrt/2a052f653c50df10920cfdb19c362438 to your computer and use it in GitHub Desktop.
cliqz testpilot pipeline
Display the source blob
Display the rendered blob
"cells": [
"cell_type": "code",
"execution_count": 10,
"metadata": {
"collapsed": false
"outputs": [],
"source": [
"# %load ~/cliqz_ping_pipeline/\n",
"import ujson as json\n",
"from datetime import *\n",
"import pandas as pd\n",
"from pyspark.sql.types import *\n",
"from pyspark.sql.functions import split\n",
"from moztelemetry import get_pings_properties\n",
"from moztelemetry.dataset import Dataset\n",
"class ColumnConfig:\n",
" def __init__(self, name, path, cleaning_func, struct_type):\n",
" = name\n",
" self.path = path\n",
" self.cleaning_func = cleaning_func\n",
" self.struct_type = struct_type\n",
"class DataFrameConfig:\n",
" def __init__(self, col_configs):\n",
" self.columns = [ColumnConfig(*col) for col in col_configs]\n",
" def toStructType(self):\n",
" return StructType(map(\n",
" lambda col: StructField(, col.struct_type, True),\n",
" self.columns))\n",
" def get_names(self):\n",
" return map(lambda col:, self.columns)\n",
" def get_paths(self):\n",
" return map(lambda col: col.path, self.columns)\n",
"def pings_to_df(sqlContext, pings, data_frame_config):\n",
" \"\"\"Performs simple data pipelining on raw pings\n",
" Arguments:\n",
" data_frame_config: a list of tuples of the form:\n",
" (name, path, cleaning_func, column_type)\n",
" \"\"\"\n",
" def build_cell(ping, column_config):\n",
" \"\"\"Takes a json ping and a column config and returns a cleaned cell\"\"\"\n",
" raw_value = ping[column_config.path]\n",
" func = column_config.cleaning_func\n",
" if func is not None:\n",
" return func(raw_value)\n",
" else:\n",
" return raw_value\n",
" def ping_to_row(ping):\n",
" return [build_cell(ping, col) for col in data_frame_config.columns]\n",
" filtered_pings = get_pings_properties(pings, data_frame_config.get_paths())\n",
" return sqlContext.createDataFrame(\n",
" schema = data_frame_config.toStructType())\n",
"def save_df(df, name, date_partition, partitions=1):\n",
" if date_partition is not None:\n",
" partition_str = \"/submission={day}\".format(day=date_partition)\n",
" else:\n",
" partition_str=\"\"\n",
" path_fmt = \"s3n://telemetry-parquet/harter/cliqz_{name}/v1{partition_str}\"\n",
" path = path_fmt.format(name=name, partition_str=partition_str)\n",
" df.coalesce(partitions).write.mode(\"overwrite\").parquet(path)\n",
"def __main__(sc, sqlContext):\n",
" yesterday = ( - timedelta(1)).strftime(\"%Y%m%d\")\n",
" get_doctype_pings = lambda docType: Dataset.from_source(\"telemetry\") \\\n",
" .where(docType=docType) \\\n",
" .where(submissionDate=yesterday) \\\n",
" .where(appName=\"Firefox\") \\\n",
" .records(sc)\n",
" get_cliqz_version = lambda x: x[\"\"][\"version\"] if x is not None and \"\" in x.keys() else None\n",
" has_addon = lambda x: \"\" in x.keys() if x is not None else None\n",
" get_event = lambda x: x[0][\"event\"] if x is not None else None\n",
" get_event_object = lambda x: x[0][\"object\"] if x is not None else None\n",
" testpilot_df = pings_to_df(\n",
" sqlContext,\n",
" get_doctype_pings(\"testpilot\"),\n",
" DataFrameConfig([\n",
" (\"client_id\", \"clientId\", None, StringType()),\n",
" (\"creation_date\", \"creationDate\", None, StringType()),\n",
" (\"geo\", \"meta/geoCountry\", None, StringType()),\n",
" (\"locale\", \"environment/settings/locale\", None, StringType()),\n",
" (\"channel\", \"meta/normalizedChannel\", None, StringType()),\n",
" (\"os\", \"meta/os\", None, StringType()),\n",
" (\"telemetry_enabled\", \"environment/settings/telemetryEnabled\", None, BooleanType()),\n",
" (\"has_addon\", \"environment/addons/activeAddons\", has_addon, StringType()),\n",
" (\"cliqz_version\", \"environment/addons/activeAddons\", get_cliqz_version, StringType()),\n",
" (\"event\", \"payload/events\", get_event, StringType()),\n",
" (\"event_object\", \"payload/events\", get_event_object, StringType()),\n",
" (\"test\", \"payload/test\", None, StringType())\n",
" ])).filter(\"test = '@testpilot-addon'\") \\\n",
" .filter(\"event_object = ''\")\n",
" #save_df(testpilot_df, \"testpilot\", yesterday)\n",
" testpilottest_df = pings_to_df(\n",
" sqlContext,\n",
" get_doctype_pings(\"testpilottest\"),\n",
" DataFrameConfig([\n",
" (\"client_id\", \"clientId\", None, StringType()),\n",
" (\"cliqz_client_id\", \"payload/payload/cliqzSession\", None, StringType()),\n",
" (\"session_id\", \"payload/payload/sessionId\", None, StringType()),\n",
" (\"subsession_id\", \"payload/payload/subsessionId\", None, StringType()),\n",
" (\"date\", \"meta/submissionDate\", None, StringType()),\n",
" (\"client_timestamp\", \"creationDate\", None, StringType()),\n",
" (\"geo\", \"meta/geoCountry\", None, StringType()),\n",
" (\"locale\", \"environment/settings/locale\", None, StringType()),\n",
" (\"channel\", \"meta/normalizedChannel\", None, StringType()),\n",
" (\"os\", \"meta/os\", None, StringType()),\n",
" (\"telemetry_enabled\", \"environment/settings/telemetryEnabled\", None, StringType()),\n",
" (\"has_addon\", \"environment/addons/activeAddons\", has_addon, StringType()),\n",
" (\"cliqz_version\", \"environment/addons/activeAddons\", get_cliqz_version, StringType()),\n",
" (\"event\", \"payload/payload/event\", None, StringType()),\n",
" (\"content_search_engine\", \"payload/payload/contentSearch\", None, StringType()),\n",
" (\"test\", \"payload/test\", None, StringType())\n",
" ])).filter(\"event IS NOT NULL\") \\\n",
" .filter(\"test = ''\")\n",
" #save_df(testpilottest_df, \"testpilottest\", yesterday, partitions=32)\n",
" search_df = \\\n",
" .csv(\"s3://net-mozaws-prod-cliqz/testpilot-cliqz-telemetry.csv\") \\\n",
" .withColumn(\"id\", split(\"udid\", \"\\|\")[0]) # Add ID column\n",
" save_df(search_df, \"search\", None)\n",
" return testpilot_df, testpilottest_df, search_df\n"
"cell_type": "code",
"execution_count": 11,
"metadata": {
"collapsed": false,
"scrolled": false
"outputs": [],
"source": [
"tp, tpt, ss = __main__(sc, sqlContext)"
"metadata": {
"anaconda-cloud": {},
"kernelspec": {
"display_name": "Python [conda root]",
"language": "python",
"name": "conda-root-py"
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.12"
"nbformat": 4,
"nbformat_minor": 1
# coding: utf-8
# In[10]:
# %load ~/cliqz_ping_pipeline/
import ujson as json
from datetime import *
import pandas as pd
from pyspark.sql.types import *
from pyspark.sql.functions import split
from moztelemetry import get_pings_properties
from moztelemetry.dataset import Dataset
class ColumnConfig:
    """Describe how one DataFrame column is derived from a ping.

    Attributes (stored exactly as passed to the constructor):
        name: column name used when building the schema.
        path: key used to look the raw value up in a ping.
        cleaning_func: optional callable applied to the raw value; ``None``
            means the raw value is used unchanged.
        struct_type: Spark data type instance for the column.
    """

    def __init__(self, name, path, cleaning_func, struct_type):
        self.name = name
        self.path = path
        self.cleaning_func = cleaning_func
        self.struct_type = struct_type
class DataFrameConfig:
    """Ordered collection of ColumnConfig objects describing a DataFrame."""

    def __init__(self, col_configs):
        """Build one ColumnConfig per entry of ``col_configs``.

        Arguments:
            col_configs: iterable of tuples of the form
                (name, path, cleaning_func, struct_type)
        """
        self.columns = [ColumnConfig(*col) for col in col_configs]

    def toStructType(self):
        """Return a StructType with one nullable StructField per column."""
        # A real list (not a lazy ``map`` object) is passed so this also
        # works where ``map`` returns an iterator.
        return StructType([
            StructField(col.name, col.struct_type, True)
            for col in self.columns
        ])

    def get_names(self):
        """Return the column names, in configuration order."""
        return [col.name for col in self.columns]

    def get_paths(self):
        """Return the ping lookup paths, in configuration order."""
        return [col.path for col in self.columns]
def pings_to_df(sqlContext, pings, data_frame_config):
    """Performs simple data pipelining on raw pings

    Arguments:
        sqlContext: object whose ``createDataFrame`` builds the result.
        pings: raw pings, passed straight to ``get_pings_properties``.
        data_frame_config: a DataFrameConfig, itself built from a list of
            tuples of the form:
                (name, path, cleaning_func, column_type)

    Returns:
        The DataFrame produced by ``sqlContext.createDataFrame`` using the
        schema from ``data_frame_config.toStructType()``.
    """
    def build_cell(ping, column_config):
        """Takes a json ping and a column config and returns a cleaned cell"""
        raw_value = ping[column_config.path]
        func = column_config.cleaning_func
        if func is not None:
            return func(raw_value)
        return raw_value

    def ping_to_row(ping):
        """Return the list of cleaned cells for one ping, in column order."""
        return [build_cell(ping, col) for col in data_frame_config.columns]

    filtered_pings = get_pings_properties(pings, data_frame_config.get_paths())

    # NOTE(review): the first argument of createDataFrame was missing from
    # this copy of the file. Mapping ping_to_row over filtered_pings is a
    # reconstruction (both names were otherwise unused); it assumes
    # get_pings_properties returns an RDD-like object with .map -- confirm.
    return sqlContext.createDataFrame(
        filtered_pings.map(ping_to_row),
        schema=data_frame_config.toStructType())
def save_df(df, name, date_partition, partitions=1):
    """Write ``df`` as parquet, overwriting any existing output.

    Arguments:
        df: DataFrame to write.
        name: dataset name, interpolated into the path as ``cliqz_{name}``.
        date_partition: value for a ``/submission=<value>`` path suffix, or
            ``None`` to write directly under the ``v1`` prefix.
        partitions: number of partitions to coalesce to before writing.
    """
    if date_partition is not None:
        partition_str = "/submission={day}".format(day=date_partition)
    else:
        # Without this branch partition_str would be undefined below.
        partition_str = ""

    path_fmt = "s3n://telemetry-parquet/harter/cliqz_{name}/v1{partition_str}"
    path = path_fmt.format(name=name, partition_str=partition_str)

    df.coalesce(partitions).write.mode("overwrite").parquet(path)
def __main__(sc, sqlContext):
    """Build the testpilot, testpilottest and search DataFrames.

    Arguments:
        sc: passed to ``Dataset.records`` when fetching pings.
        sqlContext: used to create the ping DataFrames and read the CSV.

    Returns:
        A tuple ``(testpilot_df, testpilottest_df, search_df)``. Only
        ``search_df`` is saved here; the other two saves are commented out.
    """
    # NOTE(review): the left operand of this subtraction was missing from
    # this copy of the file; date.today() is a reconstruction -- confirm.
    yesterday = (date.today() - timedelta(1)).strftime("%Y%m%d")

    # NOTE(review): the add-on ID literal is an empty string in this copy of
    # the file and was probably lost in extraction -- fill in the real ID.
    cliqz_addon_id = ""

    def get_doctype_pings(docType):
        """Fetch yesterday's Firefox pings of the given docType."""
        return Dataset.from_source("telemetry") \
            .where(docType=docType) \
            .where(submissionDate=yesterday) \
            .where(appName="Firefox") \
            .records(sc)

    def get_cliqz_version(x):
        """Return the add-on's "version" entry, or None if it is absent."""
        if x is not None and cliqz_addon_id in x.keys():
            return x[cliqz_addon_id]["version"]
        return None

    def has_addon(x):
        """Return whether the add-on ID is a key of x (None if x is None)."""
        return cliqz_addon_id in x.keys() if x is not None else None

    def get_event(x):
        """Return the "event" of the first entry; None if x is None/empty."""
        return x[0]["event"] if x else None

    def get_event_object(x):
        """Return the "object" of the first entry; None if x is None/empty."""
        return x[0]["object"] if x else None

    # NOTE(review): the event_object filter literal below is empty in this
    # copy of the file; it is kept as-is -- confirm the intended value.
    testpilot_df = pings_to_df(
        sqlContext,
        get_doctype_pings("testpilot"),
        DataFrameConfig([
            ("client_id", "clientId", None, StringType()),
            ("creation_date", "creationDate", None, StringType()),
            ("geo", "meta/geoCountry", None, StringType()),
            ("locale", "environment/settings/locale", None, StringType()),
            ("channel", "meta/normalizedChannel", None, StringType()),
            ("os", "meta/os", None, StringType()),
            ("telemetry_enabled", "environment/settings/telemetryEnabled", None, BooleanType()),
            ("has_addon", "environment/addons/activeAddons", has_addon, StringType()),
            ("cliqz_version", "environment/addons/activeAddons", get_cliqz_version, StringType()),
            ("event", "payload/events", get_event, StringType()),
            ("event_object", "payload/events", get_event_object, StringType()),
            ("test", "payload/test", None, StringType())
        ])).filter("test = '@testpilot-addon'") \
        .filter("event_object = ''")

    #save_df(testpilot_df, "testpilot", yesterday)

    # NOTE(review): the test filter literal below is empty in this copy of
    # the file; it is kept as-is -- confirm the intended value.
    testpilottest_df = pings_to_df(
        sqlContext,
        get_doctype_pings("testpilottest"),
        DataFrameConfig([
            ("client_id", "clientId", None, StringType()),
            ("cliqz_client_id", "payload/payload/cliqzSession", None, StringType()),
            ("session_id", "payload/payload/sessionId", None, StringType()),
            ("subsession_id", "payload/payload/subsessionId", None, StringType()),
            ("date", "meta/submissionDate", None, StringType()),
            ("client_timestamp", "creationDate", None, StringType()),
            ("geo", "meta/geoCountry", None, StringType()),
            ("locale", "environment/settings/locale", None, StringType()),
            ("channel", "meta/normalizedChannel", None, StringType()),
            ("os", "meta/os", None, StringType()),
            ("telemetry_enabled", "environment/settings/telemetryEnabled", None, StringType()),
            ("has_addon", "environment/addons/activeAddons", has_addon, StringType()),
            ("cliqz_version", "environment/addons/activeAddons", get_cliqz_version, StringType()),
            ("event", "payload/payload/event", None, StringType()),
            ("content_search_engine", "payload/payload/contentSearch", None, StringType()),
            ("test", "payload/test", None, StringType())
        ])).filter("event IS NOT NULL") \
        .filter("test = ''")

    #save_df(testpilottest_df, "testpilottest", yesterday, partitions=32)

    # NOTE(review): the reader expression before .csv(...) was missing from
    # this copy of the file. Reading with a header row is a reconstruction,
    # chosen because the "udid" column is referenced by name -- confirm.
    search_df = sqlContext.read \
        .option("header", "true") \
        .csv("s3://net-mozaws-prod-cliqz/testpilot-cliqz-telemetry.csv") \
        .withColumn("id", split("udid", r"\|")[0])  # Add ID column

    save_df(search_df, "search", None)

    return testpilot_df, testpilottest_df, search_df
# In[11]:
# Run the pipeline and keep the three returned DataFrames.
# NOTE(review): `sc` and `sqlContext` are not defined anywhere in this file;
# presumably they are supplied by the notebook's Spark session -- confirm.
tp, tpt, ss = __main__(sc, sqlContext)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment