Skip to content

Instantly share code, notes, and snippets.

@dzeber
Last active February 9, 2017 22:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dzeber/222d4d566577bb41c781f9e7a99b8e18 to your computer and use it in GitHub Desktop.
Save dzeber/222d4d566577bb41c781f9e7a99b8e18 to your computer and use it in GitHub Desktop.
Cliqz data
Display the source blob
Display the rendered blob
Raw
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
# coding: utf-8
# In[1]:
import ujson as json
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import datetime
from __future__ import division
import feather
import boto3
from boto3.s3.transfer import S3Transfer
get_ipython().magic(u'pylab inline')
from moztelemetry.dataset import Dataset
from moztelemetry.spark import get_pings_properties
from pyspark.sql import Row
import pyspark.sql.functions as fun
from pyspark.sql.types import StringType
pd.set_option("display.max_rows", None)
# In[2]:
## Not including spot nodes...
sc.defaultParallelism
# In[3]:
def format_submission_date(submission_date):
""" Convert a submission datestring of the form 'yyyymmdd' to an ISO format datestring. """
return datetime.datetime.strptime(submission_date, "%Y%m%d").strftime("%Y-%m-%d")
def format_pcd(pcd):
""" Convert a profile creation date as a Unix datestamp to an ISO format datestring. """
if pcd is None or pcd <= 0:
return None
## Convert the PCD to seconds since the epoch first.
return datetime.datetime.utcfromtimestamp(pcd * 86400).strftime("%Y-%m-%d")
def iso_to_submission_format(iso_date):
""" Convert an ISO datestring back to the submission date format 'yyyymmdd'. """
if not iso_date:
return None
return iso_date.replace("-", "")
client = boto3.client('s3', 'us-west-2')
transfer = S3Transfer(client)
def write_file_to_s3(filename, s3_path):
""" Write a file in the current dir to S3. """
## Remove "s3://" prefix and split out bucket name.
data_bucket, s3_path_key = s3_path[5:].split("/", 1)
transfer.upload_file(filename, data_bucket, s3_path_key + filename)
def none_to_zero(DF, cols):
""" Convert None in given numeric cols to zeros. """
for colname in cols:
col = fun.col(colname)
DF = DF.withColumn(colname, fun.when(col.isNull(), 0).otherwise(col))
return DF
# ## Test pilot pings
# __Note__: The study was launched on Jan 10, 2017.
# In[3]:
#prev_shield_s3_path = "s3://mozilla-metrics/user/dzeber/tmp/unified_search/raw_pings_2016-12-19/"
#new_shield_s3_path = "s3://mozilla-metrics/user/dzeber/tmp/unified_search/shield_pings_20161121-20170103/"
#txp_s3_path = "s3://mozilla-metrics/user/dzeber/tmp/txp_cliqz/"
start_date = "20170101"
#end_date = "20170103"
# ### `testpilot` pings
# In[4]:
txp_data = Dataset.from_source("telemetry") .where(docType="testpilot") .where(submissionDate=lambda d: d >= start_date) .where(appName="Firefox") .records(sc) .cache()
# In[5]:
txp_data.count()
# In[6]:
def is_txp_cliqz(ping):
""" Determine if a `testpilot` ping is related to Cliqz. """
payload = ping.get("payload")
if not payload:
return False
events = payload.get("events")
if not events:
return False
return payload.get("test") == "@testpilot-addon" and events[0].get("object") == "testpilot@cliqz.com"
# Tabulate events by submission date
# In[8]:
cliqz_txp = txp_data.filter(is_txp_cliqz)
cliqz_txp.count()
# In[57]:
def cliqz_events_row(ping):
date = ping["meta"]["submissionDate"]
event = ping["payload"].get("events")[0]["event"]
return Row(date=date, event=event)
cliqz_txp_events = spark.createDataFrame(cliqz_txp.map(cliqz_events_row))
cliqz_total_sub = cliqz_txp_events.groupBy("date").count() .withColumnRenamed("count", "total_pings")
cliqz_events_df = cliqz_txp_events.groupBy("date").pivot("event").count()
df_cols = cliqz_events_df.columns
df_cols.remove("date")
cliqz_events_df = none_to_zero(cliqz_events_df, df_cols)
# In[59]:
cliqz_events_df = cliqz_events_df.join(cliqz_total_sub, on="date")
# In[60]:
cliqz_events_df.orderBy("date").toPandas()
# ### `testpilottest` pings
# In[49]:
txp_test_data = Dataset.from_source("telemetry") .where(docType="testpilottest") .where(submissionDate=lambda d: d >= start_date) .where(appName="Firefox") .records(sc)
# In[ ]:
## Slow.
##txp_test_data.count()
# In[6]:
def is_txp_test_cliqz(ping):
""" Determine if a `testpilottest` ping is related to Cliqz. """
payload = ping.get("payload")
if not payload:
return False
test = payload.get("test")
if not test:
return False
return test == "testpilot@cliqz.com" and payload.get("payload", {}).get("event")
# In[51]:
## 1 master, 10 spot nodes
txpt_cliqz = txp_test_data.filter(is_txp_test_cliqz) .cache()
get_ipython().magic(u'time txpt_cliqz.count()')
# In[71]:
def cliqz_test_row(ping):
date = ping["meta"]["submissionDate"]
event = ping["payload"].get("payload", {}).get("event")
return Row(date=date, event=event)
cliqz_txpt_events = spark.createDataFrame(txpt_cliqz.map(cliqz_test_row))
cliqz_total_sub_test = cliqz_txpt_events.groupBy("date").count() .withColumnRenamed("count", "total_pings")
cliqz_txpt_events_df = cliqz_txpt_events.groupBy("date").pivot("event").count()
df_cols = cliqz_txpt_events_df.columns
df_cols.remove("date")
cliqz_txpt_events_df = none_to_zero(cliqz_txpt_events_df, df_cols)
cliqz_events_df = cliqz_events_df.join(cliqz_total_sub_test, on="date")
# In[72]:
cliqz_txpt_events_df.orderBy("date").toPandas()
# -----
#
# ### Take a look at the data from `testpilot` pings
# In[17]:
## Test pilot pings by submission date.
txp_data.map(lambda p: p["meta"]["submissionDate"]).countByValue()
# In[54]:
txp_payloads = txp_data.map(lambda p: p["payload"])
## Look at some sample payloads.
txp_payloads.take(10)
# In[65]:
## What keys does the `payload` block have?
txp_payloads.flatMap(lambda p: p.keys()).countByValue()
# In[55]:
## What are the different values for "test"?
txp_payloads.map(lambda p: p.get("test")).countByValue()
# In[56]:
## Looks like we want the ones with `test == "@testpilot-addon"`.
txp_payloads_tpa = txp_payloads.filter(lambda p: p.get("test") == "@testpilot-addon")
# In[57]:
## From above, we see `events` is an array. Does it ever have multiple entries?
## No.
txp_payloads_tpa.filter(lambda p: len(p.get("events")) > 1).count()
# In[59]:
## What are the different event types?
txp_payloads_tpa.map(lambda p: p.get("events")[0].get("event")) .countByValue()
# In[61]:
## What `object`s are recorded for enabled/disabled events?
## These are TxP experiments.
txp_payloads_tpa.filter(lambda p: p.get("events")[0].get("event") in ("enabled", "disabled")) .map(lambda p: p.get("events")[0].get("object")) .countByValue()
# In[63]:
## What keys (add-on IDs) are in the payloads with `tests` rather than `test`?
txp_payloads.filter(lambda p: "test" not in p) .flatMap(lambda p: p.get("tests").keys()) .countByValue()
# ### Take a look at the `testpilottest` pings for Cliqz
# In[61]:
txpt_payloads = txpt_cliqz.map(lambda p: p["payload"])
## Look at some sample payloads.
txpt_payloads.take(10)
# In[62]:
## What keys does the `payload` block have?
txpt_payloads.flatMap(lambda p: p.keys()).countByValue()
# In[63]:
## What keys does the nested `payload` block have?
txpt_payloads.flatMap(lambda p: p["payload"].keys()).countByValue()
# In[64]:
## What are the different event types?
txpt_payloads.map(lambda p: p["payload"].get("event")).countByValue()
# In[65]:
## What are the different values for `contentSearch`?
txpt_payloads.map(lambda p: p["payload"].get("contentSearch")).countByValue()
# In[66]:
## Some payloads with content search:
txpt_payloads.filter(lambda p: p["payload"].get("contentSearch")).take(10)
# ## Try out the Cliqz client ID decryption
#
# Note the mapping from UT clientID to Cliqz ID is one-to-many, since the encrypted Cliqz ID changes regularly.
# In[4]:
txp_test_data = Dataset.from_source("telemetry") .where(docType="testpilottest") .where(submissionDate=lambda d: d >= "20170111" and d <= "20170118") .where(appName="Firefox") .records(sc)
# In[ ]:
## 1 master, 30 spot nodes
## Took about 30 min.
txpt_cliqz = txp_test_data.filter(is_txp_test_cliqz) .cache()
get_ipython().magic(u'time txpt_cliqz.count()')
# In[10]:
txpt_cliqz.count()
# In[20]:
def get_id_pair(ping):
return Row(
client_id = ping.get("clientId"),
cliqz_id = ping["payload"]["payload"].get("cliqzSession")
)
id_mapping = spark.createDataFrame(txpt_cliqz.map(get_id_pair)) .dropDuplicates()
# In[25]:
## How many mapping pairs?
id_mapping.count()
# In[26]:
## How many unique UT client IDs?
id_mapping.select("client_id").dropDuplicates().count()
# In[28]:
## How many Cliqz IDs are recorded per UT client ID?
id_mapping.groupBy("client_id").count() .withColumnRenamed("count", "num_cliqz_ids") .groupBy("num_cliqz_ids").count() .orderBy("num_cliqz_ids").toPandas()
# In[29]:
## What do the IDs look like?
id_mapping.limit(20).toPandas()
# In[31]:
id_mapping_pd = id_mapping.toPandas()
# In[34]:
id_mapping_pd.to_csv("cliqz_id_mapping.csv")
write_file_to_s3("cliqz_id_mapping.csv", "s3://mozilla-metrics/user/dzeber/tmp/txp_cliqz/")
# In[46]:
import json
import base64
import random
from Crypto.Cipher import AES
KEY = "" ## Ask me for the key
def encrypt(input):
secret = AES.new(KEY[:32])
tmp = (str(input) + (AES.block_size - len(str(input)) % AES.block_size) * "\0")
return base64.b64encode(secret.encrypt(tmp))
def decrypt(input):
secret = AES.new(KEY[:32])
raw = secret.decrypt(base64.b64decode(input))
return raw.rstrip("\0")
def is_decrypted(val):
if val == "not-found":
return True
if len(val) <= 10:
return False
parts = val.split('|')
return (
(len(parts) == 3 and len(parts[1]) == 5)
or
(len(parts) == 2 and len(parts[1]) == 5)
)
def encode_session(ses):
return encrypt(str(random.random())[-4:] + ses + str(random.random())[-4:])
def decode_session(enc_ses):
return decrypt(enc_ses)[4:-4]
def decode(data):
decrypted = False
encrypted = False
# session ids might get unconsistent at start -> use only one for all the signals in a set
for line in data:
if line and 'session' in line and len(line['session']) > 0:
if is_decrypted(line['session']):
# unecrypted session
decrypted = line['session']
encrypted = encode_session(decrypted)
else:
# already encrypted - decryption needed
decrypted = decode_session(line['session'])
break
# the decrypted value is valid
if decrypted and len(decrypted) > 10 and is_decrypted(decrypted):
# if it was an encrypted signal - decript it
for line in data:
line['session'] = decrypted
# re-encrypt the session id on startup signal if
# it was already decrypted
# it was not already encrypted
if not encrypted and line.get('startup', False):
encrypted = encode_session(decrypted)
ret = {
'data': data
}
# if a new encryption is generated, send it back
if encrypted and len(encrypted) > 10:
ret['session'] = encrypted
else:
# if something wrong is happening do not store anything
ret = {
'data': []
}
return ret
def decrypt_cliqz_id(cliqz_id):
if is_decrypted(cliqz_id):
return cliqz_id
return decode_session(cliqz_id.strip())
# In[47]:
decrypt_cliqz_udf = fun.udf(decrypt_cliqz_id, StringType())
# In[48]:
id_mapping_decr = id_mapping.withColumn("cliqz_id_decrypted", decrypt_cliqz_udf("cliqz_id"))
# In[50]:
id_mapping_decr.limit(20).toPandas()
# ## Cliqz derived datasets
# In[2]:
s3_path_txp = "s3://telemetry-parquet/harter/cliqz_testpilot/v1/"
s3_path_txpt = "s3://telemetry-parquet/harter/cliqz_testpilottest/v1/"
s3_path_search = "s3://telemetry-parquet/harter/cliqz_search/v1/"
# In[13]:
DF_txp = sqlContext.read.parquet(s3_path_txp)
DF_txpt = sqlContext.read.parquet(s3_path_txpt)
DF_search = sqlContext.read.parquet(s3_path_search)
# In[29]:
def print_table_summary(DF, datecol="submission", numrows=20):
""" Print # rows, submission date range, schema, and top few rows. """
print("Num row: {:,}".format(DF.count()))
daterange = DF.selectExpr(
"min({}) as min_date".format(datecol),
"max({}) as max_date".format(datecol)).first()
print("Submission date range: {} to {}".format(daterange.min_date, daterange.max_date))
DF.printSchema()
display(DF.limit(numrows).toPandas())
# In[25]:
print_table_summary(DF_txp)
# In[26]:
print_table_summary(DF_txpt)
# In[30]:
print_table_summary(DF_search, datecol="start_time")
# In[32]:
## Daily overall search counts
DF_search.groupBy("id", "start_time").count() .groupBy("start_time").count() .orderBy("start_time").toPandas()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment