Skip to content

Instantly share code, notes, and snippets.

@mreid-moz
Last active September 2, 2016 17:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mreid-moz/31ac995e3180c156db61e5f1c0ee745b to your computer and use it in GitHub Desktop.
Save mreid-moz/31ac995e3180c156db61e5f1c0ee745b to your computer and use it in GitHub Desktop.
CrashInvestigations
Display the source blob
Display the rendered blob
Raw
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
# coding: utf-8
# # Process Socorro crash data with Spark
#
# We want to be able to store Socorro crash data in Parquet form so that it can be made accessible from re:dash.
#
# This is just a test per [Bug 1299183](https://bugzilla.mozilla.org/show_bug.cgi?id=1299183) to ensure that the data can be read. So far so good!
# In[10]:
from pyspark.sql import SQLContext
from pyspark.sql.types import *
def _frame_struct():
    """Build the struct describing one stack frame.

    The same eight fields appear under json_dump.crashing_thread.frames and
    under json_dump.threads[].frames, so both places call this helper.  A new
    StructType is returned on every call so no instance is shared.
    """
    return StructType([
        StructField("file", StringType(), nullable=True),
        StructField("frame", IntegerType(), nullable=True),
        StructField("function", StringType(), nullable=True),
        StructField("function_offset", StringType(), nullable=True),
        StructField("line", IntegerType(), nullable=True),
        StructField("module", StringType(), nullable=True),
        StructField("module_offset", StringType(), nullable=True),
        StructField("offset", StringType(), nullable=True),
    ])


def _classification_struct():
    """Build the struct shared by classifications.skunk_works and .support."""
    return StructType([
        StructField("classification", StringType(), nullable=True),
        StructField("classification_data", StringType(), nullable=True),
        StructField("classification_version", StringType(), nullable=True),
    ])


# Explicit schema handed to sqlContext.read.json(...) for the crash report
# JSON, so that Spark does not have to infer one from the data.
crash_schema = StructType([
    StructField("additional_minidumps", ArrayType(StringType(), containsNull=False), nullable=True),
    StructField("addons", ArrayType(StringType(), containsNull=False), nullable=True),
    StructField("addons_checked", BooleanType(), nullable=False),
    StructField("address", StringType(), nullable=True),
    StructField("app_notes", StringType(), nullable=True),
    StructField("build_id", StringType(), nullable=True),
    StructField("classifications", StructType([
        StructField("jit", StructType([
            StructField("category", StringType(), nullable=True),
            StructField("category_return_code", StringType(), nullable=True),
        ]), nullable=True),
        StructField("skunk_works", _classification_struct(), nullable=True),
        StructField("support", _classification_struct(), nullable=True),
    ]), nullable=True),
    StructField("cpu_arch", StringType(), nullable=True),
    StructField("cpu_info", StringType(), nullable=True),
    StructField("crash_id", StringType(), nullable=True),
    StructField("date", StringType(), nullable=True),
    StructField("flash_version", StringType(), nullable=True),
    StructField("hang_type", IntegerType(), nullable=True),
    StructField("install_age", IntegerType(), nullable=True),
    StructField("java_stack_trace", StringType(), nullable=True),
    StructField("json_dump", StructType([
        StructField("crash_info", StructType([
            StructField("address", StringType(), nullable=True),
            StructField("crashing_thread", IntegerType(), nullable=True),
            StructField("type", StringType(), nullable=True),
        ]), nullable=True),
        StructField("crashing_thread", StructType([
            StructField("frames", ArrayType(_frame_struct()), nullable=True),
            StructField("threads_index", IntegerType(), nullable=True),
            StructField("total_frames", IntegerType(), nullable=True),
        ]), nullable=True),
        StructField("largest_free_vm_block", StringType(), nullable=True),
        StructField("main_module", IntegerType(), nullable=True),
        StructField("status", StringType(), nullable=True),
        StructField("system_info", StructType([
            StructField("cpu_arch", StringType(), nullable=True),
            StructField("cpu_count", IntegerType(), nullable=True),
            StructField("cpu_info", StringType(), nullable=True),
            StructField("os", StringType(), nullable=True),
            StructField("os_ver", StringType(), nullable=True),
        ]), nullable=True),
        StructField("thread_count", IntegerType(), nullable=True),
        StructField("threads", ArrayType(StructType([
            StructField("frame_count", IntegerType(), nullable=True),
            StructField("frames", ArrayType(_frame_struct()), nullable=True),
        ])), nullable=True),
        StructField("tiny_block_size", StringType(), nullable=True),
        StructField("write_combine_size", StringType(), nullable=True),
    ]), nullable=True),
    StructField("last_crash", IntegerType(), nullable=True),
    StructField("memory_report", StructType([
        StructField("hasMozMallocUsableSize", BooleanType(), nullable=False),
        StructField("reports", ArrayType(StructType([
            # NOTE(review): IntegerType is a 32-bit type in Spark; confirm
            # that "amount" values always fit in that range.
            StructField("amount", IntegerType(), nullable=True),
            StructField("description", StringType(), nullable=True),
            StructField("kind", IntegerType(), nullable=True),
            StructField("path", StringType(), nullable=True),
            StructField("process", StringType(), nullable=True),
            StructField("units", IntegerType(), nullable=True),
        ])), nullable=True),
        StructField("version", IntegerType(), nullable=True),
    ]), nullable=True),
    StructField("platform", StringType(), nullable=True),
    StructField("platform_pretty_version", StringType(), nullable=True),
    StructField("platform_version", StringType(), nullable=True),
    StructField("plugin_filename", StringType(), nullable=True),
    StructField("plugin_name", StringType(), nullable=True),
    StructField("plugin_version", StringType(), nullable=True),
    StructField("process_type", StringType(), nullable=True),
    StructField("processor_notes", StringType(), nullable=True),
    StructField("product", StringType(), nullable=True),
    StructField("productid", StringType(), nullable=True),
    StructField("proto_signature", StringType(), nullable=True),
    StructField("reason", StringType(), nullable=True),
    StructField("release_channel", StringType(), nullable=True),
    StructField("signature", StringType(), nullable=True),
    StructField("topmost_filenames", StringType(), nullable=True),
    StructField("uptime", IntegerType(), nullable=True),
    StructField("user_comments", StringType(), nullable=True),
    StructField("uuid", StringType(), nullable=True),
    StructField("version", StringType(), nullable=True),
    StructField("winsock_lsp", StringType(), nullable=True),
])
# In[11]:
# Location of the crash report JSON to read: bucket name, key prefix, and the
# full s3:// URL assembled from the two.
bucket = "org-allizom-telemetry-crashes"
prefix = "v1/crash_report/20160830/"
s3path = "s3://{bucket}/{prefix}".format(bucket=bucket, prefix=prefix)
# Note that we pass an explicit schema to use for the data, rather than allowing Spark to automatically infer it.
# More details at http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.json
# Exported form of the notebook line
#   %time dataset = sqlContext.read.json(s3path, schema=crash_schema)
# The assignment is run through IPython's `time` magic, so `dataset` is only
# bound when this executes inside an IPython/Jupyter session.
# NOTE(review): `sqlContext` is not defined in this file; presumably the
# notebook's Spark environment provides it -- confirm.
get_ipython().magic(u'time dataset = sqlContext.read.json(s3path, schema=crash_schema)')
# And what does the resulting schema look like?
# In[12]:
# Print the schema Spark attached to the DataFrame that was just read.
dataset.printSchema()
# In[13]:
# Exported form of the notebook line `%time dataset.count()`: counts the rows
# under IPython's `time` magic (requires an IPython/Jupyter session).
get_ipython().magic(u'time dataset.count()')
# In[14]:
# Make the DataFrame queryable from Spark SQL under the table name "crashes".
crashes_table = "crashes"
sqlContext.registerDataFrameAsTable(dataset, crashes_table)
# In[15]:
# Number of crash rows per version, largest count first.
version_counts_sql = "select version, count(*) as count from crashes group by version order by count(*) desc"
by_version = sqlContext.sql(version_counts_sql)
# In[16]:
# collect() returns the aggregated rows as a Python list.
by_version.collect()
# ### Store data as parquet
#
# Now that we have a DataFrame, it is trivial to save it in Parquet form.
# In[ ]:
# Redistribute the data into 10 partitions, then write it out as Parquet.
parquet_dest = "s3://net-mozaws-prod-us-west-2-pipeline-analysis/mreid/crash/v3"
repartitioned = dataset.repartition(10)
repartitioned.write.parquet(parquet_dest)
# ### Backfill some more data
# In[22]:
from datetime import datetime as dt, timedelta, date
# Backfill configuration.
# Root under which each day's crash report JSON lives (one sub-path per day).
source_s3path = "s3://org-allizom-telemetry-crashes/v1/crash_report"
# Root under which each day's Parquet output is written.
dest_s3path = "s3://net-mozaws-prod-us-west-2-pipeline-analysis/mreid/crash/v4"
# Partition count each day's data is repartitioned to before it is written.
num_partitions = 5
# First day of the backfill window, parsed from a YYYYMMDD string.
backfill_start = "20160818"
start_date = dt.strptime(backfill_start, "%Y%m%d")
# Last day of the backfill window: current UTC time minus one day (yesterday).
one_day = timedelta(1)
end_date = dt.utcnow() - one_day
def daterange(start_date, end_date):
    """Yield "YYYYMMDD" strings for each day from end_date back to start_date.

    Values are produced newest first: the first one is end_date's own day and
    every following one is exactly one day earlier.

    Args:
        start_date: datetime marking the oldest day of the range.
        end_date: datetime marking the newest day of the range.

    Yields:
        One "%Y%m%d"-formatted string per step.  The number of values is the
        whole-day part of (end_date - start_date) plus one, so a partial
        trailing day does not add an extra value.  Nothing is yielded when
        end_date is earlier than start_date.
    """
    # timedelta.days is already an int, so no conversion is needed.
    whole_days = (end_date - start_date).days
    for n in range(whole_days + 1):
        # Step backwards from end_date one day at a time.
        yield (end_date - timedelta(n)).strftime("%Y%m%d")
# Convert the crash report JSON to Parquet one day at a time.  daterange
# yields the days newest first, so the most recent data is written before
# older days.
for d in daterange(start_date, end_date):
    cur_source_s3path = "{}/{}".format(source_s3path, d)
    # Each day is written to its own "crash_date=YYYYMMDD" sub-path of the
    # destination root.
    cur_dest_s3path = "{}/crash_date={}".format(dest_s3path, d)
    # print(...) with a single pre-formatted argument produces the same
    # output on Python 2 and Python 3.
    print("Processing {}, started at {}".format(d, dt.utcnow()))
    try:
        df = sqlContext.read.json(cur_source_s3path, schema=crash_schema)
        # mode="overwrite" is passed so an existing output for this day is
        # replaced rather than causing the write to fail.
        df.repartition(num_partitions).write.parquet(cur_dest_s3path, mode="overwrite")
    except Exception as e:
        # Best effort: a failure for one day is reported (with the day it
        # belongs to) and the loop carries on with the next day.
        print("ERROR processing {}: {}".format(d, e))
# In[ ]:
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment