Skip to content

Instantly share code, notes, and snippets.

@acmiyaguchi
Created June 30, 2017 21:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save acmiyaguchi/30a4aafef6e2c4061767b8da90b6eba0 to your computer and use it in GitHub Desktop.
Save acmiyaguchi/30a4aafef6e2c4061767b8da90b6eba0 to your computer and use it in GitHub Desktop.
[WIP] Macrobase_Crash_Rates
Display the source blob
Display the rendered blob
Raw
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
# coding: utf-8
# # WIP: MacroBase and Telemetry-Streaming
# ## Anomaly Detection and Explanation of Crash Rates
# We will be observing crash rates, with metrics derived from the [e10s stability dashboard][1]. The crash rate is defined as
#
# $$
# \text{rate} = \frac
# {\text{# crashes}}
# {\text{usage khours}}
# $$
#
# This definition provides a normalized crash rate that can be used to compare clients across segments. The stability dashboard also includes a color-coded table for comparing rates relative to the leading week's rate.
#
# The exploration of this pipeline follows the following structure:
# * Data preparation and exploration of crash metrics
# * Batched explanation over crash rate
# * Batched explanation over relative crash rate w.r.t. previous week
# * Windowed explanation over crash rates using ADR and AMC
# * Results
# * Further Work: Spark Streaming Architecture
#
# [1]: https://chutten.github.io/telemetry-dashboard/crashes/ "Stability Dashboard (Telemetry) - e10s"
#
# In[1]:
from pyspark.sql import functions as F
# S3 location where the prepared crash-rate dataset is persisted.
path = ("s3://net-mozaws-prod-us-west-2-pipeline-analysis"
        "/amiyaguchi/macrobase/crash_rates/v1/")
# Notebook cell echoes the path for inspection.
path
# ## Data Preparation
#
# ### Feature Selection
# #### Attribution Selection
# The following features have been used for the e10s rollout, and should be useful for finding anomalous sub-populations.
# In[2]:
# Dimensions carried over from the e10s rollout analysis; anomaly
# explanations are searched over combinations of these attributes.
attributes = [
    'normalized_channel',
    'env_build_version',
    'env_build_id',
    'app_name',
    'os',
    'os_version',
    'env_build_arch',
    'country',
    'active_experiment_id',
    'active_experiment_branch',
    'e10s_enabled',
    'e10s_cohort',
    'gfx_compositor',
    'submission_date_s3',
]
# #### Metric Selection
#
# `usage_khours` are derived from the client subsession length. The three types of available crash counts in the main_summary dataset as of `main_summary/v4` are content, plugin, and gmplugin crashes. Finally, the crash rates are derived from the crash type and the `usage_khours`.
# In[3]:
# Time-unit constants used to convert subsession seconds into (k)hours.
seconds_per_hour = 3600
seconds_per_day = 24 * seconds_per_hour
def crash_rate(crashes, usage="usage_khours"):
    """Column expression for `crashes` per unit of `usage`.

    Returns a Spark Column aliased "<crashes>_rate".
    """
    ratio = F.col(crashes) / F.col(usage)
    return ratio.alias("{}_rate".format(crashes))
# Usage in thousands of hours ("khours"), derived from subsession_length
# (seconds). Negative lengths or sessions longer than 180 days are treated
# as corrupt telemetry and contribute 0.0 usage.
usage_khours = (
    F.when((F.col("subsession_length") >= 0) &
           (F.col("subsession_length") < 180 * seconds_per_day),
           (F.col("subsession_length") / seconds_per_hour / 1000))
    .otherwise(0.0)
    .cast('double')
    .alias("usage_khours")
)
# Raw crash counters available in main_summary as of v4.
crash_fields = [
    "crashes_detected_content",
    "crashes_detected_plugin",
    "crashes_detected_gmplugin",
]
# Counts together with their derived per-khour rates; usage first.
crash_metrics = crash_fields + [crash_rate(field) for field in crash_fields]
metrics = [F.col("usage_khours")] + crash_metrics
# ### Extract relevant features from `main_summary`
# In[55]:
import operator
# FIX: `reduce` is a builtin only in Python 2; import it explicitly so this
# cell also runs under Python 3.
from functools import reduce

start, end = "20170415", "20170501"

main_summary = (
    spark
    .read
    .option("mergeSchema", "true")
    .parquet("s3://telemetry-parquet/main_summary/v4")
)

# Extract a 1% sample (sample_id == 27) for the two-week window, keeping
# only pings that report at least one of the crash counters.
crash_rates = (
    main_summary
    .where(F.col("sample_id") == 27)
    .withColumn("usage_khours", usage_khours)
    .select(["timestamp", "sample_id"] + attributes + metrics)
    .where(F.col("submission_date_s3") >= start)
    .where(F.col("submission_date_s3") < end)
    .where(
        reduce(operator.__or__,
               [F.col(x).isNotNull() for x in crash_fields]))
)
# ### Repartition and Persist data
# In[56]:
# The ping timestamp is in nanoseconds since the epoch; convert to seconds
# and then to a day-of-year so the output can be partitioned per day.
timestamp = F.from_unixtime(F.col("timestamp") / 10 ** 9)
crash_rates_by_day = crash_rates.withColumn(
    "submission_day", F.dayofyear(timestamp)
).orderBy("timestamp")
# Persist the prepared dataset, partitioned by day and sample.
writer = crash_rates_by_day.write.partitionBy("submission_day", "sample_id")
writer.parquet(path, mode="overwrite")
# ## Data Exploration
# In[6]:
# Reload the persisted dataset so exploration does not recompute the
# extraction above.
from pyspark.sql import functions as F
crash_rates = spark.read.parquet(path)
# In[7]:
# Summary statistics (count/mean/stddev/min/max) for the raw crash counters.
crash_rates.select(crash_fields).describe().show()
# In[8]:
# Daily aggregates: sum usage and crash counts per day, then derive the
# per-khour rate for each crash type.
(
    crash_rates
    .groupBy("submission_day")
    .agg(*[F.sum(x).alias(x) for x in ["usage_khours"] + crash_fields])
    .select(
        "submission_day",
        *[crash_rate(x) for x in crash_fields])
).show(truncate=False)
# In[60]:
from pyspark.sql import functions as F, Window
def rank_id(dimensions, ordering):
    """Column of row numbers within each `dimensions` partition,
    ordered by `ordering`."""
    spec = Window.partitionBy(*dimensions).orderBy(ordering)
    return F.row_number().over(spec)
def top_by_partition(dataframe, dimensions, ordering):
    """Find the top item for each dimension.

    dimensions: a column name or list of column names to partition by.
    ordering: a Column expression (e.g. F.desc(...)) ranking rows within
    each partition; only the first-ranked row per partition is kept.
    """
    # FIX: `basestring` does not exist in Python 3; `str` covers the
    # plain-string case there (and native str on Python 2).
    if isinstance(dimensions, str):
        dimensions = [dimensions]
    return (
        dataframe
        .withColumn("rank_id", rank_id(dimensions, ordering))
        .where(F.col("rank_id") == 1)
        .drop("rank_id")
    )
# For each channel, look up the build version of the most recent
# (largest, non-null) build id.
non_null_builds = crash_rates.where("env_build_id is not null")
latest_version = top_by_partition(
    non_null_builds,
    ["normalized_channel"],
    F.desc("env_build_id"),
).select("normalized_channel", "env_build_version")
latest_version.show()
# In[61]:
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
def with_crash_rates(dataframe, columns):
    """Return `dataframe` with a "<col>_rate" column for each name in
    `columns` (strings).

    FIX: the original discarded the result of every `withColumn` call and
    returned None, and passed the `crash_rate` function itself instead of
    calling it for each column.
    """
    acc = dataframe
    for col in columns:
        acc = acc.withColumn("{}_rate".format(col), crash_rate(col))
    return acc
# Daily crash rates per channel, restricted (via the right join) to rows
# from each channel's latest build version.
totals = [F.sum(x).alias(x) for x in ["usage_khours"] + crash_fields]
rates_by_channel = (
    crash_rates
    .join(latest_version, ['normalized_channel', 'env_build_version'], 'right')
    .groupBy("submission_day", "normalized_channel")
    .agg(*totals)
    .select(
        "submission_day",
        F.col("normalized_channel").alias("channel"),
        "usage_khours",
        *[crash_rate(x) for x in crash_fields])
)
# Plot the daily plugin-crash rate for each release channel on shared axes.
ax = None
for channel in ['nightly', 'beta', 'release', 'esr']:
    df = (
        rates_by_channel
        # FIX: rates_by_channel renamed the column to "channel" via
        # .alias("channel"); filtering on "normalized_channel" would raise
        # an AnalysisException since that column no longer exists.
        .where(F.col("channel") == channel)
        .select("submission_day", "crashes_detected_plugin_rate")
        .orderBy("submission_day")
    )
    df = df.toPandas().set_index(["submission_day"])
    ax = df.plot(ax=ax)
plt.grid(True)
plt.show()
# ## MacroBase
# In[8]:
def with_score(df, metric, colname="score", approx=0.01):
    """Add a robust z-score column for `metric`.

    Score = (x - median) / MAD, where both the median and the median
    absolute deviation are computed with approxQuantile at relative
    error `approx`.
    """
    assert metric in df.columns
    median = df.approxQuantile(metric, [0.5], approx)[0]
    deviation = F.abs(F.col(metric) - F.lit(median)).alias(metric)
    mad = df.select(deviation).approxQuantile(metric, [0.5], approx)[0]
    score = (F.col(metric) - F.lit(median)) / F.lit(mad)
    return df.withColumn(colname, score)
def label_by_percentile(df, metric, colname='outlier', cutoff=0.02, approx=0.01):
    """Label both tails of `metric` as outliers (1.0) vs inliers (0.0).

    Points below the `cutoff` quantile or above the (1 - cutoff)
    quantile are flagged; quantiles use approxQuantile at relative
    error `approx`.
    """
    lo, hi = df.approxQuantile(metric, [cutoff, 1.0 - cutoff], approx)
    is_outlier = (F.col(metric) < F.lit(lo)) | (F.col(metric) > hi)
    return df.withColumn(colname, F.when(is_outlier, 1.0).otherwise(0.0))
def compute_risk_ratio(df, outlier_count, inlier_count, min_risk_ratio):
    """Return DataFrame with a risk-ratio column, keeping only rows with
    risk_ratio >= min_risk_ratio.

    risk ratio = (a_o/(a_o+a_i)) / (b_o/(b_o+b_i))
    where a_* are the outlier/inlier counts for rows matching the
    attribute combination ("exposed") and b_* the counts for the rest.
    """
    total_exposed = (F.col("inliers") + F.col("outliers"))
    unexposed_outlier = (F.lit(outlier_count) - F.col("outliers"))
    # FIX: the exposed ratio is the *outlier* fraction a_o/(a_o+a_i); the
    # original used the inlier count, contradicting the formula above.
    ratio_this = (F.col("outliers") / total_exposed)
    ratio_other = (unexposed_outlier /
                   ((F.lit(inlier_count + outlier_count) - total_exposed)))
    return (
        df
        .withColumn("risk_ratio", ratio_this / ratio_other)
        .where(F.col("risk_ratio") >= min_risk_ratio)
    )
def explain_dataframe(df, attributes, min_support=0.1, min_risk_ratio=3.0):
    """Explain labeled outliers MacroBase-style.

    Searches combinations of `attributes` that are common among the
    outliers (support above min_support) and disproportionately outlying
    relative to everything else (risk ratio above min_risk_ratio).

    df: DataFrame with an "outlier" column holding 1.0/0.0 labels.
    Returns a DataFrame of (support, num_records, ratio_to_inliers, items).
    """
    assert "outlier" in df.columns
    outliers = df.where("outlier = 1.0")
    inliers = df.where("outlier = 0.0")
    outlier_count = outliers.count()
    inlier_count = inliers.count()
    total_count = outlier_count + inlier_count
    # cube() aggregates over every subset of the attribute columns, so each
    # result row is a (possibly partial, null-padded) attribute combination.
    # Keep combinations covering enough of the outlier population.
    supported_outliers = (
        outliers
        .cube(attributes)
        .agg(F.count('*').alias("outliers"))
        .where(F.col("outliers") > outlier_count * min_support)
    )
    # Right join limits the inliers to supported combinations before cubing
    # again to count inliers per combination.
    # NOTE(review): this also prunes by *inlier* support — confirm intended.
    supported_attributes = (
        inliers
        .join(supported_outliers, attributes, 'right')
        .cube(attributes)
        .agg(F.count('*').alias("inliers"),
             F.sum('outliers').alias('outliers'))
        .where(F.col('inliers') > inlier_count * min_support)
    )
    risky_combos = compute_risk_ratio(
        supported_attributes,
        outlier_count,
        inlier_count,
        min_risk_ratio
    )
    # Final projection into MacroBase's report format; "items" joins the
    # attribute values into a single comma-separated string.
    return (
        risky_combos
        .withColumn("support", F.col("outliers") / F.lit(outlier_count))
        .select(
            "support",
            F.col("outliers").alias("num_records"),
            F.col("risk_ratio").alias("ratio_to_inliers"),
            F.concat_ws(", ", *[F.col(x) for x in attributes]).alias("items"))
    )
def classify_and_explain(dataframe, metric):
    """Score `metric`, label its extreme tails as outliers, and explain
    them over the module-level `attributes`."""
    scored = with_score(dataframe, metric).na.fill(0.0)
    classified = label_by_percentile(scored, 'score')
    return explain_dataframe(classified, attributes)
# ## Batched Explanation over Crash Rates
# In[ ]:
# Fill nulls with 0.0 before scoring; missing metric values would otherwise
# propagate through the quantile-based scoring.
cleaned = crash_rates.na.fill(0.0)
res = classify_and_explain(cleaned, 'crashes_detected_plugin_rate')
# In[ ]:
# Display the explanation report (support, num_records, ratio, items).
res.show()
# ## Batched Explanation over Relative Crash Rates (X%)
# ## Windowed Explanation over Crash Rates
# ## Streaming Architecture
# ### Streaming example
# ```python
# # Create DataFrame representing the stream of input lines from connection to localhost:9999
# lines = spark \
# .readStream \
# .format("socket") \
# .option("host", "localhost") \
# .option("port", 9999) \
# .load()
#
# # Split the lines into words
# words = lines.select(
# explode(
# split(lines.value, " ")
# ).alias("word")
# )
#
# # Generate running word count
# wordCounts = words.groupBy("word").count()
#
# # Start running the query that prints the running counts to the console
# query = wordCounts \
# .writeStream \
# .outputMode("complete") \
# .format("console") \
# .start()
#
# query.awaitTermination()
# ```
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment