# coding: utf-8
# # Bug 1329023 - ReDash errors when parsing parquet files output by SyncView
#
# This notebook replicates a parquet decoding error found in a SyncView ReDash query, using Spark.
#
# ## Simplified error case
# Using the problematic partition, we narrow down the error to accessing the `engine.name` field.
# In[2]:
import pyspark.sql.functions as F
invalid = ("s3://net-mozaws-prod-us-west-2-pipeline-analysis/"
           "amiyaguchi/sync_summary/v1/submission_date_s3=20161208")
df = spark.read.parquet(invalid)
# In[3]:
engines = df.select(F.explode('engines').alias('engine'))
# Force evaluation with an action; count() runs the query and decodes the column.
engines.select('engine.name').count()
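# As a quick sanity check (not part of the original repro), we can confirm
# that `name` is present in the declared schema even though decoding it
# fails; a minimal sketch against the same dataframes:

# In[ ]:
# The printed schema should list `name` among the fields of the struct.
engines.printSchema()

# A plain row count typically reads only parquet metadata, so if it succeeds
# while the nested select above fails, the problem is isolated to decoding
# the `engine.name` column.
df.count()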
# ## Original reported bug (ported to Spark)
# The query from the original report lets us locate the problematic parquet partition. The main change in the port is replacing Presto's `unnest(engines)` with `LATERAL VIEW explode(engines)`; quoting of the reserved word `when` (backticks instead of double quotes) and day truncation (`to_date` instead of `trunc`) are also adjusted to Spark SQL idioms, as sketched below.
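# For reference, the corresponding clause in each dialect (an illustrative
# sketch; the Presto form is reconstructed from the report's use of `unnest`,
# not copied verbatim):
#
#     -- Presto (ReDash)
#     CROSS JOIN UNNEST(engines) AS t (engine)
#
#     -- Spark SQL
#     LATERAL VIEW explode(engines) t AS engine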
# In[4]:
df = spark.read.parquet("s3://telemetry-parquet/sync_summary/v1/"
                        "submission_date_s3=20161208/")
# In[5]:
query = """
WITH engine_errors AS (
SELECT
uid,
app_display_version,
trunc('day', from_unixtime("when"/1000)) AS sync_date,
engine
FROM sync_summary
/* Unpack engines array into t table with column engine and join to table */
LATERAL VIEW explode(engines) t AS engine
),
syncs_per_day AS (
SELECT
trunc('day', from_unixtime("when"/1000)) AS sync_date,
count(uid) as syncs
FROM sync_summary
GROUP BY 1
),
failures_per_day AS (
SELECT
engine_errors.sync_date AS sync_date,
engine.name AS engine_name,
SUM(
CASE
WHEN engine.failureReason IS NOT NULL THEN 1
ELSE 0
END
) AS failures
FROM engine_errors
GROUP BY 1,2
)
SELECT
syncs_per_day.sync_date,
engine_name,
-- failures,
-- syncs
CAST(failures AS DOUBLE) / CAST(syncs AS DOUBLE) * 100 AS failure_rate
FROM syncs_per_day
LEFT JOIN failures_per_day ON syncs_per_day.sync_date = failures_per_day.sync_date
WHERE syncs_per_day.sync_date >= timestamp '2016-10-01 00:00:00.000' AND syncs_per_day.sync_date <= timestamp '2016-10-16 00:00:00.000'
GROUP BY 1,2,3 --,4
ORDER BY 1,2
"""
sqlContext.registerDataFrameAsTable(df, "sync_summary")
sqlContext.sql(query).show()
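# If the bug reproduces, the `show()` above should fail with the same parquet
# decoding error as the simplified case. On Spark 2.x the registration can
# also be written against the session directly; a minimal equivalent sketch:

# In[ ]:
# Register the dataframe under the same name via the SparkSession API.
df.createOrReplaceTempView("sync_summary")
spark.sql(query).show()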