-
-
Save acmiyaguchi/e319fc2af5c61bd5229a3e13bcde67c1 to your computer and use it in GitHub Desktop.
fix_sync
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding: utf-8 | |
# # Bug 1329023 - ReDash errors when parsing parquet files output by SyncView | |
# | |
# This notebook finds and removes invalid entries in the SyncView dataset. | |
# | |
# The `engines.name` field is problematic. | |
# In[ ]:
# Build the patched telemetry-batch-view jar on the cluster master:
# clone the fork, check out the "sync_part" branch, and run `sbt assembly`.
# Stdout is captured into `setup_out` by the --out option of the bash magic.
get_ipython().run_cell_magic(u'bash', u'--out setup_out', u'\ncd /home/hadoop/\ngit clone https://github.com/acmiyaguchi/telemetry-batch-view.git\ncd telemetry-batch-view\ngit checkout sync_part\nsbt assembly')
# In[1]: | |
from datetime import datetime as dt, timedelta, date | |
from py4j.protocol import Py4JJavaError | |
import pyspark.sql.functions as F | |
import subprocess | |
import traceback | |
import re | |
def get_sync_df(datestring=None):
    """Load the sync_summary v1 parquet dataset as a Spark DataFrame.

    :param datestring: optional ``submission_date_s3`` partition value
        (e.g. ``"20161001"``); when truthy, the frame is restricted to
        that single day.
    :return: the (possibly filtered) DataFrame.
    """
    frame = spark.read.parquet("s3://telemetry-parquet/sync_summary/v1/")
    if not datestring:
        return frame
    return frame.where(F.col("submission_date_s3") == datestring)
def get_bad_partition():
    """Find the submission_date_s3 of a sync_summary partition that fails to parse.

    Runs a representative failure-rate query over the whole dataset; if
    Spark hits a corrupt parquet file, the offending path (which embeds the
    partition date, e.g. ``.../submission_date_s3=20161001/part-...``)
    appears in the Py4J traceback and the date is extracted from it.

    :return: the bad partition's date string, or None when the query
        succeeds (or no partition path is found in the traceback).
    """
    query = """
    WITH engine_errors AS (
        SELECT
            uid,
            app_display_version,
            trunc('day', from_unixtime("when"/1000)) AS sync_date,
            engine
        FROM sync_summary
        /* Unpack engines array into t table with column engine and join to table */
        LATERAL VIEW explode(engines) t AS engine
    ),
    syncs_per_day AS (
        SELECT
            trunc('day', from_unixtime("when"/1000)) AS sync_date,
            count(uid) as syncs
        FROM sync_summary
        GROUP BY 1
    ),
    failures_per_day AS (
        SELECT
            engine_errors.sync_date AS sync_date,
            engine.name AS engine_name,
            SUM(
                CASE
                    WHEN engine.failureReason IS NOT NULL THEN 1
                    ELSE 0
                END
            ) AS failures
        FROM engine_errors
        GROUP BY 1,2
    )
    SELECT
        syncs_per_day.sync_date,
        engine_name,
        -- failures,
        -- syncs
        CAST(failures AS DOUBLE) / CAST(syncs AS DOUBLE) * 100 AS failure_rate
    FROM syncs_per_day
    LEFT JOIN failures_per_day ON syncs_per_day.sync_date = failures_per_day.sync_date
    WHERE syncs_per_day.sync_date >= timestamp '2016-10-01 00:00:00.000' AND syncs_per_day.sync_date <= timestamp '2016-10-16 00:00:00.000'
    GROUP BY 1,2,3 --,4
    ORDER BY 1,2
    """
    df = get_sync_df()
    sqlContext.registerDataFrameAsTable(df, "sync_summary")
    try:
        sqlContext.sql(query).show()
    except Py4JJavaError:
        # '.' does not match newlines, so one search over the full traceback
        # is equivalent to the line-by-line scan and avoids recompiling the
        # pattern on every line; the first match is returned either way.
        tb = traceback.format_exc()
        match = re.search(r"(?<=submission_date_s3=)(.*)(?=/part)", tb)
        if match:
            return match.group(0)
    return None
# In[2]: | |
# Probe the dataset: a falsy result means the full query ran without hitting
# a corrupt partition; otherwise bad_part holds the partition's date string.
bad_part = get_bad_partition()
if not bad_part:
    print("SUCCESS")
print(bad_part)
# ## Appendix: Useful functions | |
# In[3]: | |
# NOTE: does not run because of multiple spark contexts | |
def run_sync_view(datestring):
    """Backfill the SyncView dataset for one day via spark-submit.

    NOTE: does not run from this notebook because of multiple spark contexts.

    :param datestring: the day to regenerate, formatted YYYYMMDD; used for
        both the --from and --to bounds.
    :return: the spark-submit process exit code.
    """
    jar = ('/home/hadoop/telemetry-batch-view/'
           'target/scala-2.11/telemetry-batch-view-1.1.jar')
    cmd = [
        'spark-submit',
        '--master=yarn',
        '--deploy-mode=client',
        '--class=com.mozilla.telemetry.views.SyncView',
        jar,
        '--bucket=telemetry-parquet',
        '--from={}'.format(datestring),
        '--to={}'.format(datestring),
    ]
    print("Running SyncView for {}".format(datestring))
    print(" ".join(cmd))
    return subprocess.call(cmd)
def daterange(start_date, end_date):
    """Yield each day from start_date through end_date, inclusive.

    :param start_date: first day, as a "YYYYMMDD" string.
    :param end_date: last day, as a "YYYYMMDD" string; if it precedes
        start_date, nothing is yielded.
    :return: generator of "YYYYMMDD" strings, one per day.
    """
    fmt = "%Y%m%d"
    current = dt.strptime(start_date, fmt)
    last = dt.strptime(end_date, fmt)
    while current <= last:
        yield current.strftime(fmt)
        current += timedelta(days=1)
def is_valid(df):
    """Check whether a sync_summary DataFrame's engines column is readable.

    Exploding the engines array and selecting engine.name forces Spark to
    deserialize the problematic field; a Py4JJavaError while counting
    indicates a corrupt record.

    :param df: a Spark DataFrame with an ``engines`` array column.
    :return: True when the count succeeds, False on a Py4JJavaError.
    """
    exploded = df.select(F.explode('engines').alias('engine'))
    try:
        exploded.select('engine.name').count()
    except Py4JJavaError:
        return False
    return True
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.