validate_sync_errors
# coding: utf-8

# # Bug 1329023 - ReDash errors when parsing parquet files output by SyncView
#
# This notebook replicates a parquet decoding error found in a SyncView ReDash query using Spark.

# ## Simplified error case
#
# Using the problematic partition, we narrow down the error to accessing the `engine.name` field.

# In[2]:
import pyspark.sql.functions as F

invalid = ("s3://net-mozaws-prod-us-west-2-pipeline-analysis/"
           "amiyaguchi/sync_summary/v1/submission_date_s3=20161208")
df = spark.read.parquet(invalid)

# In[3]:

engines = df.select(F.explode('engines').alias('engine'))

# force the dataframe to materialize using count
engines.select('engine.name').count()
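# The failing cell above is just "explode an array column, then project a nested
# field." As a plain-Python sketch of that same transformation (the `engines`
# element fields mirror the schema used above, but the sample rows are invented):

```python
# Stand-in for Spark rows: each row carries an array of engine structs.
# Field names follow the sync_summary schema; the data itself is made up.
rows = [
    {"uid": "a", "engines": [{"name": "bookmarks", "failureReason": None},
                             {"name": "history", "failureReason": "oops"}]},
    {"uid": "b", "engines": [{"name": "tabs", "failureReason": None}]},
]

# explode('engines'): one output row per element of the engines array
exploded = [engine for row in rows for engine in row["engines"]]

# select('engine.name'): project the nested field from each struct
names = [engine["name"] for engine in exploded]
print(len(names), names)
```

# In Spark the projection of `engine.name` is what forces the problematic
# parquet column to be decoded, which is why the count above surfaces the error.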
# ## Original reported bug (ported to Spark)
#
# The original report lets us find the problematic parquet partition. The only change is rewriting `unnest(engines)` as `LATERAL VIEW explode(engines)`.

# In[4]:

df = spark.read.parquet("s3://telemetry-parquet/sync_summary/v1/"
                        "submission_date_s3=20161208/")

# In[5]:
query = """
WITH engine_errors AS (
    SELECT
        uid,
        app_display_version,
        trunc('day', from_unixtime("when"/1000)) AS sync_date,
        engine
    FROM sync_summary
    /* Unpack the engines array into a table t with a single column
       engine, joined laterally against each source row */
    LATERAL VIEW explode(engines) t AS engine
),
syncs_per_day AS (
    SELECT
        trunc('day', from_unixtime("when"/1000)) AS sync_date,
        count(uid) AS syncs
    FROM sync_summary
    GROUP BY 1
),
failures_per_day AS (
    SELECT
        engine_errors.sync_date AS sync_date,
        engine.name AS engine_name,
        SUM(
            CASE
                WHEN engine.failureReason IS NOT NULL THEN 1
                ELSE 0
            END
        ) AS failures
    FROM engine_errors
    GROUP BY 1, 2
)
SELECT
    syncs_per_day.sync_date,
    engine_name,
    -- failures,
    -- syncs
    CAST(failures AS DOUBLE) / CAST(syncs AS DOUBLE) * 100 AS failure_rate
FROM syncs_per_day
LEFT JOIN failures_per_day ON syncs_per_day.sync_date = failures_per_day.sync_date
WHERE syncs_per_day.sync_date >= timestamp '2016-10-01 00:00:00.000'
  AND syncs_per_day.sync_date <= timestamp '2016-10-16 00:00:00.000'
GROUP BY 1, 2, 3 --,4
ORDER BY 1, 2
"""

sqlContext.registerDataFrameAsTable(df, "sync_summary")
sqlContext.sql(query).show()
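# The final SELECT in the query boils down to
# `failures / syncs * 100` per day. As a toy sketch of that arithmetic in
# plain Python (all counts below are invented, not real sync data):

```python
# Stand-ins for the query's syncs_per_day and failures_per_day CTEs,
# keyed by day and by (day, engine name). The numbers are illustrative only.
syncs_per_day = {"2016-10-01": 200, "2016-10-02": 400}
failures_per_day = {
    ("2016-10-01", "bookmarks"): 5,
    ("2016-10-02", "history"): 8,
}

# CAST(failures AS DOUBLE) / CAST(syncs AS DOUBLE) * 100 AS failure_rate
failure_rate = {
    (day, engine): failures / syncs_per_day[day] * 100
    for (day, engine), failures in failures_per_day.items()
}
print(failure_rate)  # 5/200 -> 2.5%, 8/400 -> 2.0%
```

# The CASTs in the SQL matter because integer division would otherwise
# truncate the rate to zero; Python 3's `/` is already true division.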