# coding: utf-8
# # Bug 1329023 - ReDash errors when parsing parquet files output by SyncView
#
# This notebook finds and removes invalid entries in the SyncView dataset.
#
# The `engines.name` field is the problematic one: selecting it from an affected
# partition raises an error, which is what breaks the ReDash queries.
# In[ ]:
# Build the telemetry-batch-view assembly jar (sync_part branch) so SyncView can be re-run later.
get_ipython().run_cell_magic(u'bash', u'--out setup_out', u'\ncd /home/hadoop/\ngit clone https://github.com/acmiyaguchi/telemetry-batch-view.git\ncd telemetry-batch-view\ngit checkout sync_part\nsbt assembly')
# In[1]:
from datetime import datetime as dt, timedelta, date
from py4j.protocol import Py4JJavaError
import pyspark.sql.functions as F
import subprocess
import traceback
import re
def get_sync_df(datestring=None):
    """Load the sync_summary dataset, optionally filtered to a single submission date."""
    df = spark.read.parquet("s3://telemetry-parquet/sync_summary/v1/")
    if datestring:
        df = df.where(F.col("submission_date_s3") == datestring)
    return df


def get_bad_partition():
    """Run the failing ReDash query and extract the bad partition date from the traceback."""
    query = """
    WITH engine_errors AS (
        SELECT
            uid,
            app_display_version,
            trunc('day', from_unixtime("when"/1000)) AS sync_date,
            engine
        FROM sync_summary
        /* Unpack the engines array into table t with column engine and join it back */
        LATERAL VIEW explode(engines) t AS engine
    ),
    syncs_per_day AS (
        SELECT
            trunc('day', from_unixtime("when"/1000)) AS sync_date,
            count(uid) AS syncs
        FROM sync_summary
        GROUP BY 1
    ),
    failures_per_day AS (
        SELECT
            engine_errors.sync_date AS sync_date,
            engine.name AS engine_name,
            SUM(
                CASE
                    WHEN engine.failureReason IS NOT NULL THEN 1
                    ELSE 0
                END
            ) AS failures
        FROM engine_errors
        GROUP BY 1, 2
    )
    SELECT
        syncs_per_day.sync_date,
        engine_name,
        -- failures,
        -- syncs
        CAST(failures AS DOUBLE) / CAST(syncs AS DOUBLE) * 100 AS failure_rate
    FROM syncs_per_day
    LEFT JOIN failures_per_day ON syncs_per_day.sync_date = failures_per_day.sync_date
    WHERE syncs_per_day.sync_date >= timestamp '2016-10-01 00:00:00.000'
      AND syncs_per_day.sync_date <= timestamp '2016-10-16 00:00:00.000'
    GROUP BY 1, 2, 3 --,4
    ORDER BY 1, 2
    """
    df = get_sync_df()
    sqlContext.registerDataFrameAsTable(df, "sync_summary")
    bad_part_date = None
    try:
        sqlContext.sql(query).show()
    except Py4JJavaError:
        # The failing part file is named in the Java traceback; pull out its partition date.
        tb = traceback.format_exc()
        for line in tb.split('\n'):
            pat = "(?<=submission_date_s3=)(.*)(?=/part)"
            match = re.search(pat, line)
            if match:
                bad_part_date = match.group(0)
                break
    return bad_part_date
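# In[ ]:

# Quick sanity check of the partition-extraction regex (a sketch, not part of the
# original analysis). The traceback line below is made up; real tracebacks name the
# actual failing part file under the bad submission_date_s3 partition.
sample_line = ("Caused by: java.io.IOException: could not read footer for file "
               "s3://telemetry-parquet/sync_summary/v1/submission_date_s3=20161012/part-00000.parquet")
print(re.search("(?<=submission_date_s3=)(.*)(?=/part)", sample_line).group(0))  # expected: 20161012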
# In[2]:
bad_part = get_bad_partition()
if not bad_part:
    print("SUCCESS")
print(bad_part)
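# In[ ]:

# Sketch only (assumes the `aws` CLI is available on the driver and the role has write
# access to telemetry-parquet): once a bad partition date is known, the usual fix is to
# drop that partition and regenerate it with SyncView (see the appendix below).
if bad_part:
    prefix = "s3://telemetry-parquet/sync_summary/v1/submission_date_s3={}/".format(bad_part)
    print("Would remove {}".format(prefix))
    # subprocess.call(["aws", "s3", "rm", "--recursive", prefix])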
# ## Appendix: Useful functions
# In[3]:
# NOTE: does not run in this notebook because it would create a second SparkContext.
def run_sync_view(datestring):
    """Re-run the SyncView job for a single day using the jar built during setup."""
    args = [
        'spark-submit',
        '--master=yarn',
        '--deploy-mode=client',
        '--class=com.mozilla.telemetry.views.SyncView',
        ('/home/hadoop/telemetry-batch-view/'
         'target/scala-2.11/telemetry-batch-view-1.1.jar'),
        '--bucket=telemetry-parquet',
        '--from={}'.format(datestring),
        '--to={}'.format(datestring)
    ]
    print("Running SyncView for {}".format(datestring))
    print(" ".join(args))
    return subprocess.call(args)


def daterange(start_date, end_date):
    """Yield YYYYMMDD strings for each day from start_date to end_date, inclusive."""
    start_day = dt.strptime(start_date, "%Y%m%d")
    end_day = dt.strptime(end_date, "%Y%m%d")
    for n in range(int((end_day - start_day).days) + 1):
        yield (start_day + timedelta(n)).strftime("%Y%m%d")


def is_valid(df):
    """Return False if selecting `engines.name` from the given dataframe raises an error."""
    valid = True
    engines = df.select(F.explode('engines').alias('engine'))
    try:
        engines.select('engine.name').count()
    except Py4JJavaError:
        valid = False
    return valid
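# In[ ]:

# Sketch (not executed here): combine the appendix helpers to scan the same date range
# the query above covers and flag partitions where `engines.name` cannot be read.
# run_sync_view would have to be launched from a fresh session because of the
# multiple-SparkContext issue noted above.
for day in daterange("20161001", "20161016"):
    if not is_valid(get_sync_df(day)):
        print("Invalid partition: submission_date_s3={}".format(day))
        # run_sync_view(day)  # regenerate after removing the bad partition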