-
-
Save mreid-moz/092029949782249577aee92602879e2b to your computer and use it in GitHub Desktop.
ImportCrashData
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"cells": [ | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"# Import Socorro crash data into the Data Platform\n", | |
"\n", | |
"We want to be able to store Socorro crash data in Parquet form so that it can be made accessible from re:dash.\n", | |
"\n", | |
"See [Bug 1273657](https://bugzilla.mozilla.org/show_bug.cgi?id=1273657) for more details" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 2, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [], | |
"source": [ | |
"from pyspark.sql import SQLContext\n", | |
"from pyspark.sql.types import *\n", | |
"\n", | |
"crash_schema = StructType([\n", | |
" StructField(\"additional_minidumps\", ArrayType(StringType(), containsNull = False), nullable = True),\n", | |
" StructField(\"addons\", ArrayType(StringType(), containsNull = False), nullable = True),\n", | |
" StructField(\"addons_checked\", BooleanType(), nullable = False),\n", | |
" StructField(\"address\", StringType(), nullable = True),\n", | |
" StructField(\"app_notes\", StringType(), nullable = True),\n", | |
" StructField(\"build_id\", StringType(), nullable = True),\n", | |
" StructField(\"classifications\", StructType([\n", | |
" StructField(\"jit\", StructType([\n", | |
" StructField(\"category\", StringType(), nullable = True),\n", | |
" StructField(\"category_return_code\", StringType(), nullable = True)\n", | |
" ]), nullable = True),\n", | |
" StructField(\"skunk_works\", StructType([\n", | |
" StructField(\"classification\", StringType(), nullable = True),\n", | |
" StructField(\"classification_data\", StringType(), nullable = True),\n", | |
" StructField(\"classification_version\", StringType(), nullable = True)\n", | |
" ]), nullable = True),\n", | |
" StructField(\"support\", StructType([\n", | |
" StructField(\"classification\", StringType(), nullable = True),\n", | |
" StructField(\"classification_data\", StringType(), nullable = True),\n", | |
" StructField(\"classification_version\", StringType(), nullable = True)\n", | |
" ]), nullable = True)\n", | |
" ]), nullable = True),\n", | |
" StructField(\"cpu_arch\", StringType(), nullable = True),\n", | |
" StructField(\"cpu_info\", StringType(), nullable = True),\n", | |
" StructField(\"crash_id\", StringType(), nullable = True),\n", | |
" StructField(\"date\", StringType(), nullable = True),\n", | |
" StructField(\"flash_version\", StringType(), nullable = True),\n", | |
" StructField(\"hang_type\", IntegerType(), nullable = True),\n", | |
" StructField(\"install_age\", IntegerType(), nullable = True),\n", | |
" StructField(\"java_stack_trace\", StringType(), nullable = True),\n", | |
" StructField(\"json_dump\", StructType([\n", | |
" StructField(\"crash_info\", StructType([\n", | |
" StructField(\"address\", StringType(), nullable = True),\n", | |
" StructField(\"crashing_thread\", IntegerType(), nullable = True),\n", | |
" StructField(\"type\", StringType(), nullable = True)\n", | |
" ]), nullable = True),\n", | |
" StructField(\"crashing_thread\", StructType([\n", | |
" StructField(\"frames\", ArrayType(StructType([\n", | |
" StructField(\"file\", StringType(), nullable = True),\n", | |
" StructField(\"frame\", IntegerType(), nullable = True),\n", | |
" StructField(\"function\", StringType(), nullable = True),\n", | |
" StructField(\"function_offset\", StringType(), nullable = True),\n", | |
" StructField(\"line\", IntegerType(), nullable = True),\n", | |
" StructField(\"module\", StringType(), nullable = True),\n", | |
" StructField(\"module_offset\", StringType(), nullable = True),\n", | |
" StructField(\"offset\", StringType(), nullable = True)\n", | |
" ])), nullable = True),\n", | |
" StructField(\"threads_index\", IntegerType(), nullable = True),\n", | |
" StructField(\"total_frames\", IntegerType(), nullable = True)\n", | |
" ]), nullable = True),\n", | |
" StructField(\"largest_free_vm_block\", StringType(), nullable = True),\n", | |
" StructField(\"main_module\", IntegerType(), nullable = True),\n", | |
" StructField(\"status\", StringType(), nullable = True),\n", | |
" StructField(\"system_info\", StructType([\n", | |
" StructField(\"cpu_arch\", StringType(), nullable = True),\n", | |
" StructField(\"cpu_count\", IntegerType(), nullable = True),\n", | |
" StructField(\"cpu_info\", StringType(), nullable = True),\n", | |
" StructField(\"os\", StringType(), nullable = True),\n", | |
" StructField(\"os_ver\", StringType(), nullable = True)\n", | |
" ]), nullable = True),\n", | |
" StructField(\"thread_count\", IntegerType(), nullable = True),\n", | |
" StructField(\"threads\", ArrayType(StructType([\n", | |
" StructField(\"frame_count\", IntegerType(), nullable = True),\n", | |
" StructField(\"frames\", ArrayType(StructType([\n", | |
" StructField(\"file\", StringType(), nullable = True),\n", | |
" StructField(\"frame\", IntegerType(), nullable = True),\n", | |
" StructField(\"function\", StringType(), nullable = True),\n", | |
" StructField(\"function_offset\", StringType(), nullable = True),\n", | |
" StructField(\"line\", IntegerType(), nullable = True),\n", | |
" StructField(\"module\", StringType(), nullable = True),\n", | |
" StructField(\"module_offset\", StringType(), nullable = True),\n", | |
" StructField(\"offset\", StringType(), nullable = True)\n", | |
" ])), nullable = True)\n", | |
" ])), nullable = True),\n", | |
" StructField(\"tiny_block_size\", StringType(), nullable = True),\n", | |
" StructField(\"write_combine_size\", StringType(), nullable = True)\n", | |
" ]), nullable = True),\n", | |
" StructField(\"last_crash\", IntegerType(), nullable = True),\n", | |
" StructField(\"memory_report\", StructType([\n", | |
" StructField(\"hasMozMallocUsableSize\", BooleanType(), nullable = False),\n", | |
" StructField(\"reports\", ArrayType(StructType([\n", | |
" StructField(\"amount\", IntegerType(), nullable = True),\n", | |
" StructField(\"description\", StringType(), nullable = True),\n", | |
" StructField(\"kind\", IntegerType(), nullable = True),\n", | |
" StructField(\"path\", StringType(), nullable = True),\n", | |
" StructField(\"process\", StringType(), nullable = True),\n", | |
" StructField(\"units\", IntegerType(), nullable = True)\n", | |
" ])), nullable = True),\n", | |
" StructField(\"version\", IntegerType(), nullable = True)\n", | |
" ]), nullable = True),\n", | |
" StructField(\"platform\", StringType(), nullable = True),\n", | |
" StructField(\"platform_pretty_version\", StringType(), nullable = True),\n", | |
" StructField(\"platform_version\", StringType(), nullable = True),\n", | |
" StructField(\"plugin_filename\", StringType(), nullable = True),\n", | |
" StructField(\"plugin_name\", StringType(), nullable = True),\n", | |
" StructField(\"plugin_version\", StringType(), nullable = True),\n", | |
" StructField(\"process_type\", StringType(), nullable = True),\n", | |
" StructField(\"processor_notes\", StringType(), nullable = True),\n", | |
" StructField(\"product\", StringType(), nullable = True),\n", | |
" StructField(\"productid\", StringType(), nullable = True),\n", | |
" StructField(\"proto_signature\", StringType(), nullable = True),\n", | |
" StructField(\"reason\", StringType(), nullable = True),\n", | |
" StructField(\"release_channel\", StringType(), nullable = True),\n", | |
" StructField(\"signature\", StringType(), nullable = True),\n", | |
" StructField(\"topmost_filenames\", StringType(), nullable = True),\n", | |
" StructField(\"uptime\", IntegerType(), nullable = True),\n", | |
" StructField(\"user_comments\", StringType(), nullable = True),\n", | |
" StructField(\"uuid\", StringType(), nullable = True),\n", | |
" StructField(\"version\", StringType(), nullable = True),\n", | |
" StructField(\"winsock_lsp\", StringType(), nullable = True)\n", | |
"])" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"### Read crash data as json, convert it to parquet" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 3, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [], | |
"source": [ | |
"from datetime import datetime as dt, timedelta, date\n", | |
"\n", | |
"def daterange(start_date, end_date):\n", | |
" for n in range(int((end_date - start_date).days) + 1):\n", | |
" yield (end_date - timedelta(n)).strftime(\"%Y%m%d\")\n", | |
"\n", | |
"def import_day(d):\n", | |
" source_s3path = \"s3://org-allizom-telemetry-crashes/v1/crash_report\"\n", | |
" dest_s3path = \"s3://net-mozaws-prod-us-west-2-pipeline-analysis/mreid/crash/v4\"\n", | |
" num_partitions = 5\n", | |
" print \"Processing {}, started at {}\".format(d, dt.utcnow())\n", | |
" cur_source_s3path = \"{}/{}\".format(source_s3path, d)\n", | |
" cur_dest_s3path = \"{}/crash_date={}\".format(dest_s3path, d)\n", | |
" df = sqlContext.read.json(cur_source_s3path, schema=crash_schema)\n", | |
" df.repartition(num_partitions).write.parquet(cur_dest_s3path, mode=\"overwrite\")\n", | |
"\n", | |
"def backfill(start_date_yyyymmdd):\n", | |
" start_date = dt.strptime(start_date_yyyymmdd, \"%Y%m%d\")\n", | |
" end_date = dt.utcnow() - timedelta(1) # yesterday\n", | |
" for d in daterange(start_date, end_date):\n", | |
" try:\n", | |
" import_day(d)\n", | |
" except Exception as e:\n", | |
" print \"ERRROR: \", e" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 5, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"Processing 20160908, started at 2016-09-09 18:47:42.180961\n" | |
] | |
} | |
], | |
"source": [ | |
"yesterday = dt.strftime(dt.utcnow() - timedelta(1), \"%Y%m%d\")\n", | |
"import_day(yesterday)\n", | |
"\n", | |
"#backfill(\"20160902\")" | |
] | |
} | |
], | |
"metadata": { | |
"kernelspec": { | |
"display_name": "Python 2", | |
"language": "python", | |
"name": "python2" | |
}, | |
"language_info": { | |
"codemirror_mode": { | |
"name": "ipython", | |
"version": 2 | |
}, | |
"file_extension": ".py", | |
"mimetype": "text/x-python", | |
"name": "python", | |
"nbconvert_exporter": "python", | |
"pygments_lexer": "ipython2", | |
"version": "2.7.11" | |
} | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 0 | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding: utf-8 | |
# # Import Socorro crash data into the Data Platform | |
# | |
# We want to be able to store Socorro crash data in Parquet form so that it can be made accessible from re:dash. | |
# | |
# See [Bug 1273657](https://bugzilla.mozilla.org/show_bug.cgi?id=1273657) for more details | |
# In[2]: | |
from pyspark.sql import SQLContext | |
from pyspark.sql.types import * | |
def _classification_type():
    """Return the struct used by the ``skunk_works`` and ``support`` classifications."""
    return StructType([
        StructField("classification", StringType(), nullable=True),
        StructField("classification_data", StringType(), nullable=True),
        StructField("classification_version", StringType(), nullable=True),
    ])


def _stack_frame_type():
    """Return the struct for one stack frame.

    The same layout is used for ``json_dump.crashing_thread.frames`` and for
    ``json_dump.threads[].frames``.
    """
    return StructType([
        StructField("file", StringType(), nullable=True),
        StructField("frame", IntegerType(), nullable=True),
        StructField("function", StringType(), nullable=True),
        StructField("function_offset", StringType(), nullable=True),
        StructField("line", IntegerType(), nullable=True),
        StructField("module", StringType(), nullable=True),
        StructField("module_offset", StringType(), nullable=True),
        StructField("offset", StringType(), nullable=True),
    ])


# Explicit schema passed to ``read.json`` in ``import_day`` so that every
# day's Parquet output has the same columns and types.
crash_schema = StructType([
    StructField("additional_minidumps", ArrayType(StringType(), containsNull=False), nullable=True),
    StructField("addons", ArrayType(StringType(), containsNull=False), nullable=True),
    StructField("addons_checked", BooleanType(), nullable=False),
    StructField("address", StringType(), nullable=True),
    StructField("app_notes", StringType(), nullable=True),
    StructField("build_id", StringType(), nullable=True),
    StructField("classifications", StructType([
        StructField("jit", StructType([
            StructField("category", StringType(), nullable=True),
            StructField("category_return_code", StringType(), nullable=True),
        ]), nullable=True),
        StructField("skunk_works", _classification_type(), nullable=True),
        StructField("support", _classification_type(), nullable=True),
    ]), nullable=True),
    StructField("cpu_arch", StringType(), nullable=True),
    StructField("cpu_info", StringType(), nullable=True),
    StructField("crash_id", StringType(), nullable=True),
    StructField("date", StringType(), nullable=True),
    StructField("flash_version", StringType(), nullable=True),
    StructField("hang_type", IntegerType(), nullable=True),
    StructField("install_age", IntegerType(), nullable=True),
    StructField("java_stack_trace", StringType(), nullable=True),
    StructField("json_dump", StructType([
        StructField("crash_info", StructType([
            StructField("address", StringType(), nullable=True),
            StructField("crashing_thread", IntegerType(), nullable=True),
            StructField("type", StringType(), nullable=True),
        ]), nullable=True),
        StructField("crashing_thread", StructType([
            StructField("frames", ArrayType(_stack_frame_type()), nullable=True),
            StructField("threads_index", IntegerType(), nullable=True),
            StructField("total_frames", IntegerType(), nullable=True),
        ]), nullable=True),
        StructField("largest_free_vm_block", StringType(), nullable=True),
        StructField("main_module", IntegerType(), nullable=True),
        StructField("status", StringType(), nullable=True),
        StructField("system_info", StructType([
            StructField("cpu_arch", StringType(), nullable=True),
            StructField("cpu_count", IntegerType(), nullable=True),
            StructField("cpu_info", StringType(), nullable=True),
            StructField("os", StringType(), nullable=True),
            StructField("os_ver", StringType(), nullable=True),
        ]), nullable=True),
        StructField("thread_count", IntegerType(), nullable=True),
        StructField("threads", ArrayType(StructType([
            StructField("frame_count", IntegerType(), nullable=True),
            StructField("frames", ArrayType(_stack_frame_type()), nullable=True),
        ])), nullable=True),
        StructField("tiny_block_size", StringType(), nullable=True),
        StructField("write_combine_size", StringType(), nullable=True),
    ]), nullable=True),
    StructField("last_crash", IntegerType(), nullable=True),
    StructField("memory_report", StructType([
        StructField("hasMozMallocUsableSize", BooleanType(), nullable=False),
        StructField("reports", ArrayType(StructType([
            # NOTE(review): IntegerType is 32-bit; if "amount" can hold byte
            # counts above 2**31 - 1 this may need LongType -- confirm against
            # real data before changing, since it alters the output schema.
            StructField("amount", IntegerType(), nullable=True),
            StructField("description", StringType(), nullable=True),
            StructField("kind", IntegerType(), nullable=True),
            StructField("path", StringType(), nullable=True),
            StructField("process", StringType(), nullable=True),
            StructField("units", IntegerType(), nullable=True),
        ])), nullable=True),
        StructField("version", IntegerType(), nullable=True),
    ]), nullable=True),
    StructField("platform", StringType(), nullable=True),
    StructField("platform_pretty_version", StringType(), nullable=True),
    StructField("platform_version", StringType(), nullable=True),
    StructField("plugin_filename", StringType(), nullable=True),
    StructField("plugin_name", StringType(), nullable=True),
    StructField("plugin_version", StringType(), nullable=True),
    StructField("process_type", StringType(), nullable=True),
    StructField("processor_notes", StringType(), nullable=True),
    StructField("product", StringType(), nullable=True),
    StructField("productid", StringType(), nullable=True),
    StructField("proto_signature", StringType(), nullable=True),
    StructField("reason", StringType(), nullable=True),
    StructField("release_channel", StringType(), nullable=True),
    StructField("signature", StringType(), nullable=True),
    StructField("topmost_filenames", StringType(), nullable=True),
    StructField("uptime", IntegerType(), nullable=True),
    StructField("user_comments", StringType(), nullable=True),
    StructField("uuid", StringType(), nullable=True),
    StructField("version", StringType(), nullable=True),
    StructField("winsock_lsp", StringType(), nullable=True),
])
# ### Read crash data as json, convert it to parquet | |
# In[3]: | |
from datetime import datetime as dt, timedelta, date | |
def daterange(start_date, end_date):
    """Yield one ``YYYYMMDD`` string per day, newest first.

    Days are produced in reverse chronological order, starting at
    ``end_date`` and finishing at ``start_date`` (both inclusive).

    Args:
        start_date: Oldest day to include (``date`` or ``datetime``).
        end_date: Newest day to include (``date`` or ``datetime``).

    Yields:
        Each day formatted with ``"%Y%m%d"``. Nothing is yielded when
        ``start_date`` falls on a later day than ``end_date``.
    """
    # Compare whole calendar days only. With raw datetimes, ``.days`` truncates
    # a partial day, which would drop ``start_date`` whenever end_date's
    # time-of-day is earlier than start_date's.
    if isinstance(start_date, dt):
        start_date = start_date.date()
    if isinstance(end_date, dt):
        end_date = end_date.date()

    day_span = (end_date - start_date).days
    for offset in range(day_span + 1):
        yield (end_date - timedelta(days=offset)).strftime("%Y%m%d")
def import_day(d,
               source_s3path="s3://org-allizom-telemetry-crashes/v1/crash_report",
               dest_s3path="s3://net-mozaws-prod-us-west-2-pipeline-analysis/mreid/crash/v4",
               num_partitions=5):
    """Convert one day of crash JSON to Parquet.

    Reads ``<source_s3path>/<d>`` as JSON using ``crash_schema`` and writes it
    to ``<dest_s3path>/crash_date=<d>`` as Parquet, overwriting whatever is
    already at that destination.

    Args:
        d: Day to import, as the string used in the source path (the callers
            in this file pass ``YYYYMMDD``).
        source_s3path: Prefix under which the per-day JSON lives.
        dest_s3path: Prefix under which the ``crash_date=<d>`` output is written.
        num_partitions: Number of partitions the data is repartitioned into
            before writing.

    Note:
        Relies on a module-level ``sqlContext`` that this file does not
        create -- presumably supplied by the notebook environment; confirm
        before running it as a standalone script.
    """
    # Single-argument print(...) gives identical output on Python 2 and 3.
    print("Processing {}, started at {}".format(d, dt.utcnow()))
    cur_source_s3path = "{}/{}".format(source_s3path, d)
    cur_dest_s3path = "{}/crash_date={}".format(dest_s3path, d)
    df = sqlContext.read.json(cur_source_s3path, schema=crash_schema)
    df.repartition(num_partitions).write.parquet(cur_dest_s3path, mode="overwrite")
def backfill(start_date_yyyymmdd):
    """Import every day from ``start_date_yyyymmdd`` through yesterday (UTC).

    Days are processed in the order ``daterange`` yields them. A failure on
    one day is reported and the loop carries on with the remaining days, so a
    single bad day does not abort the whole backfill.

    Args:
        start_date_yyyymmdd: First day to import, formatted ``YYYYMMDD``.

    Raises:
        ValueError: If ``start_date_yyyymmdd`` does not match ``%Y%m%d``
            (raised by ``strptime`` before any day is processed).
    """
    start_date = dt.strptime(start_date_yyyymmdd, "%Y%m%d")
    end_date = dt.utcnow() - timedelta(days=1)  # yesterday
    for d in daterange(start_date, end_date):
        try:
            import_day(d)
        except Exception as e:
            # Deliberately best-effort: report which day failed and keep going.
            print("ERROR importing {}: {}".format(d, e))
# In[5]: | |
# Import the most recent complete day, i.e. yesterday in UTC.
yesterday = (dt.utcnow() - timedelta(days=1)).strftime("%Y%m%d")
import_day(yesterday)

# To re-import a range of days instead (start day through yesterday), run e.g.:
#backfill("20160902")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment