Created
June 30, 2017 21:12
-
-
Save acmiyaguchi/30a4aafef6e2c4061767b8da90b6eba0 to your computer and use it in GitHub Desktop.
[WIP] Macrobase_Crash_Rates
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding: utf-8 | |
# # WIP: MacroBase and Telemetry-Streaming | |
# ## Anomaly Detection and Explanation of Crash Rates | |
# We will be observing crash rates, with metrics derived from the [e10s stability dashboard][1]. The crash rate is defined as
# | |
# $$ | |
# \text{rate} = \frac | |
# {\text{# crashes}} | |
# {\text{usage khours}} | |
# $$ | |
# | |
# This definition provides a normalized crash rate that can be used to compare clients across segments. The stability dashboard also includes a color-coded table for comparing rates relative to the leading week's rate. | |
# | |
# The exploration of this pipeline follows the following structure: | |
# * Data preparation and exploration of crash metrics | |
# * Batched explanation over crash rate
# * Batched explanation over relative crash rate w.r.t. previous week
# * Windowed explanation over crash rates using ADR and AMC
# * Results | |
# * Further Work: Spark Streaming Architecture | |
# | |
# [1]: https://chutten.github.io/telemetry-dashboard/crashes/ "Stability Dashboard (Telemetry) - e10s" | |
# | |
# In[1]: | |
from pyspark.sql import functions as F | |
# S3 location where the prepared crash-rate dataset is persisted.
bucket = "s3://net-mozaws-prod-us-west-2-pipeline-analysis"
prefix = "/amiyaguchi/macrobase/crash_rates/v1/"
path = bucket + prefix
path
# ## Data Preparation | |
# | |
# ### Feature Selection | |
# #### Attribution Selection | |
# The following features have been used for the e10s rollout, and should be useful for finding anomalous sub-populations. | |
# In[2]: | |
# Categorical columns used to slice the crash population; these were the
# dimensions of interest during the e10s rollout.
attributes = [
    "normalized_channel",
    "env_build_version",
    "env_build_id",
    "app_name",
    "os",
    "os_version",
    "env_build_arch",
    "country",
    "active_experiment_id",
    "active_experiment_branch",
    "e10s_enabled",
    "e10s_cohort",
    "gfx_compositor",
    "submission_date_s3",
]
# #### Metric Selection | |
# | |
# `usage_khours` are derived from the client subsession length. The three types of available crash counts in the main_summary dataset as of `main_summary/v4` are content, plugin, and gmplugin crashes. Finally, the crash rates are derived from the crash type and the `usage_khours`. | |
# In[3]: | |
# Unit conversions; subsession lengths are reported in seconds.
seconds_per_hour = 3600
seconds_per_day = 24 * seconds_per_hour
def crash_rate(crashes, usage="usage_khours"):
    """Column expression for crashes per unit usage, named `<crashes>_rate`."""
    rate = F.col(crashes) / F.col(usage)
    return rate.alias("{}_rate".format(crashes))
# Usage in kilo-hours; negative or implausibly long (>180 days) subsession
# lengths are clamped to zero usage.
usage_khours = (
    F.when(
        (F.col("subsession_length") >= 0)
        & (F.col("subsession_length") < 180 * seconds_per_day),
        F.col("subsession_length") / seconds_per_hour / 1000,
    )
    .otherwise(0.0)
    .cast('double')
    .alias("usage_khours")
)

# Crash counters available in main_summary/v4.
crash_fields = [
    "crashes_detected_content",
    "crashes_detected_plugin",
    "crashes_detected_gmplugin",
]

# Raw counts plus their derived per-khour rates.
crash_metrics = crash_fields + [crash_rate(field) for field in crash_fields]
metrics = [F.col("usage_khours")] + crash_metrics
# ### Extract relevant features from `main_summary` | |
# In[55]: | |
import operator
# `reduce` is a builtin only in Python 2; this import keeps the cell
# working on Python 3 as well (the file also uses `basestring`, so it was
# written against Python 2).
from functools import reduce

# Submission-date window under analysis (inclusive start, exclusive end).
start, end = "20170415", "20170501"

main_summary = (
    spark
    .read
    .option("mergeSchema", "true")
    .parquet("s3://telemetry-parquet/main_summary/v4")
)

crash_rates = (
    main_summary
    # restrict to one sample_id to keep the working set small
    .where(F.col("sample_id") == 27)
    .withColumn("usage_khours", usage_khours)
    .select(["timestamp", "sample_id"] + attributes + metrics)
    .where(F.col("submission_date_s3") >= start)
    .where(F.col("submission_date_s3") < end)
    # keep rows where at least one crash counter is populated
    .where(
        reduce(operator.or_,
               [F.col(x).isNotNull() for x in crash_fields]))
)
# ### Repartition and Persist data | |
# In[56]: | |
# `timestamp` is divided by 1e9, i.e. a nanosecond epoch converted to seconds
# before deriving the calendar day.
timestamp = F.from_unixtime(F.col("timestamp") / 10 ** 9)

crash_rates_by_day = (
    crash_rates
    .withColumn("submission_day", F.dayofyear(timestamp))
    .orderBy("timestamp")
)

# Persist partitioned by day and sample for efficient day-by-day reads.
(
    crash_rates_by_day
    .write
    .partitionBy("submission_day", "sample_id")
    .parquet(path, mode="overwrite")
)
# ## Data Exploration | |
# In[6]: | |
from pyspark.sql import functions as F | |
# Reload the persisted dataset so the rest of the notebook works from the
# partitioned copy rather than the raw extraction.
crash_rates = spark.read.parquet(path)
# In[7]:
# Summary statistics (count/mean/stddev/min/max) for the raw crash counters.
crash_rates.select(crash_fields).describe().show()
# In[8]: | |
# Daily crash rates: sum usage and crash counts per day, then derive the
# per-khour rates from the daily totals.
daily_totals = [F.sum(x).alias(x) for x in ["usage_khours"] + crash_fields]
(
    crash_rates
    .groupBy("submission_day")
    .agg(*daily_totals)
    .select(
        "submission_day",
        *[crash_rate(x) for x in crash_fields])
).show(truncate=False)
# In[60]: | |
from pyspark.sql import functions as F, Window | |
def rank_id(dimensions, ordering):
    """Add a column representing the rank over dimensions"""
    spec = Window.partitionBy(*dimensions).orderBy(ordering)
    return F.row_number().over(spec)
def top_by_partition(dataframe, dimensions, ordering):
    """Find the top item for each dimension.

    `dimensions` may be a single column name or a list of names;
    `ordering` is the sort expression that picks the top row per group.
    """
    # `basestring` exists only on Python 2; fall back to `str` on 3 so the
    # helper works under either interpreter.
    try:
        string_types = basestring  # Python 2 (covers str and unicode)
    except NameError:
        string_types = str  # Python 3
    if isinstance(dimensions, string_types):
        dimensions = [dimensions]
    return (
        dataframe
        .withColumn("rank_id", rank_id(dimensions, ordering))
        .where(F.col("rank_id") == 1)
        .drop("rank_id")
    )
# The most recent build id (and its version) observed on each channel.
non_null_builds = crash_rates.where("env_build_id is not null")
latest_version = top_by_partition(
    non_null_builds,
    ["normalized_channel"],
    F.desc("env_build_id"),
).select("normalized_channel", "env_build_version")
latest_version.show()
# In[61]: | |
import matplotlib.pyplot as plt | |
import pandas as pd | |
import numpy as np | |
def with_crash_rates(dataframe, columns):
    """Return `dataframe` with a `<col>_rate` column added for each name
    in `columns` (strings).

    Bug fixes vs. the original: DataFrames are immutable, so the result of
    `withColumn` must be rebound; the accumulated frame must be returned;
    and `crash_rate` must be *called* to produce the column expression
    (the original passed the function object itself).
    """
    acc = dataframe
    for col in columns:
        acc = acc.withColumn("{}_rate".format(col), crash_rate(col))
    return acc
# Per-day crash rates for the latest build on each channel; the right join
# keeps only rows matching a channel's latest (channel, version) pair.
channel_totals = [F.sum(x).alias(x) for x in ["usage_khours"] + crash_fields]
rates_by_channel = (
    crash_rates
    .join(latest_version, ['normalized_channel', 'env_build_version'], 'right')
    .groupBy("submission_day", "normalized_channel")
    .agg(*channel_totals)
    .select(
        "submission_day",
        F.col("normalized_channel").alias("channel"),
        "usage_khours",
        *[crash_rate(x) for x in crash_fields])
)
# Plot the daily plugin crash rate for the major channels on shared axes.
ax = None
for channel in ['nightly', 'beta', 'release', 'esr']:
    df = (
        rates_by_channel
        # BUG FIX: the select above renamed `normalized_channel` to
        # `channel`, so the filter must reference the surviving column
        # (filtering on `normalized_channel` raises an AnalysisException).
        .where(F.col("channel") == channel)
        .select("submission_day", "crashes_detected_plugin_rate")
        .orderBy("submission_day")
    )
    df = df.toPandas().set_index(["submission_day"])
    ax = df.plot(ax=ax)
plt.grid(True)
plt.show()
# ## MacroBase | |
# In[8]: | |
def with_score(df, metric, colname="score", approx=0.01):
    """Return a dataframe with a new scored column"""
    assert metric in df.columns
    # Robust z-score: deviation from the (approximate) median, scaled by
    # the median absolute deviation (MAD).
    median = df.approxQuantile(metric, [0.5], approx)[0]
    deviation = F.abs(F.col(metric) - F.lit(median)).alias(metric)
    mad = df.select(deviation).approxQuantile(metric, [0.5], approx)[0]
    score = (F.col(metric) - F.lit(median)) / F.lit(mad)
    return df.withColumn(colname, score)
def label_by_percentile(df, metric, colname='outlier', cutoff=0.02, approx=0.01):
    """Label points based on a metric, including both high and low points"""
    # Tails beyond the [cutoff, 1-cutoff] quantiles are labeled 1.0.
    lo, hi = df.approxQuantile(metric, [cutoff, 1.0 - cutoff], approx)
    is_outlier = (F.col(metric) < F.lit(lo)) | (F.col(metric) > hi)
    label = F.when(is_outlier, 1.0).otherwise(0.0)
    return df.withColumn(colname, label)
def compute_risk_ratio(df, outlier_count, inlier_count, min_risk_ratio):
    """Return DataFrame with a risk_ratio column, keeping only rows at or
    above `min_risk_ratio`.

        risk ratio = (a_o/(a_o+a_i)) / (b_o/(b_o + b_i))

    where a_* counts rows matching the attribute combination ("exposed")
    and b_* counts the remaining rows.
    """
    total_exposed = (F.col("inliers") + F.col("outliers"))
    unexposed_outlier = (F.lit(outlier_count) - F.col("outliers"))
    # BUG FIX: the exposed outlier rate must put the *outlier* count in the
    # numerator; the original used `inliers`, contradicting the formula in
    # the docstring (a_o / (a_o + a_i)).
    ratio_this = (F.col("outliers") / total_exposed)
    ratio_other = (unexposed_outlier /
                   ((F.lit(inlier_count + outlier_count) - total_exposed)))
    return (
        df
        .withColumn("risk_ratio", ratio_this / ratio_other)
        .where(F.col("risk_ratio") >= min_risk_ratio)
    )
def explain_dataframe(df, attributes, min_support=0.1, min_risk_ratio=3.0):
    """Summarize which attribute combinations are over-represented among
    outliers (MacroBase-style explanation).

    Expects `df` to carry an `outlier` column of 1.0/0.0 labels and returns
    rows of (support, num_records, ratio_to_inliers, items).
    """
    assert "outlier" in df.columns
    outliers = df.where("outlier = 1.0")
    inliers = df.where("outlier = 0.0")
    outlier_count = outliers.count()
    inlier_count = inliers.count()
    total_count = outlier_count + inlier_count
    # Attribute combinations (all cube subsets) covering at least
    # `min_support` of the outlier population.
    supported_outliers = (
        outliers
        .cube(attributes)
        .agg(F.count('*').alias("outliers"))
        .where(F.col("outliers") > outlier_count * min_support)
    )
    # Join inliers onto the supported outlier combinations and keep those
    # also common among inliers.
    # NOTE(review): cubing again after the right join re-aggregates the
    # already-cubed combinations -- confirm this matches the intended
    # support computation.
    supported_attributes = (
        inliers
        .join(supported_outliers, attributes, 'right')
        .cube(attributes)
        .agg(F.count('*').alias("inliers"),
             F.sum('outliers').alias('outliers'))
        .where(F.col('inliers') > inlier_count * min_support)
    )
    # Filter to combinations whose outlier risk ratio clears the threshold.
    risky_combos = compute_risk_ratio(
        supported_attributes,
        outlier_count,
        inlier_count,
        min_risk_ratio
    )
    return (
        risky_combos
        .withColumn("support", F.col("outliers") / F.lit(outlier_count))
        .select(
            "support",
            F.col("outliers").alias("num_records"),
            F.col("risk_ratio").alias("ratio_to_inliers"),
            F.concat_ws(", ", *[F.col(x) for x in attributes]).alias("items"))
    )
def classify_and_explain(dataframe, metric):
    """Score `metric`, label the tails as outliers, and explain them over
    the module-level `attributes`."""
    scored = with_score(dataframe, metric).na.fill(0.0)
    labeled = label_by_percentile(scored, 'score')
    return explain_dataframe(labeled, attributes)
# ## Batched Explanation over Crash Rates | |
# In[ ]: | |
# Null metric values are treated as zero before scoring.
cleaned = crash_rates.na.fill(0.0)
# Explain anomalies in the plugin crash rate across the attribute columns.
res = classify_and_explain(cleaned, 'crashes_detected_plugin_rate')
# In[ ]:
res.show()
# ## Batched Explanation over Relative Crash Rates (X%)
# ## Windowed Explanation over Crash Rates
# ## Streaming Architecture | |
# ### Streaming example | |
# ```python | |
# # Create DataFrame representing the stream of input lines from connection to localhost:9999 | |
# lines = spark \ | |
# .readStream \ | |
# .format("socket") \ | |
# .option("host", "localhost") \ | |
# .option("port", 9999) \ | |
# .load() | |
# | |
# # Split the lines into words | |
# words = lines.select( | |
# explode( | |
# split(lines.value, " ") | |
# ).alias("word") | |
# ) | |
# | |
# # Generate running word count | |
# wordCounts = words.groupBy("word").count() | |
# | |
# # Start running the query that prints the running counts to the console | |
# query = wordCounts \ | |
# .writeStream \ | |
# .outputMode("complete") \ | |
# .format("console") \ | |
# .start() | |
# | |
# query.awaitTermination() | |
# ``` |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment