-
-
Save mreid-moz/31ac995e3180c156db61e5f1c0ee745b to your computer and use it in GitHub Desktop.
CrashInvestigations
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding: utf-8
# # Process Socorro crash data with Spark
#
# We want to be able to store Socorro crash data in Parquet form so that it can be made accessible from re:dash.
#
# This is just a test per [Bug 1299183](https://bugzilla.mozilla.org/show_bug.cgi?id=1299183) to ensure that the data can be read. So far so good!
# In[10]: | |
from pyspark.sql import SQLContext | |
from pyspark.sql.types import * | |
# Explicit Spark schema for the crash report JSON. It is handed to
# sqlContext.read.json(..., schema=crash_schema) so that Spark does not have
# to infer the structure from the data.
#
# Sub-structures that occur more than once are defined a single time below and
# reused, so the copies cannot drift apart.

# One stack frame; used for both json_dump.crashing_thread.frames and
# json_dump.threads[].frames.
_frame_schema = StructType([
    StructField("file", StringType(), nullable=True),
    StructField("frame", IntegerType(), nullable=True),
    StructField("function", StringType(), nullable=True),
    StructField("function_offset", StringType(), nullable=True),
    StructField("line", IntegerType(), nullable=True),
    StructField("module", StringType(), nullable=True),
    StructField("module_offset", StringType(), nullable=True),
    StructField("offset", StringType(), nullable=True),
])

# Shape shared by classifications.skunk_works and classifications.support.
_classification_schema = StructType([
    StructField("classification", StringType(), nullable=True),
    StructField("classification_data", StringType(), nullable=True),
    StructField("classification_version", StringType(), nullable=True),
])

# Only addons_checked and memory_report.hasMozMallocUsableSize are declared
# non-nullable; every other field is nullable.
crash_schema = StructType([
    StructField("additional_minidumps", ArrayType(StringType(), containsNull=False), nullable=True),
    StructField("addons", ArrayType(StringType(), containsNull=False), nullable=True),
    StructField("addons_checked", BooleanType(), nullable=False),
    StructField("address", StringType(), nullable=True),
    StructField("app_notes", StringType(), nullable=True),
    StructField("build_id", StringType(), nullable=True),
    StructField("classifications", StructType([
        StructField("jit", StructType([
            StructField("category", StringType(), nullable=True),
            StructField("category_return_code", StringType(), nullable=True),
        ]), nullable=True),
        StructField("skunk_works", _classification_schema, nullable=True),
        StructField("support", _classification_schema, nullable=True),
    ]), nullable=True),
    StructField("cpu_arch", StringType(), nullable=True),
    StructField("cpu_info", StringType(), nullable=True),
    StructField("crash_id", StringType(), nullable=True),
    StructField("date", StringType(), nullable=True),
    StructField("flash_version", StringType(), nullable=True),
    StructField("hang_type", IntegerType(), nullable=True),
    StructField("install_age", IntegerType(), nullable=True),
    StructField("java_stack_trace", StringType(), nullable=True),
    StructField("json_dump", StructType([
        StructField("crash_info", StructType([
            StructField("address", StringType(), nullable=True),
            StructField("crashing_thread", IntegerType(), nullable=True),
            StructField("type", StringType(), nullable=True),
        ]), nullable=True),
        StructField("crashing_thread", StructType([
            StructField("frames", ArrayType(_frame_schema), nullable=True),
            StructField("threads_index", IntegerType(), nullable=True),
            StructField("total_frames", IntegerType(), nullable=True),
        ]), nullable=True),
        StructField("largest_free_vm_block", StringType(), nullable=True),
        StructField("main_module", IntegerType(), nullable=True),
        StructField("status", StringType(), nullable=True),
        StructField("system_info", StructType([
            StructField("cpu_arch", StringType(), nullable=True),
            StructField("cpu_count", IntegerType(), nullable=True),
            StructField("cpu_info", StringType(), nullable=True),
            StructField("os", StringType(), nullable=True),
            StructField("os_ver", StringType(), nullable=True),
        ]), nullable=True),
        StructField("thread_count", IntegerType(), nullable=True),
        StructField("threads", ArrayType(StructType([
            StructField("frame_count", IntegerType(), nullable=True),
            StructField("frames", ArrayType(_frame_schema), nullable=True),
        ])), nullable=True),
        StructField("tiny_block_size", StringType(), nullable=True),
        StructField("write_combine_size", StringType(), nullable=True),
    ]), nullable=True),
    StructField("last_crash", IntegerType(), nullable=True),
    StructField("memory_report", StructType([
        StructField("hasMozMallocUsableSize", BooleanType(), nullable=False),
        StructField("reports", ArrayType(StructType([
            StructField("amount", IntegerType(), nullable=True),
            StructField("description", StringType(), nullable=True),
            StructField("kind", IntegerType(), nullable=True),
            StructField("path", StringType(), nullable=True),
            StructField("process", StringType(), nullable=True),
            StructField("units", IntegerType(), nullable=True),
        ])), nullable=True),
        StructField("version", IntegerType(), nullable=True),
    ]), nullable=True),
    StructField("platform", StringType(), nullable=True),
    StructField("platform_pretty_version", StringType(), nullable=True),
    StructField("platform_version", StringType(), nullable=True),
    StructField("plugin_filename", StringType(), nullable=True),
    StructField("plugin_name", StringType(), nullable=True),
    StructField("plugin_version", StringType(), nullable=True),
    StructField("process_type", StringType(), nullable=True),
    StructField("processor_notes", StringType(), nullable=True),
    StructField("product", StringType(), nullable=True),
    StructField("productid", StringType(), nullable=True),
    StructField("proto_signature", StringType(), nullable=True),
    StructField("reason", StringType(), nullable=True),
    StructField("release_channel", StringType(), nullable=True),
    StructField("signature", StringType(), nullable=True),
    StructField("topmost_filenames", StringType(), nullable=True),
    StructField("uptime", IntegerType(), nullable=True),
    StructField("user_comments", StringType(), nullable=True),
    StructField("uuid", StringType(), nullable=True),
    StructField("version", StringType(), nullable=True),
    StructField("winsock_lsp", StringType(), nullable=True),
])
# In[11]: | |
# Location of one day's worth of crash reports.
bucket = "org-allizom-telemetry-crashes"
prefix = "v1/crash_report/20160830/"
s3path = "s3://{}/{}".format(bucket, prefix)

# Note that we pass an explicit schema to use for the data, rather than allowing Spark to automatically infer it.
# More details at http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.json
#
# run_line_magic() replaces the deprecated InteractiveShell.magic() call. The
# timed statement assigns `dataset`, which the following cells rely on.
get_ipython().run_line_magic(u'time', u'dataset = sqlContext.read.json(s3path, schema=crash_schema)')
# And what does the resulting schema look like? | |
# In[12]: | |
dataset.printSchema()

# In[13]:
# Time a full count of the loaded records. run_line_magic() replaces the
# deprecated InteractiveShell.magic() call.
get_ipython().run_line_magic(u'time', u'dataset.count()')

# In[14]:
# Expose the DataFrame to SQL under the table name "crashes".
sqlContext.registerDataFrameAsTable(dataset, "crashes")

# In[15]:
# Crash counts per version, largest first.
by_version = sqlContext.sql("select version, count(*) as count from crashes group by version order by count(*) desc")

# In[16]:
by_version.collect()

# ### Store data as parquet
#
# Now that we have a DataFrame, it is trivial to save it in parquet form.

# In[ ]:
dataset.repartition(10).write.parquet("s3://net-mozaws-prod-us-west-2-pipeline-analysis/mreid/crash/v3")
# ### Backfill some more data | |
# In[22]: | |
from datetime import datetime as dt, timedelta, date | |
# Backfill configuration: where the daily crash reports are read from, where
# the Parquet output is written, and how many partitions each day is written as.
source_s3path = "s3://org-allizom-telemetry-crashes/v1/crash_report"
dest_s3path = "s3://net-mozaws-prod-us-west-2-pipeline-analysis/mreid/crash/v4"
num_partitions = 5

# Inclusive date window for the backfill. end_date keeps the current UTC time
# of day, shifted back by one day.
start_date = dt.strptime("20160818", "%Y%m%d")
end_date = dt.utcnow() - timedelta(days=1)  # yesterday
def daterange(start_date, end_date):
    """Yield "YYYYMMDD" strings walking backwards from end_date to start_date.

    The first value is end_date's own day and each following value is one day
    earlier. The number of values is the whole-day difference between the two
    arguments plus one; nothing is yielded when that difference is negative.
    """
    span_days = int((end_date - start_date).days)
    offset = 0
    while offset <= span_days:
        current = end_date - timedelta(offset)
        yield current.strftime("%Y%m%d")
        offset += 1
# Convert each day's crash reports from JSON to Parquet, one day per
# crash_date=YYYYMMDD directory under dest_s3path. A failure on one day is
# reported and skipped so that the remaining days are still processed.
for d in daterange(start_date, end_date):
    # Single-argument print(...) behaves the same on Python 2 and Python 3.
    print("Processing {}, started at {}".format(d, dt.utcnow()))
    cur_source_s3path = "{}/{}".format(source_s3path, d)
    cur_dest_s3path = "{}/crash_date={}".format(dest_s3path, d)
    try:
        df = sqlContext.read.json(cur_source_s3path, schema=crash_schema)
        # mode="overwrite" lets a day be re-run without failing on existing output.
        df.repartition(num_partitions).write.parquet(cur_dest_s3path, mode="overwrite")
    except Exception as e:
        # Deliberately broad: this is a best-effort backfill. Name the failing
        # day so it can be retried.
        print("ERROR processing {}: {}".format(d, e))
# In[ ]: | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment