Skip to content

Instantly share code, notes, and snippets.

@harterrt
Last active February 7, 2017 17:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save harterrt/2a052f653c50df10920cfdb19c362438 to your computer and use it in GitHub Desktop.
Save harterrt/2a052f653c50df10920cfdb19c362438 to your computer and use it in GitHub Desktop.
cliqz testpilot pipeline
Display the source blob
Display the rendered blob
Raw
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
# coding: utf-8
# In[10]:
# %load ~/cliqz_ping_pipeline/transform.py
import ujson as json
from datetime import *
import pandas as pd
from pyspark.sql.types import *
from pyspark.sql.functions import split
from moztelemetry import get_pings_properties
from moztelemetry.dataset import Dataset
class ColumnConfig:
    """Describes a single output column of a ping-derived DataFrame.

    Attributes:
        name: column name; used as the ``StructField`` name in the schema.
        path: key used to look the raw value up in a ping.
        cleaning_func: optional callable applied to the raw value before it
            is stored; ``None`` means the raw value is used unchanged.
        struct_type: type object passed to ``StructField`` for this column.
    """

    def __init__(self, name, path, cleaning_func, struct_type):
        self.name = name
        self.path = path
        self.cleaning_func = cleaning_func
        self.struct_type = struct_type

    def __repr__(self):
        # Debug aid only; shows the four fields in constructor order.
        return (
            "ColumnConfig(name={!r}, path={!r}, "
            "cleaning_func={!r}, struct_type={!r})"
        ).format(self.name, self.path, self.cleaning_func, self.struct_type)
class DataFrameConfig:
    """Ordered collection of ``ColumnConfig`` objects describing a DataFrame.

    Arguments:
        col_configs: iterable of ``(name, path, cleaning_func, struct_type)``
            sequences; each one is unpacked positionally into a
            ``ColumnConfig``.
    """

    def __init__(self, col_configs):
        self.columns = [ColumnConfig(*col) for col in col_configs]

    def toStructType(self):
        """Return a ``StructType`` with one nullable field per column."""
        # A real list (not a lazy ``map`` object) so the result is the same
        # on Python 2 and Python 3 and can be traversed more than once.
        return StructType([
            StructField(col.name, col.struct_type, True)
            for col in self.columns
        ])

    def get_names(self):
        """Return the column names as a list, in column order."""
        return [col.name for col in self.columns]

    def get_paths(self):
        """Return the ping lookup paths as a list, in column order."""
        return [col.path for col in self.columns]
def pings_to_df(sqlContext, pings, data_frame_config):
    """Convert raw pings into a DataFrame laid out by ``data_frame_config``.

    Arguments:
        sqlContext: object whose ``createDataFrame`` builds the result.
        pings: raw pings, handed to ``get_pings_properties``.
        data_frame_config: configuration object exposing ``columns``,
            ``get_paths()`` and ``toStructType()`` (see ``DataFrameConfig``).
            Each column carries (name, path, cleaning_func, column_type).

    Returns:
        Whatever ``sqlContext.createDataFrame`` returns for the cleaned rows
        and the schema from ``data_frame_config.toStructType()``.
    """
    def clean_value(ping, col):
        """Look up the column's path in the ping and clean it if asked to."""
        value = ping[col.path]
        cleaner = col.cleaning_func
        return value if cleaner is None else cleaner(value)

    def to_row(ping):
        """Produce one row: a cleaned cell per configured column, in order."""
        return [clean_value(ping, col) for col in data_frame_config.columns]

    # Restrict each ping to just the paths the configured columns need.
    selected = get_pings_properties(pings, data_frame_config.get_paths())
    rows = selected.map(to_row)
    schema = data_frame_config.toStructType()
    return sqlContext.createDataFrame(rows, schema=schema)
def save_df(df, name, date_partition, partitions=1):
    """Write ``df`` as parquet to the cliqz_<name>/v1 location, overwriting.

    Arguments:
        df: DataFrame to write.
        name: dataset name, inserted into the path as ``cliqz_{name}``.
        date_partition: value appended as ``/submission=<value>``; when
            ``None`` nothing is appended to the path.
        partitions: count passed to ``df.coalesce`` before writing.
    """
    if date_partition is None:
        suffix = ""
    else:
        suffix = "/submission={day}".format(day=date_partition)

    destination = (
        "s3n://telemetry-parquet/harter/cliqz_{name}/v1{partition_str}"
    ).format(name=name, partition_str=suffix)

    writer = df.coalesce(partitions).write
    writer.mode("overwrite").parquet(destination)
def __main__(sc, sqlContext):
    """Build the Cliqz Test Pilot DataFrames from yesterday's pings.

    Arguments:
        sc: Spark context handed to ``Dataset.records``.
        sqlContext: used to build the ping DataFrames and to read the
            search CSV.

    Returns:
        Tuple ``(testpilot_df, testpilottest_df, search_df)``.

    Side effects:
        ``search_df`` is written via ``save_df(search_df, "search", None)``.
        The saves for the two ping DataFrames are currently commented out.
    """
    cliqz_addon_id = "testpilot@cliqz.com"
    yesterday = (date.today() - timedelta(1)).strftime("%Y%m%d")

    def get_doctype_pings(docType):
        """Fetch yesterday's Firefox pings of the given docType."""
        return (
            Dataset.from_source("telemetry")
            .where(docType=docType)
            .where(submissionDate=yesterday)
            .where(appName="Firefox")
            .records(sc)
        )

    def get_cliqz_version(active_addons):
        """Return the Cliqz add-on version, or None when it is not listed."""
        if active_addons is None or cliqz_addon_id not in active_addons:
            return None
        addon = active_addons[cliqz_addon_id]
        # A malformed add-on entry without "version" yields None rather than
        # failing the whole job.
        return addon.get("version") if addon is not None else None

    def has_addon(active_addons):
        """Return whether the Cliqz add-on is listed; None if no add-on data."""
        if active_addons is None:
            return None
        return cliqz_addon_id in active_addons

    def first_event_field(field):
        """Build a cleaner returning ``field`` of the first event, or None."""
        def extract(events):
            # Covers both a missing events value (None) and an empty list,
            # which would otherwise raise IndexError on events[0].
            if not events:
                return None
            return events[0].get(field)
        return extract

    get_event = first_event_field("event")
    get_event_object = first_event_field("object")

    # NOTE(review): has_addon produces a bool (or None) but both configs
    # declare the column as StringType; left as-is to keep the schema stable
    # -- confirm this is intended.
    testpilot_df = pings_to_df(
        sqlContext,
        get_doctype_pings("testpilot"),
        DataFrameConfig([
            ("client_id", "clientId", None, StringType()),
            ("creation_date", "creationDate", None, StringType()),
            ("geo", "meta/geoCountry", None, StringType()),
            ("locale", "environment/settings/locale", None, StringType()),
            ("channel", "meta/normalizedChannel", None, StringType()),
            ("os", "meta/os", None, StringType()),
            ("telemetry_enabled", "environment/settings/telemetryEnabled", None, BooleanType()),
            ("has_addon", "environment/addons/activeAddons", has_addon, StringType()),
            ("cliqz_version", "environment/addons/activeAddons", get_cliqz_version, StringType()),
            ("event", "payload/events", get_event, StringType()),
            ("event_object", "payload/events", get_event_object, StringType()),
            ("test", "payload/test", None, StringType()),
        ])
    ).filter("test = '@testpilot-addon'") \
     .filter("event_object = 'testpilot@cliqz.com'")
    #save_df(testpilot_df, "testpilot", yesterday)

    # NOTE(review): telemetry_enabled is StringType here but BooleanType in
    # the testpilot config above -- confirm which one is intended.
    testpilottest_df = pings_to_df(
        sqlContext,
        get_doctype_pings("testpilottest"),
        DataFrameConfig([
            ("client_id", "clientId", None, StringType()),
            ("cliqz_client_id", "payload/payload/cliqzSession", None, StringType()),
            ("session_id", "payload/payload/sessionId", None, StringType()),
            ("subsession_id", "payload/payload/subsessionId", None, StringType()),
            ("date", "meta/submissionDate", None, StringType()),
            ("client_timestamp", "creationDate", None, StringType()),
            ("geo", "meta/geoCountry", None, StringType()),
            ("locale", "environment/settings/locale", None, StringType()),
            ("channel", "meta/normalizedChannel", None, StringType()),
            ("os", "meta/os", None, StringType()),
            ("telemetry_enabled", "environment/settings/telemetryEnabled", None, StringType()),
            ("has_addon", "environment/addons/activeAddons", has_addon, StringType()),
            ("cliqz_version", "environment/addons/activeAddons", get_cliqz_version, StringType()),
            ("event", "payload/payload/event", None, StringType()),
            ("content_search_engine", "payload/payload/contentSearch", None, StringType()),
            ("test", "payload/test", None, StringType()),
        ])
    ).filter("event IS NOT NULL") \
     .filter("test = 'testpilot@cliqz.com'")
    #save_df(testpilottest_df, "testpilottest", yesterday, partitions=32)

    search_df = (
        sqlContext.read.options(header=True)
        .csv("s3://net-mozaws-prod-cliqz/testpilot-cliqz-telemetry.csv")
        # Add ID column: the part of "udid" before the first "|". The raw
        # string keeps the regex escape without an invalid "\|" escape.
        .withColumn("id", split("udid", r"\|")[0])
    )
    save_df(search_df, "search", None)

    return testpilot_df, testpilottest_df, search_df
# In[11]:
# Run the pipeline and bind the three DataFrames it returns
# (testpilot, testpilottest, search) to short names.
# NOTE(review): `sc` and `sqlContext` are not defined anywhere in this file --
# presumably they are provided by the notebook / PySpark session; confirm.
tp, tpt, ss = __main__(sc, sqlContext)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment