-
-
Save dzeber/222d4d566577bb41c781f9e7a99b8e18 to your computer and use it in GitHub Desktop.
Cliqz data
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding: utf-8 | |
# In[1]: | |
from __future__ import division

import datetime

import boto3
import feather
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pyspark.sql.functions as fun
import ujson as json
from boto3.s3.transfer import S3Transfer
from moztelemetry.dataset import Dataset
from moztelemetry.spark import get_pings_properties
from pyspark.sql import Row
from pyspark.sql.types import StringType

get_ipython().magic(u'pylab inline')
## Show every row of pandas output (no truncation).
pd.options.display.max_rows = None
# In[2]:
## Not including spot nodes...
sc.defaultParallelism
# In[3]: | |
def format_submission_date(submission_date):
    """ Re-express a 'yyyymmdd' submission datestring as an ISO 'YYYY-MM-DD' string. """
    parsed = datetime.datetime.strptime(submission_date, "%Y%m%d")
    return parsed.strftime("%Y-%m-%d")
def format_pcd(pcd):
    """ Turn a profile creation date (days since the Unix epoch) into an ISO 'YYYY-MM-DD' string (UTC).

    Returns None when `pcd` is None or not positive.
    """
    if pcd is None or pcd <= 0:
        return None
    ## The PCD is counted in days; utcfromtimestamp expects seconds.
    seconds = pcd * 86400
    stamp = datetime.datetime.utcfromtimestamp(seconds)
    return stamp.strftime("%Y-%m-%d")
def iso_to_submission_format(iso_date):
    """ Turn an ISO 'YYYY-MM-DD' datestring into the compact 'yyyymmdd' submission format.

    Falsy input (None or an empty string) yields None.
    """
    return iso_date.replace("-", "") if iso_date else None
## Module-level S3 client and transfer manager used for uploads.
client = boto3.client('s3', region_name='us-west-2')
transfer = S3Transfer(client)
def write_file_to_s3(filename, s3_path):
    """ Upload a file from the current dir to S3 under an "s3://bucket/prefix/" path.

    The object key is the path's prefix followed by `filename`. If the prefix does
    not end in "/", one is added so the file lands inside that "directory". A
    bucket-only path ("s3://bucket" or "s3://bucket/") uploads to the bucket root.
    Raises ValueError if `s3_path` does not start with "s3://".
    """
    scheme = "s3://"
    if not s3_path.startswith(scheme):
        raise ValueError("Expected an s3:// path, got {!r}".format(s3_path))
    ## Split "bucket/key/prefix/" into the bucket name and a (possibly empty) key prefix.
    data_bucket, _, s3_path_key = s3_path[len(scheme):].partition("/")
    if s3_path_key and not s3_path_key.endswith("/"):
        s3_path_key += "/"
    transfer.upload_file(filename, data_bucket, s3_path_key + filename)
def none_to_zero(DF, cols):
    """ Return a copy of DF in which nulls in each named (numeric) column are replaced by 0. """
    result = DF
    for name in cols:
        column = fun.col(name)
        filled = fun.when(column.isNull(), 0).otherwise(column)
        result = result.withColumn(name, filled)
    return result
# ## Test pilot pings | |
# __Note__: The study was launched on Jan 10, 2017. | |
# In[3]: | |
#prev_shield_s3_path = "s3://mozilla-metrics/user/dzeber/tmp/unified_search/raw_pings_2016-12-19/" | |
#new_shield_s3_path = "s3://mozilla-metrics/user/dzeber/tmp/unified_search/shield_pings_20161121-20170103/" | |
#txp_s3_path = "s3://mozilla-metrics/user/dzeber/tmp/txp_cliqz/" | |
start_date = "20170101"
#end_date = "20170103"
# ### `testpilot` pings
# In[4]:
## Firefox `testpilot` pings submitted on or after `start_date`, cached for reuse.
txp_data = (
    Dataset.from_source("telemetry")
    .where(docType="testpilot")
    .where(submissionDate=lambda d: d >= start_date)
    .where(appName="Firefox")
    .records(sc)
    .cache()
)
# In[5]:
txp_data.count()
# In[6]: | |
def is_txp_cliqz(ping):
    """ Return True for a `testpilot` ping whose first event concerns the Cliqz add-on.

    Requires payload.test == "@testpilot-addon" and a non-empty payload.events whose
    first entry has object "testpilot@cliqz.com"; a missing payload or events gives False.
    """
    payload = ping.get("payload")
    events = payload.get("events") if payload else None
    if not events:
        return False
    return (payload.get("test") == "@testpilot-addon"
            and events[0].get("object") == "testpilot@cliqz.com")
# Tabulate events by submission date | |
# In[8]: | |
## Restrict to the Cliqz-related `testpilot` pings and count them.
cliqz_txp = txp_data.filter(lambda ping: is_txp_cliqz(ping))
cliqz_txp.count()
# In[57]: | |
def cliqz_events_row(ping):
    """ Build a Row(date, event) from a Cliqz `testpilot` ping, using its first event. """
    submitted = ping["meta"]["submissionDate"]
    first_event = ping["payload"].get("events")[0]
    return Row(date=submitted, event=first_event["event"])
cliqz_txp_events = spark.createDataFrame(cliqz_txp.map(cliqz_events_row))
## Total pings per submission date.
cliqz_total_sub = (
    cliqz_txp_events
    .groupBy("date")
    .count()
    .withColumnRenamed("count", "total_pings")
)
## One column per event type holding ping counts per date (missing -> 0).
cliqz_events_df = cliqz_txp_events.groupBy("date").pivot("event").count()
df_cols = list(cliqz_events_df.columns)
df_cols.remove("date")
cliqz_events_df = none_to_zero(cliqz_events_df, df_cols)
# In[59]:
cliqz_events_df = cliqz_events_df.join(cliqz_total_sub, on="date")
# In[60]:
cliqz_events_df.orderBy("date").toPandas()
# ### `testpilottest` pings | |
# In[49]: | |
## Firefox `testpilottest` pings submitted on or after `start_date` (not cached).
txp_test_data = (
    Dataset.from_source("telemetry")
    .where(docType="testpilottest")
    .where(submissionDate=lambda d: d >= start_date)
    .where(appName="Firefox")
    .records(sc)
)
# In[ ]:
## Slow.
##txp_test_data.count()
# In[6]: | |
def is_txp_test_cliqz(ping):
    """ Determine if a `testpilottest` ping is a Cliqz ping carrying an event.

    Returns True only when payload.test is "testpilot@cliqz.com" and the nested
    payload.payload block is a dict with a truthy "event". Missing or null
    blocks yield False instead of raising, and the result is always a bool.
    """
    payload = ping.get("payload")
    if not payload:
        return False
    if payload.get("test") != "testpilot@cliqz.com":
        return False
    inner = payload.get("payload")
    ## The nested block may be absent or explicitly null; `.get(..., {})` only covers absence.
    if not isinstance(inner, dict):
        return False
    return bool(inner.get("event"))
# In[51]: | |
## 1 master, 10 spot nodes
## Keep the Cliqz subset in memory; it is reused by the cells below.
txpt_cliqz = txp_test_data.filter(is_txp_test_cliqz)
txpt_cliqz.cache()
get_ipython().magic(u'time txpt_cliqz.count()')
# In[71]: | |
def cliqz_test_row(ping):
    """ Build a Row(date, event) from a Cliqz `testpilottest` ping's nested payload. """
    submitted = ping["meta"]["submissionDate"]
    nested = ping["payload"].get("payload", {})
    return Row(date=submitted, event=nested.get("event"))
cliqz_txpt_events = spark.createDataFrame(txpt_cliqz.map(cliqz_test_row))
## Total `testpilottest` pings per submission date.
cliqz_total_sub_test = (
    cliqz_txpt_events.groupBy("date").count()
    .withColumnRenamed("count", "total_pings")
)
## One column per event type holding ping counts per date (missing -> 0).
cliqz_txpt_events_df = cliqz_txpt_events.groupBy("date").pivot("event").count()
df_cols = cliqz_txpt_events_df.columns
df_cols.remove("date")
cliqz_txpt_events_df = none_to_zero(cliqz_txpt_events_df, df_cols)
## Attach the `testpilottest` totals to the `testpilottest` event table, mirroring
## how the `testpilot` totals are joined onto `cliqz_events_df`.
cliqz_txpt_events_df = cliqz_txpt_events_df.join(cliqz_total_sub_test, on="date")
# In[72]:
cliqz_txpt_events_df.orderBy("date").toPandas()
# ----- | |
# | |
# ### Take a look at the data from `testpilot` pings | |
# In[17]: | |
## Test pilot pings by submission date.
txp_data.map(lambda ping: ping["meta"]["submissionDate"]).countByValue()
# In[54]:
txp_payloads = txp_data.map(lambda ping: ping["payload"])
## Look at some sample payloads.
txp_payloads.take(10)
# In[65]:
## What keys does the `payload` block have?
txp_payloads.flatMap(lambda pl: pl.keys()).countByValue()
# In[55]:
## What are the different values for "test"?
txp_payloads.map(lambda pl: pl.get("test")).countByValue()
# In[56]:
## Looks like we want the ones with `test == "@testpilot-addon"`.
txp_payloads_tpa = txp_payloads.filter(
    lambda pl: pl.get("test") == "@testpilot-addon")
# In[57]:
## From above, we see `events` is an array. Does it ever have multiple entries?
## No.
txp_payloads_tpa.filter(lambda pl: len(pl.get("events")) > 1).count()
# In[59]:
## What are the different event types?
(txp_payloads_tpa
    .map(lambda pl: pl.get("events")[0].get("event"))
    .countByValue())
# In[61]:
## What `object`s are recorded for enabled/disabled events?
## These are TxP experiments.
(txp_payloads_tpa
    .filter(lambda pl: pl.get("events")[0].get("event") in ("enabled", "disabled"))
    .map(lambda pl: pl.get("events")[0].get("object"))
    .countByValue())
# In[63]:
## What keys (add-on IDs) are in the payloads with `tests` rather than `test`?
(txp_payloads
    .filter(lambda pl: "test" not in pl)
    .flatMap(lambda pl: pl.get("tests").keys())
    .countByValue())
# ### Take a look at the `testpilottest` pings for Cliqz | |
# In[61]: | |
txpt_payloads = txpt_cliqz.map(lambda ping: ping["payload"])
## Look at some sample payloads.
txpt_payloads.take(10)
# In[62]:
## What keys does the `payload` block have?
txpt_payloads.flatMap(lambda pl: pl.keys()).countByValue()
# In[63]:
## What keys does the nested `payload` block have?
txpt_payloads.flatMap(lambda pl: pl["payload"].keys()).countByValue()
# In[64]:
## What are the different event types?
txpt_payloads.map(lambda pl: pl["payload"].get("event")).countByValue()
# In[65]:
## What are the different values for `contentSearch`?
(txpt_payloads
    .map(lambda pl: pl["payload"].get("contentSearch"))
    .countByValue())
# In[66]:
## Some payloads with content search:
(txpt_payloads
    .filter(lambda pl: pl["payload"].get("contentSearch"))
    .take(10))
# ## Try out the Cliqz client ID decryption | |
# | |
# Note the mapping from UT clientID to Cliqz ID is one-to-many, since the encrypted Cliqz ID changes regularly. | |
# In[4]: | |
## `testpilottest` pings submitted between 2017-01-11 and 2017-01-18 inclusive.
txp_test_data = (
    Dataset.from_source("telemetry")
    .where(docType="testpilottest")
    .where(submissionDate=lambda d: "20170111" <= d <= "20170118")
    .where(appName="Firefox")
    .records(sc)
)
# In[ ]:
## 1 master, 30 spot nodes
## Took about 30 min.
txpt_cliqz = txp_test_data.filter(is_txp_test_cliqz)
txpt_cliqz.cache()
get_ipython().magic(u'time txpt_cliqz.count()')
# In[10]:
txpt_cliqz.count()
# In[20]: | |
def get_id_pair(ping):
    """ Pair a ping's UT client ID with the Cliqz session ID from its nested payload. """
    ut_id = ping.get("clientId")
    session = ping["payload"]["payload"].get("cliqzSession")
    return Row(client_id=ut_id, cliqz_id=session)
## Distinct (UT client ID, Cliqz session ID) pairs.
id_mapping = (
    spark.createDataFrame(txpt_cliqz.map(get_id_pair))
    .dropDuplicates()
)
# In[25]:
## How many mapping pairs?
id_mapping.count()
# In[26]:
## How many unique UT client IDs?
id_mapping.select("client_id").dropDuplicates().count()
# In[28]:
## How many Cliqz IDs are recorded per UT client ID?
(id_mapping
    .groupBy("client_id").count()
    .withColumnRenamed("count", "num_cliqz_ids")
    .groupBy("num_cliqz_ids").count()
    .orderBy("num_cliqz_ids")
    .toPandas())
# In[29]:
## What do the IDs look like?
id_mapping.limit(20).toPandas()
# In[31]:
id_mapping_pd = id_mapping.toPandas()
# In[34]:
## Save locally (pandas index included), then upload to S3.
id_mapping_pd.to_csv("cliqz_id_mapping.csv")
write_file_to_s3(
    "cliqz_id_mapping.csv",
    "s3://mozilla-metrics/user/dzeber/tmp/txp_cliqz/",
)
# In[46]: | |
import json | |
import base64 | |
import random | |
from Crypto.Cipher import AES | |
## AES key used by encrypt/decrypt; deliberately left blank here -- ask the author for it.
KEY = ""
def encrypt(input):
    """ AES-encrypt `input` (stringified) with KEY and return the base64-encoded ciphertext.

    The plaintext is right-padded with NUL characters up to the next block
    boundary (a full extra block when it is already aligned); `decrypt` strips
    that padding again. ECB mode is passed explicitly: it is the implicit
    default in PyCrypto, but PyCryptodome requires a mode argument.
    """
    secret = AES.new(KEY[:32], AES.MODE_ECB)
    text = str(input)
    padding = (AES.block_size - len(text) % AES.block_size) * "\0"
    return base64.b64encode(secret.encrypt(text + padding))
def decrypt(input):
    """ Base64-decode `input`, AES-decrypt it with KEY and strip trailing NUL padding.

    ECB mode is passed explicitly so that this works with PyCryptodome, which has
    no default mode, as well as with PyCrypto, where ECB is the default.
    """
    secret = AES.new(KEY[:32], AES.MODE_ECB)
    raw = secret.decrypt(base64.b64decode(input))
    return raw.rstrip("\0")
def is_decrypted(val):
    """ Heuristically decide whether a Cliqz session value is already plaintext.

    "not-found" counts as plaintext. Otherwise the value must be longer than 10
    characters and split on '|' into 2 or 3 parts whose second part is 5 characters.
    """
    if val == "not-found":
        return True
    if len(val) > 10:
        parts = val.split('|')
        return len(parts) in (2, 3) and len(parts[1]) == 5
    return False
def encode_session(ses):
    """ Encrypt session `ses` after wrapping it in 4 characters of random noise on each side. """
    wrapped = str(random.random())[-4:] + ses
    wrapped += str(random.random())[-4:]
    return encrypt(wrapped)
def decode_session(enc_ses):
    """ Decrypt an encoded session and drop the 4-character noise at each end. """
    plain = decrypt(enc_ses)
    return plain[4:-4]
def decode(data):
    """ Normalise the Cliqz session ID across a batch of signals.

    `data` is iterated as a sequence of signal dicts (presumably one batch of
    Cliqz signals -- TODO confirm against the caller). The first signal with a
    non-empty 'session' decides the session: a value that `is_decrypted`
    accepts is used as the plaintext and a fresh encryption of it is made;
    any other value is decrypted with `decode_session`.

    If the resulting plaintext is valid (longer than 10 characters and
    accepted by `is_decrypted`), every signal's 'session' is overwritten in
    place with it, and {'data': data} is returned. When an encrypted copy was
    produced (from a plaintext input, or on a signal with a truthy 'startup')
    and it is longer than 10 characters, it is returned under 'session'.
    Otherwise {'data': []} is returned.

    NOTE(review): indentation was reconstructed from a flattened export; the
    nesting (e.g. `break` inside the session `if`) should be confirmed
    against the original source.
    """
    decrypted = False
    encrypted = False
    # session ids might become inconsistent at start -> use only one for all the signals in a set
    for line in data:
        if line and 'session' in line and len(line['session']) > 0:
            if is_decrypted(line['session']):
                # unencrypted session: keep it and generate a fresh encryption
                decrypted = line['session']
                encrypted = encode_session(decrypted)
            else:
                # already encrypted - decryption needed
                decrypted = decode_session(line['session'])
            # only the first signal carrying a session is used
            break
    # the decrypted value is valid
    if decrypted and len(decrypted) > 10 and is_decrypted(decrypted):
        # if it was an encrypted signal - decrypt it (overwrites the input dicts in place)
        for line in data:
            line['session'] = decrypted
            # re-encrypt the session id on startup signal if
            # it was already decrypted
            # it was not already encrypted
            if not encrypted and line.get('startup', False):
                encrypted = encode_session(decrypted)
        ret = {
            'data': data
        }
        # if a new encryption is generated, send it back
        if encrypted and len(encrypted) > 10:
            ret['session'] = encrypted
    else:
        # if something wrong is happening do not store anything
        ret = {
            'data': []
        }
    return ret
def decrypt_cliqz_id(cliqz_id):
    """ Return the plaintext Cliqz session ID for `cliqz_id`.

    Values that already look decrypted (per `is_decrypted`) are returned as-is;
    otherwise the whitespace-stripped value is decoded with `decode_session`.
    A null ID (e.g. from a ping without `cliqzSession`) is passed through as
    None rather than raising inside the Spark UDF.
    """
    if cliqz_id is None:
        return None
    if is_decrypted(cliqz_id):
        return cliqz_id
    return decode_session(cliqz_id.strip())
# In[47]: | |
## Expose the decryption as a Spark SQL UDF returning strings.
decrypt_cliqz_udf = fun.udf(decrypt_cliqz_id, returnType=StringType())
# In[48]:
id_mapping_decr = id_mapping.withColumn(
    "cliqz_id_decrypted",
    decrypt_cliqz_udf("cliqz_id"),
)
# In[50]:
id_mapping_decr.limit(20).toPandas()
# ## Cliqz derived datasets | |
# In[2]: | |
## Cliqz derived tables: testpilot, testpilottest and search.
s3_path_txp = "s3://telemetry-parquet/harter/cliqz_testpilot/v1/"
s3_path_txpt = "s3://telemetry-parquet/harter/cliqz_testpilottest/v1/"
s3_path_search = "s3://telemetry-parquet/harter/cliqz_search/v1/"
# In[13]:
DF_txp = sqlContext.read.format("parquet").load(s3_path_txp)
DF_txpt = sqlContext.read.format("parquet").load(s3_path_txpt)
DF_search = sqlContext.read.format("parquet").load(s3_path_search)
# In[29]: | |
def print_table_summary(DF, datecol="submission", numrows=20):
    """ Print the row count, the min/max of `datecol`, the schema, and the first rows.

    DF: Spark DataFrame to summarize.
    datecol: column whose range is reported (e.g. "submission" or "start_time").
    numrows: number of leading rows to display as a pandas table.
    """
    print("Num rows: {:,}".format(DF.count()))
    daterange = DF.selectExpr(
        "min({}) as min_date".format(datecol),
        "max({}) as max_date".format(datecol)).first()
    ## Label the range with the column actually used; it is not always a submission date.
    label = "Submission date" if datecol == "submission" else datecol
    print("{} range: {} to {}".format(label, daterange.min_date, daterange.max_date))
    DF.printSchema()
    ## `display` is not imported here; presumably the notebook's IPython display function.
    display(DF.limit(numrows).toPandas())
# In[25]: | |
print_table_summary(DF_txp, datecol="submission")
# In[26]:
print_table_summary(DF_txpt, datecol="submission")
# In[30]:
## The search table has no `submission` column; summarize it by `start_time`.
print_table_summary(DF_search, datecol="start_time")
# In[32]: | |
## Daily overall search counts
## NOTE(review): this counts distinct `id` values per `start_time` (one row per
## (id, start_time) group), not the total number of rows -- confirm which is intended.
(DF_search
    .groupBy("id", "start_time").count()
    .groupBy("start_time").count()
    .orderBy("start_time")
    .toPandas())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment