-
-
Save harterrt/2a052f653c50df10920cfdb19c362438 to your computer and use it in GitHub Desktop.
cliqz testpilot pipeline
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding: utf-8 | |
# In[10]: | |
# %load ~/cliqz_ping_pipeline/transform.py | |
import ujson as json | |
from datetime import * | |
import pandas as pd | |
from pyspark.sql.types import * | |
from pyspark.sql.functions import split | |
from moztelemetry import get_pings_properties | |
from moztelemetry.dataset import Dataset | |
class ColumnConfig: | |
def __init__(self, name, path, cleaning_func, struct_type): | |
self.name = name | |
self.path = path | |
self.cleaning_func = cleaning_func | |
self.struct_type = struct_type | |
class DataFrameConfig: | |
def __init__(self, col_configs): | |
self.columns = [ColumnConfig(*col) for col in col_configs] | |
def toStructType(self): | |
return StructType(map( | |
lambda col: StructField(col.name, col.struct_type, True), | |
self.columns)) | |
def get_names(self): | |
return map(lambda col: col.name, self.columns) | |
def get_paths(self): | |
return map(lambda col: col.path, self.columns) | |
def pings_to_df(sqlContext, pings, data_frame_config): | |
"""Performs simple data pipelining on raw pings | |
Arguments: | |
data_frame_config: a list of tuples of the form: | |
(name, path, cleaning_func, column_type) | |
""" | |
def build_cell(ping, column_config): | |
"""Takes a json ping and a column config and returns a cleaned cell""" | |
raw_value = ping[column_config.path] | |
func = column_config.cleaning_func | |
if func is not None: | |
return func(raw_value) | |
else: | |
return raw_value | |
def ping_to_row(ping): | |
return [build_cell(ping, col) for col in data_frame_config.columns] | |
filtered_pings = get_pings_properties(pings, data_frame_config.get_paths()) | |
return sqlContext.createDataFrame( | |
filtered_pings.map(ping_to_row), | |
schema = data_frame_config.toStructType()) | |
def save_df(df, name, date_partition, partitions=1): | |
if date_partition is not None: | |
partition_str = "/submission={day}".format(day=date_partition) | |
else: | |
partition_str="" | |
path_fmt = "s3n://telemetry-parquet/harter/cliqz_{name}/v1{partition_str}" | |
path = path_fmt.format(name=name, partition_str=partition_str) | |
df.coalesce(partitions).write.mode("overwrite").parquet(path) | |
def __main__(sc, sqlContext): | |
yesterday = (date.today() - timedelta(1)).strftime("%Y%m%d") | |
get_doctype_pings = lambda docType: Dataset.from_source("telemetry") .where(docType=docType) .where(submissionDate=yesterday) .where(appName="Firefox") .records(sc) | |
get_cliqz_version = lambda x: x["testpilot@cliqz.com"]["version"] if x is not None and "testpilot@cliqz.com" in x.keys() else None | |
has_addon = lambda x: "testpilot@cliqz.com" in x.keys() if x is not None else None | |
get_event = lambda x: x[0]["event"] if x is not None else None | |
get_event_object = lambda x: x[0]["object"] if x is not None else None | |
testpilot_df = pings_to_df( | |
sqlContext, | |
get_doctype_pings("testpilot"), | |
DataFrameConfig([ | |
("client_id", "clientId", None, StringType()), | |
("creation_date", "creationDate", None, StringType()), | |
("geo", "meta/geoCountry", None, StringType()), | |
("locale", "environment/settings/locale", None, StringType()), | |
("channel", "meta/normalizedChannel", None, StringType()), | |
("os", "meta/os", None, StringType()), | |
("telemetry_enabled", "environment/settings/telemetryEnabled", None, BooleanType()), | |
("has_addon", "environment/addons/activeAddons", has_addon, StringType()), | |
("cliqz_version", "environment/addons/activeAddons", get_cliqz_version, StringType()), | |
("event", "payload/events", get_event, StringType()), | |
("event_object", "payload/events", get_event_object, StringType()), | |
("test", "payload/test", None, StringType()) | |
])).filter("test = '@testpilot-addon'") \ | |
.filter("event_object = 'testpilot@cliqz.com'") | |
#save_df(testpilot_df, "testpilot", yesterday) | |
testpilottest_df = pings_to_df( | |
sqlContext, | |
get_doctype_pings("testpilottest"), | |
DataFrameConfig([ | |
("client_id", "clientId", None, StringType()), | |
("cliqz_client_id", "payload/payload/cliqzSession", None, StringType()), | |
("session_id", "payload/payload/sessionId", None, StringType()), | |
("subsession_id", "payload/payload/subsessionId", None, StringType()), | |
("date", "meta/submissionDate", None, StringType()), | |
("client_timestamp", "creationDate", None, StringType()), | |
("geo", "meta/geoCountry", None, StringType()), | |
("locale", "environment/settings/locale", None, StringType()), | |
("channel", "meta/normalizedChannel", None, StringType()), | |
("os", "meta/os", None, StringType()), | |
("telemetry_enabled", "environment/settings/telemetryEnabled", None, StringType()), | |
("has_addon", "environment/addons/activeAddons", has_addon, StringType()), | |
("cliqz_version", "environment/addons/activeAddons", get_cliqz_version, StringType()), | |
("event", "payload/payload/event", None, StringType()), | |
("content_search_engine", "payload/payload/contentSearch", None, StringType()), | |
("test", "payload/test", None, StringType()) | |
])).filter("event IS NOT NULL") \ | |
.filter("test = 'testpilot@cliqz.com'") | |
#save_df(testpilottest_df, "testpilottest", yesterday, partitions=32) | |
search_df = sqlContext.read.options(header=True) .csv("s3://net-mozaws-prod-cliqz/testpilot-cliqz-telemetry.csv") .withColumn("id", split("udid", "\|")[0]) # Add ID column | |
save_df(search_df, "search", None) | |
return testpilot_df, testpilottest_df, search_df | |
# In[11]: | |
tp, tpt, ss = __main__(sc, sqlContext) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment