Skip to content

Instantly share code, notes, and snippets.

@mreid-moz
Created September 9, 2016 18:55
Show Gist options
  • Save mreid-moz/092029949782249577aee92602879e2b to your computer and use it in GitHub Desktop.
Save mreid-moz/092029949782249577aee92602879e2b to your computer and use it in GitHub Desktop.
ImportCrashData
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Import Socorro crash data into the Data Platform\n",
"\n",
"We want to be able to store Socorro crash data in Parquet form so that it can be made accessible from re:dash.\n",
"\n",
"See [Bug 1273657](https://bugzilla.mozilla.org/show_bug.cgi?id=1273657) for more details."
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from pyspark.sql import SQLContext\n",
"from pyspark.sql.types import *\n",
"\n",
"crash_schema = StructType([\n",
" StructField(\"additional_minidumps\", ArrayType(StringType(), containsNull = False), nullable = True),\n",
" StructField(\"addons\", ArrayType(StringType(), containsNull = False), nullable = True),\n",
" StructField(\"addons_checked\", BooleanType(), nullable = False),\n",
" StructField(\"address\", StringType(), nullable = True),\n",
" StructField(\"app_notes\", StringType(), nullable = True),\n",
" StructField(\"build_id\", StringType(), nullable = True),\n",
" StructField(\"classifications\", StructType([\n",
" StructField(\"jit\", StructType([\n",
" StructField(\"category\", StringType(), nullable = True),\n",
" StructField(\"category_return_code\", StringType(), nullable = True)\n",
" ]), nullable = True),\n",
" StructField(\"skunk_works\", StructType([\n",
" StructField(\"classification\", StringType(), nullable = True),\n",
" StructField(\"classification_data\", StringType(), nullable = True),\n",
" StructField(\"classification_version\", StringType(), nullable = True)\n",
" ]), nullable = True),\n",
" StructField(\"support\", StructType([\n",
" StructField(\"classification\", StringType(), nullable = True),\n",
" StructField(\"classification_data\", StringType(), nullable = True),\n",
" StructField(\"classification_version\", StringType(), nullable = True)\n",
" ]), nullable = True)\n",
" ]), nullable = True),\n",
" StructField(\"cpu_arch\", StringType(), nullable = True),\n",
" StructField(\"cpu_info\", StringType(), nullable = True),\n",
" StructField(\"crash_id\", StringType(), nullable = True),\n",
" StructField(\"date\", StringType(), nullable = True),\n",
" StructField(\"flash_version\", StringType(), nullable = True),\n",
" StructField(\"hang_type\", IntegerType(), nullable = True),\n",
" StructField(\"install_age\", IntegerType(), nullable = True),\n",
" StructField(\"java_stack_trace\", StringType(), nullable = True),\n",
" StructField(\"json_dump\", StructType([\n",
" StructField(\"crash_info\", StructType([\n",
" StructField(\"address\", StringType(), nullable = True),\n",
" StructField(\"crashing_thread\", IntegerType(), nullable = True),\n",
" StructField(\"type\", StringType(), nullable = True)\n",
" ]), nullable = True),\n",
" StructField(\"crashing_thread\", StructType([\n",
" StructField(\"frames\", ArrayType(StructType([\n",
" StructField(\"file\", StringType(), nullable = True),\n",
" StructField(\"frame\", IntegerType(), nullable = True),\n",
" StructField(\"function\", StringType(), nullable = True),\n",
" StructField(\"function_offset\", StringType(), nullable = True),\n",
" StructField(\"line\", IntegerType(), nullable = True),\n",
" StructField(\"module\", StringType(), nullable = True),\n",
" StructField(\"module_offset\", StringType(), nullable = True),\n",
" StructField(\"offset\", StringType(), nullable = True)\n",
" ])), nullable = True),\n",
" StructField(\"threads_index\", IntegerType(), nullable = True),\n",
" StructField(\"total_frames\", IntegerType(), nullable = True)\n",
" ]), nullable = True),\n",
" StructField(\"largest_free_vm_block\", StringType(), nullable = True),\n",
" StructField(\"main_module\", IntegerType(), nullable = True),\n",
" StructField(\"status\", StringType(), nullable = True),\n",
" StructField(\"system_info\", StructType([\n",
" StructField(\"cpu_arch\", StringType(), nullable = True),\n",
" StructField(\"cpu_count\", IntegerType(), nullable = True),\n",
" StructField(\"cpu_info\", StringType(), nullable = True),\n",
" StructField(\"os\", StringType(), nullable = True),\n",
" StructField(\"os_ver\", StringType(), nullable = True)\n",
" ]), nullable = True),\n",
" StructField(\"thread_count\", IntegerType(), nullable = True),\n",
" StructField(\"threads\", ArrayType(StructType([\n",
" StructField(\"frame_count\", IntegerType(), nullable = True),\n",
" StructField(\"frames\", ArrayType(StructType([\n",
" StructField(\"file\", StringType(), nullable = True),\n",
" StructField(\"frame\", IntegerType(), nullable = True),\n",
" StructField(\"function\", StringType(), nullable = True),\n",
" StructField(\"function_offset\", StringType(), nullable = True),\n",
" StructField(\"line\", IntegerType(), nullable = True),\n",
" StructField(\"module\", StringType(), nullable = True),\n",
" StructField(\"module_offset\", StringType(), nullable = True),\n",
" StructField(\"offset\", StringType(), nullable = True)\n",
" ])), nullable = True)\n",
" ])), nullable = True),\n",
" StructField(\"tiny_block_size\", StringType(), nullable = True),\n",
" StructField(\"write_combine_size\", StringType(), nullable = True)\n",
" ]), nullable = True),\n",
" StructField(\"last_crash\", IntegerType(), nullable = True),\n",
" StructField(\"memory_report\", StructType([\n",
" StructField(\"hasMozMallocUsableSize\", BooleanType(), nullable = False),\n",
" StructField(\"reports\", ArrayType(StructType([\n",
" StructField(\"amount\", IntegerType(), nullable = True),\n",
" StructField(\"description\", StringType(), nullable = True),\n",
" StructField(\"kind\", IntegerType(), nullable = True),\n",
" StructField(\"path\", StringType(), nullable = True),\n",
" StructField(\"process\", StringType(), nullable = True),\n",
" StructField(\"units\", IntegerType(), nullable = True)\n",
" ])), nullable = True),\n",
" StructField(\"version\", IntegerType(), nullable = True)\n",
" ]), nullable = True),\n",
" StructField(\"platform\", StringType(), nullable = True),\n",
" StructField(\"platform_pretty_version\", StringType(), nullable = True),\n",
" StructField(\"platform_version\", StringType(), nullable = True),\n",
" StructField(\"plugin_filename\", StringType(), nullable = True),\n",
" StructField(\"plugin_name\", StringType(), nullable = True),\n",
" StructField(\"plugin_version\", StringType(), nullable = True),\n",
" StructField(\"process_type\", StringType(), nullable = True),\n",
" StructField(\"processor_notes\", StringType(), nullable = True),\n",
" StructField(\"product\", StringType(), nullable = True),\n",
" StructField(\"productid\", StringType(), nullable = True),\n",
" StructField(\"proto_signature\", StringType(), nullable = True),\n",
" StructField(\"reason\", StringType(), nullable = True),\n",
" StructField(\"release_channel\", StringType(), nullable = True),\n",
" StructField(\"signature\", StringType(), nullable = True),\n",
" StructField(\"topmost_filenames\", StringType(), nullable = True),\n",
" StructField(\"uptime\", IntegerType(), nullable = True),\n",
" StructField(\"user_comments\", StringType(), nullable = True),\n",
" StructField(\"uuid\", StringType(), nullable = True),\n",
" StructField(\"version\", StringType(), nullable = True),\n",
" StructField(\"winsock_lsp\", StringType(), nullable = True)\n",
"])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Read crash data as JSON and convert it to Parquet"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from datetime import datetime as dt, timedelta, date\n",
"\n",
"def daterange(start_date, end_date):\n",
" for n in range(int((end_date - start_date).days) + 1):\n",
" yield (end_date - timedelta(n)).strftime(\"%Y%m%d\")\n",
"\n",
"def import_day(d):\n",
" source_s3path = \"s3://org-allizom-telemetry-crashes/v1/crash_report\"\n",
" dest_s3path = \"s3://net-mozaws-prod-us-west-2-pipeline-analysis/mreid/crash/v4\"\n",
" num_partitions = 5\n",
" print \"Processing {}, started at {}\".format(d, dt.utcnow())\n",
" cur_source_s3path = \"{}/{}\".format(source_s3path, d)\n",
" cur_dest_s3path = \"{}/crash_date={}\".format(dest_s3path, d)\n",
" df = sqlContext.read.json(cur_source_s3path, schema=crash_schema)\n",
" df.repartition(num_partitions).write.parquet(cur_dest_s3path, mode=\"overwrite\")\n",
"\n",
"def backfill(start_date_yyyymmdd):\n",
" start_date = dt.strptime(start_date_yyyymmdd, \"%Y%m%d\")\n",
" end_date = dt.utcnow() - timedelta(1) # yesterday\n",
" for d in daterange(start_date, end_date):\n",
" try:\n",
" import_day(d)\n",
" except Exception as e:\n",
" print \"ERRROR: \", e"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Processing 20160908, started at 2016-09-09 18:47:42.180961\n"
]
}
],
"source": [
"yesterday = dt.strftime(dt.utcnow() - timedelta(1), \"%Y%m%d\")\n",
"import_day(yesterday)\n",
"\n",
"#backfill(\"20160902\")"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.11"
}
},
"nbformat": 4,
"nbformat_minor": 0
}
# coding: utf-8
# # Import Socorro crash data into the Data Platform
#
# We want to be able to store Socorro crash data in Parquet form so that it can be made accessible from re:dash.
#
# See [Bug 1273657](https://bugzilla.mozilla.org/show_bug.cgi?id=1273657) for more details.
# In[2]:
from pyspark.sql import SQLContext
from pyspark.sql.types import *
# Explicit Spark schema for the crash-report JSON. import_day() passes it as
# `schema=` to sqlContext.read.json, so these names and types define the columns
# of the DataFrame that is written out as Parquet.
# NOTE(review): nullable=False is set only on addons_checked and
# memory_report.hasMozMallocUsableSize; whether Spark enforces non-nullability
# when reading JSON depends on the Spark version -- confirm before relying on it.
crash_schema = StructType([
    StructField("additional_minidumps", ArrayType(StringType(), containsNull = False), nullable = True),
    StructField("addons", ArrayType(StringType(), containsNull = False), nullable = True),
    StructField("addons_checked", BooleanType(), nullable = False),
    StructField("address", StringType(), nullable = True),
    StructField("app_notes", StringType(), nullable = True),
    StructField("build_id", StringType(), nullable = True),
    # Per-classifier results; jit, skunk_works and support are each an optional struct.
    StructField("classifications", StructType([
        StructField("jit", StructType([
            StructField("category", StringType(), nullable = True),
            StructField("category_return_code", StringType(), nullable = True)
        ]), nullable = True),
        # skunk_works and support share the same three-field shape; keep them in sync.
        StructField("skunk_works", StructType([
            StructField("classification", StringType(), nullable = True),
            StructField("classification_data", StringType(), nullable = True),
            StructField("classification_version", StringType(), nullable = True)
        ]), nullable = True),
        StructField("support", StructType([
            StructField("classification", StringType(), nullable = True),
            StructField("classification_data", StringType(), nullable = True),
            StructField("classification_version", StringType(), nullable = True)
        ]), nullable = True)
    ]), nullable = True),
    StructField("cpu_arch", StringType(), nullable = True),
    StructField("cpu_info", StringType(), nullable = True),
    StructField("crash_id", StringType(), nullable = True),
    # Kept as a plain string; not parsed into a Spark date/timestamp type.
    StructField("date", StringType(), nullable = True),
    StructField("flash_version", StringType(), nullable = True),
    StructField("hang_type", IntegerType(), nullable = True),
    StructField("install_age", IntegerType(), nullable = True),
    StructField("java_stack_trace", StringType(), nullable = True),
    # json_dump: nested crash/thread/system details (presumably the processed
    # minidump output -- confirm against the Socorro processed-crash format).
    StructField("json_dump", StructType([
        StructField("crash_info", StructType([
            StructField("address", StringType(), nullable = True),
            StructField("crashing_thread", IntegerType(), nullable = True),
            StructField("type", StringType(), nullable = True)
        ]), nullable = True),
        # The frame struct below is duplicated in threads[].frames; keep the two in sync.
        StructField("crashing_thread", StructType([
            StructField("frames", ArrayType(StructType([
                StructField("file", StringType(), nullable = True),
                StructField("frame", IntegerType(), nullable = True),
                StructField("function", StringType(), nullable = True),
                StructField("function_offset", StringType(), nullable = True),
                StructField("line", IntegerType(), nullable = True),
                StructField("module", StringType(), nullable = True),
                StructField("module_offset", StringType(), nullable = True),
                StructField("offset", StringType(), nullable = True)
            ])), nullable = True),
            StructField("threads_index", IntegerType(), nullable = True),
            StructField("total_frames", IntegerType(), nullable = True)
        ]), nullable = True),
        StructField("largest_free_vm_block", StringType(), nullable = True),
        StructField("main_module", IntegerType(), nullable = True),
        StructField("status", StringType(), nullable = True),
        StructField("system_info", StructType([
            StructField("cpu_arch", StringType(), nullable = True),
            StructField("cpu_count", IntegerType(), nullable = True),
            StructField("cpu_info", StringType(), nullable = True),
            StructField("os", StringType(), nullable = True),
            StructField("os_ver", StringType(), nullable = True)
        ]), nullable = True),
        StructField("thread_count", IntegerType(), nullable = True),
        StructField("threads", ArrayType(StructType([
            StructField("frame_count", IntegerType(), nullable = True),
            # Same frame struct as json_dump.crashing_thread.frames.
            StructField("frames", ArrayType(StructType([
                StructField("file", StringType(), nullable = True),
                StructField("frame", IntegerType(), nullable = True),
                StructField("function", StringType(), nullable = True),
                StructField("function_offset", StringType(), nullable = True),
                StructField("line", IntegerType(), nullable = True),
                StructField("module", StringType(), nullable = True),
                StructField("module_offset", StringType(), nullable = True),
                StructField("offset", StringType(), nullable = True)
            ])), nullable = True)
        ])), nullable = True),
        StructField("tiny_block_size", StringType(), nullable = True),
        StructField("write_combine_size", StringType(), nullable = True)
    ]), nullable = True),
    StructField("last_crash", IntegerType(), nullable = True),
    StructField("memory_report", StructType([
        StructField("hasMozMallocUsableSize", BooleanType(), nullable = False),
        StructField("reports", ArrayType(StructType([
            # NOTE(review): amount is a 32-bit IntegerType. If it holds byte counts it
            # can exceed 2**31 - 1; consider LongType (that would change the Parquet
            # column type for downstream readers) -- confirm.
            StructField("amount", IntegerType(), nullable = True),
            StructField("description", StringType(), nullable = True),
            StructField("kind", IntegerType(), nullable = True),
            StructField("path", StringType(), nullable = True),
            StructField("process", StringType(), nullable = True),
            StructField("units", IntegerType(), nullable = True)
        ])), nullable = True),
        # Integer-typed; distinct from the top-level string "version" field below.
        StructField("version", IntegerType(), nullable = True)
    ]), nullable = True),
    StructField("platform", StringType(), nullable = True),
    StructField("platform_pretty_version", StringType(), nullable = True),
    StructField("platform_version", StringType(), nullable = True),
    StructField("plugin_filename", StringType(), nullable = True),
    StructField("plugin_name", StringType(), nullable = True),
    StructField("plugin_version", StringType(), nullable = True),
    StructField("process_type", StringType(), nullable = True),
    StructField("processor_notes", StringType(), nullable = True),
    StructField("product", StringType(), nullable = True),
    StructField("productid", StringType(), nullable = True),
    StructField("proto_signature", StringType(), nullable = True),
    StructField("reason", StringType(), nullable = True),
    StructField("release_channel", StringType(), nullable = True),
    StructField("signature", StringType(), nullable = True),
    StructField("topmost_filenames", StringType(), nullable = True),
    StructField("uptime", IntegerType(), nullable = True),
    StructField("user_comments", StringType(), nullable = True),
    StructField("uuid", StringType(), nullable = True),
    StructField("version", StringType(), nullable = True),
    StructField("winsock_lsp", StringType(), nullable = True)
])
# ### Read crash data as JSON and convert it to Parquet
# In[3]:
from datetime import datetime as dt, timedelta, date
def daterange(start_date, end_date):
    """Lazily yield every day from end_date back to start_date, inclusive.

    Days are produced newest-first as "YYYYMMDD" strings. Only whole days
    count: the number of steps is (end_date - start_date).days, so any
    time-of-day on end_date past that of start_date is ignored. Yields
    nothing when end_date is earlier than start_date.
    """
    span = (end_date - start_date).days
    offset = 0
    while offset <= span:
        yield (end_date - timedelta(days=offset)).strftime("%Y%m%d")
        offset += 1
# Default locations/settings used by import_day(); override them per call when
# importing from or to somewhere else.
DEFAULT_SOURCE_S3PATH = "s3://org-allizom-telemetry-crashes/v1/crash_report"
DEFAULT_DEST_S3PATH = "s3://net-mozaws-prod-us-west-2-pipeline-analysis/mreid/crash/v4"
DEFAULT_NUM_PARTITIONS = 5


def import_day(d, source_s3path=DEFAULT_SOURCE_S3PATH,
               dest_s3path=DEFAULT_DEST_S3PATH,
               num_partitions=DEFAULT_NUM_PARTITIONS):
    """Convert one day of crash-report JSON on S3 into a Parquet directory.

    Reads ``<source_s3path>/<d>`` using the fixed ``crash_schema``, repartitions
    it into ``num_partitions`` partitions and writes it as Parquet to
    ``<dest_s3path>/crash_date=<d>``, replacing anything already at that path.

    Args:
        d: Day to import as a "YYYYMMDD" string. A ``date``/``datetime`` is
            also accepted and is formatted as "YYYYMMDD".
        source_s3path: Prefix holding one sub-path of JSON input per day.
        dest_s3path: Prefix under which the per-day ``crash_date=`` output
            directories are written.
        num_partitions: Number of partitions to repartition the day's data
            into before writing.
    """
    # Accept date objects as well as the "YYYYMMDD" strings daterange() yields.
    if hasattr(d, "strftime"):
        d = d.strftime("%Y%m%d")
    # Tolerate a trailing "/" on caller-supplied prefixes (avoids "//" in paths).
    source_s3path = source_s3path.rstrip("/")
    dest_s3path = dest_s3path.rstrip("/")
    print("Processing {}, started at {}".format(d, dt.utcnow()))
    cur_source_s3path = "{}/{}".format(source_s3path, d)
    # "crash_date=<d>" naming presumably lets Spark readers of dest_s3path
    # discover crash_date as a partition column -- confirm with consumers.
    cur_dest_s3path = "{}/crash_date={}".format(dest_s3path, d)
    # NOTE: sqlContext is not created in this file; it is assumed to be
    # provided as a global by the Spark notebook environment.
    df = sqlContext.read.json(cur_source_s3path, schema=crash_schema)
    # mode="overwrite" replaces any earlier output for this day, so a day can
    # be re-imported by simply calling this again.
    df.repartition(num_partitions).write.parquet(cur_dest_s3path, mode="overwrite")
def backfill(start_date_yyyymmdd, end_date_yyyymmdd=None):
    """Import every day in a range, newest first, continuing past failures.

    Each day from the end date back to ``start_date_yyyymmdd`` (inclusive) is
    passed to import_day(). A failure on one day is reported and the remaining
    days are still processed.

    Args:
        start_date_yyyymmdd: Oldest day to import, as "YYYYMMDD".
        end_date_yyyymmdd: Newest day to import, as "YYYYMMDD". Defaults to
            yesterday (UTC), matching the original behavior.

    Returns:
        A list of ``(day, exception)`` pairs for the days that failed; empty
        when every day imported successfully.

    Raises:
        ValueError: if either date string does not match "YYYYMMDD".
    """
    start_date = dt.strptime(start_date_yyyymmdd, "%Y%m%d")
    if end_date_yyyymmdd is None:
        end_date = dt.utcnow() - timedelta(1)  # yesterday (UTC)
    else:
        end_date = dt.strptime(end_date_yyyymmdd, "%Y%m%d")

    failures = []
    for d in daterange(start_date, end_date):
        try:
            import_day(d)
        except Exception as e:
            # Deliberately broad: one bad day must not abort the whole backfill.
            # Report which day failed and why, and remember it for the caller.
            print("ERROR: failed to import {}: {}".format(d, e))
            failures.append((d, e))

    if failures:
        print("Backfill finished with {} failed day(s): {}".format(
            len(failures), ", ".join(day for day, _ in failures)))
    return failures
# In[5]:
# Import yesterday's (UTC) crash data. To reprocess a range of past days
# instead, call backfill() with the oldest day, e.g. backfill("20160902").
yesterday = (dt.utcnow() - timedelta(days=1)).strftime("%Y%m%d")
import_day(yesterday)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment