
@acmiyaguchi
Last active January 25, 2017 00:59
fix_sync
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Bug 1329023 - ReDash errors when parsing parquet files output by SyncView\n",
"\n",
"This notebook finds and removes invalid entries in the SyncView dataset.\n",
"\n",
"The `engines.name` field is problematic."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"%%bash --out setup_out\n",
"\n",
"cd /home/hadoop/\n",
"git clone https://github.com/acmiyaguchi/telemetry-batch-view.git\n",
"cd telemetry-batch-view\n",
"git checkout sync_part\n",
"sbt assembly"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from datetime import datetime as dt, timedelta, date\n",
"from py4j.protocol import Py4JJavaError\n",
"import pyspark.sql.functions as F\n",
"import subprocess\n",
"import traceback\n",
"import re\n",
"\n",
"def get_sync_df(datestring=None):\n",
" df = spark.read.parquet(\"s3://telemetry-parquet/sync_summary/v1/\")\n",
" if datestring:\n",
" df = df.where(F.col(\"submission_date_s3\") == datestring)\n",
" return df\n",
"\n",
"def get_bad_partition():\n",
" query = \"\"\"\n",
" WITH engine_errors AS (\n",
" SELECT \n",
" uid, \n",
" app_display_version,\n",
" trunc('day', from_unixtime(\"when\"/1000)) AS sync_date,\n",
" engine\n",
" FROM sync_summary\n",
" /* Unpack engines array into t table with column engine and join to table */\n",
" LATERAL VIEW explode(engines) t AS engine\n",
" ),\n",
" syncs_per_day AS (\n",
" SELECT\n",
" trunc('day', from_unixtime(\"when\"/1000)) AS sync_date,\n",
" count(uid) as syncs\n",
" FROM sync_summary\n",
" GROUP BY 1\n",
" ),\n",
" failures_per_day AS (\n",
" SELECT \n",
" engine_errors.sync_date AS sync_date,\n",
" engine.name AS engine_name,\n",
" SUM(\n",
" CASE\n",
" WHEN engine.failureReason IS NOT NULL THEN 1\n",
" ELSE 0\n",
" END \n",
" ) AS failures\n",
" FROM engine_errors\n",
" GROUP BY 1,2\n",
" )\n",
"\n",
" SELECT\n",
" syncs_per_day.sync_date,\n",
" engine_name,\n",
" --\tfailures,\n",
" --\tsyncs\n",
" CAST(failures AS DOUBLE) / CAST(syncs AS DOUBLE) * 100 AS failure_rate\n",
" FROM syncs_per_day\n",
" LEFT JOIN failures_per_day ON syncs_per_day.sync_date = failures_per_day.sync_date\n",
" WHERE syncs_per_day.sync_date >= timestamp '2016-10-01 00:00:00.000' AND syncs_per_day.sync_date <= timestamp '2016-10-16 00:00:00.000'\n",
" GROUP BY 1,2,3 --,4\n",
" ORDER BY 1,2\n",
" \"\"\"\n",
" df = get_sync_df()\n",
" sqlContext.registerDataFrameAsTable(df, \"sync_summary\")\n",
" \n",
" bad_part_date = None\n",
" try:\n",
" sqlContext.sql(query).show()\n",
" except Py4JJavaError:\n",
" tb = traceback.format_exc()\n",
" for line in tb.split('\\n'):\n",
" pat = \"(?<=submission_date_s3=)(.*)(?=/part)\"\n",
" match = re.search(pat, line)\n",
" if match:\n",
" bad_part_date = match.group(0)\n",
" break\n",
" return bad_part_date"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---------+-----------+------------+\n",
"|sync_date|engine_name|failure_rate|\n",
"+---------+-----------+------------+\n",
"| null| null| null|\n",
"+---------+-----------+------------+\n",
"\n",
"SUCCESS\n",
"None\n"
]
}
],
"source": [
"bad_part = get_bad_partition()\n",
"if not bad_part:\n",
" print(\"SUCCESS\")\n",
"print(bad_part)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Appendix: Useful functions"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# NOTE: does not run because of multiple spark contexts\n",
"def run_sync_view(datestring):\n",
" args = [\n",
" 'spark-submit',\n",
" '--master=yarn',\n",
" '--deploy-mode=client',\n",
" '--class=com.mozilla.telemetry.views.SyncView',\n",
" ('/home/hadoop/telemetry-batch-view/'\n",
" 'target/scala-2.11/telemetry-batch-view-1.1.jar'),\n",
" '--bucket=telemetry-parquet',\n",
" '--from={}'.format(datestring),\n",
" '--to={}'.format(datestring)\n",
" ]\n",
" \n",
" print(\"Running SyncView for {}\".format(datestring))\n",
" print(\" \".join(args))\n",
" \n",
" return subprocess.call(args)\n",
"\n",
"def daterange(start_date, end_date):\n",
" start_day = dt.strptime(start_date, \"%Y%m%d\")\n",
" end_day = dt.strptime(end_date, \"%Y%m%d\")\n",
" for n in range(int((end_day - start_day).days) + 1):\n",
" yield (start_day + timedelta(n)).strftime(\"%Y%m%d\")\n",
"\n",
"def is_valid(df):\n",
" valid = True\n",
" engines = df.select(F.explode('engines').alias('engine'))\n",
" try:\n",
" engines.select('engine.name').count()\n",
" except Py4JJavaError:\n",
" valid = False\n",
" return valid"
]
}
],
"metadata": {
"anaconda-cloud": {},
"kernelspec": {
"display_name": "Python [default]",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.12"
}
},
"nbformat": 4,
"nbformat_minor": 1
}
# coding: utf-8
# # Bug 1329023 - ReDash errors when parsing parquet files output by SyncView
#
# This notebook finds and removes invalid entries in the SyncView dataset.
#
# The `engines.name` field is the problematic one: selecting it from a bad partition raises a read error.
# In[ ]:
get_ipython().run_cell_magic(u'bash', u'--out setup_out', u'\ncd /home/hadoop/\ngit clone https://github.com/acmiyaguchi/telemetry-batch-view.git\ncd telemetry-batch-view\ngit checkout sync_part\nsbt assembly')
# In[1]:
from datetime import datetime as dt, timedelta, date
from py4j.protocol import Py4JJavaError
import pyspark.sql.functions as F
import subprocess
import traceback
import re
def get_sync_df(datestring=None):
    df = spark.read.parquet("s3://telemetry-parquet/sync_summary/v1/")
    if datestring:
        df = df.where(F.col("submission_date_s3") == datestring)
    return df
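
# Illustrative usage (not part of the original notebook): load a single day's
# partition of sync_summary by its submission_date_s3 value. The date string
# "20161015" below is a hypothetical example.
single_day = get_sync_df("20161015")
single_day.printSchema()
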
def get_bad_partition():
    query = """
    WITH engine_errors AS (
        SELECT
            uid,
            app_display_version,
            trunc('day', from_unixtime("when"/1000)) AS sync_date,
            engine
        FROM sync_summary
        /* Unpack engines array into t table with column engine and join to table */
        LATERAL VIEW explode(engines) t AS engine
    ),
    syncs_per_day AS (
        SELECT
            trunc('day', from_unixtime("when"/1000)) AS sync_date,
            count(uid) as syncs
        FROM sync_summary
        GROUP BY 1
    ),
    failures_per_day AS (
        SELECT
            engine_errors.sync_date AS sync_date,
            engine.name AS engine_name,
            SUM(
                CASE
                    WHEN engine.failureReason IS NOT NULL THEN 1
                    ELSE 0
                END
            ) AS failures
        FROM engine_errors
        GROUP BY 1,2
    )

    SELECT
        syncs_per_day.sync_date,
        engine_name,
        --	failures,
        --	syncs
        CAST(failures AS DOUBLE) / CAST(syncs AS DOUBLE) * 100 AS failure_rate
    FROM syncs_per_day
    LEFT JOIN failures_per_day ON syncs_per_day.sync_date = failures_per_day.sync_date
    WHERE syncs_per_day.sync_date >= timestamp '2016-10-01 00:00:00.000' AND syncs_per_day.sync_date <= timestamp '2016-10-16 00:00:00.000'
    GROUP BY 1,2,3 --,4
    ORDER BY 1,2
    """
    df = get_sync_df()
    sqlContext.registerDataFrameAsTable(df, "sync_summary")

    bad_part_date = None
    try:
        sqlContext.sql(query).show()
    except Py4JJavaError:
        tb = traceback.format_exc()
        for line in tb.split('\n'):
            pat = "(?<=submission_date_s3=)(.*)(?=/part)"
            match = re.search(pat, line)
            if match:
                bad_part_date = match.group(0)
                break
    return bad_part_date
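
# Illustrative sketch (not part of the original notebook): how the regex above
# extracts the bad partition date from a traceback line. The traceback text and
# S3 object name are made up; only the submission_date_s3=.../part shape matters.
_example_traceback_line = (
    "... error reading file "
    "s3://telemetry-parquet/sync_summary/v1/"
    "submission_date_s3=20161015/part-r-00000.parquet"
)
_example_match = re.search("(?<=submission_date_s3=)(.*)(?=/part)",
                           _example_traceback_line)
assert _example_match.group(0) == "20161015"
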
# In[2]:
bad_part = get_bad_partition()
if not bad_part:
print("SUCCESS")
print(bad_part)
# ## Appendix: Useful functions
# In[3]:
# NOTE: this does not run from inside the notebook because spark-submit would start a second Spark context
def run_sync_view(datestring):
    args = [
        'spark-submit',
        '--master=yarn',
        '--deploy-mode=client',
        '--class=com.mozilla.telemetry.views.SyncView',
        ('/home/hadoop/telemetry-batch-view/'
         'target/scala-2.11/telemetry-batch-view-1.1.jar'),
        '--bucket=telemetry-parquet',
        '--from={}'.format(datestring),
        '--to={}'.format(datestring)
    ]

    print("Running SyncView for {}".format(datestring))
    print(" ".join(args))

    return subprocess.call(args)

def daterange(start_date, end_date):
    start_day = dt.strptime(start_date, "%Y%m%d")
    end_day = dt.strptime(end_date, "%Y%m%d")
    for n in range(int((end_day - start_day).days) + 1):
        yield (start_day + timedelta(n)).strftime("%Y%m%d")

def is_valid(df):
    valid = True
    engines = df.select(F.explode('engines').alias('engine'))
    try:
        engines.select('engine.name').count()
    except Py4JJavaError:
        valid = False
    return valid
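
# Illustrative sketch (not part of the original notebook) of how the appendix
# helpers could be combined: walk a date range, flag days whose partition cannot
# be read, and re-run SyncView for those days. The date range in the usage note
# below is hypothetical, and run_sync_view is still subject to the
# multiple-Spark-contexts caveat above.
def repair_range(start_date, end_date):
    repaired = []
    for day in daterange(start_date, end_date):
        if not is_valid(get_sync_df(day)):
            run_sync_view(day)  # rebuild that day's sync_summary partition
            repaired.append(day)
    return repaired

# Example usage: repair_range("20161001", "20161016")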