{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Import Socorro crash data into the Data Platform\n",
"\n",
"We want to be able to store Socorro crash data in Parquet form so that it can be made accessible from re:dash.\n",
"\n",
"See [Bug 1273657](https://bugzilla.mozilla.org/show_bug.cgi?id=1273657) for more details"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Fetching package metadata .......\n",
"Solving package specifications: ..........\n",
"\n",
"Package plan for installation in environment /home/hadoop/anaconda2:\n",
"\n",
"The following packages will be downloaded:\n",
"\n",
" package | build\n",
" ---------------------------|-----------------\n",
" conda-env-2.6.0 | 0 502 B\n",
" jmespath-0.9.0 | py27_0 27 KB\n",
" conda-4.2.12 | py27_0 373 KB\n",
" botocore-1.4.70 | py27_0 2.0 MB\n",
" s3transfer-0.1.9 | py27_0 64 KB\n",
" boto3-1.4.1 | py27_0 98 KB\n",
" ------------------------------------------------------------\n",
" Total: 2.5 MB\n",
"\n",
"The following NEW packages will be INSTALLED:\n",
"\n",
" boto3: 1.4.1-py27_0 \n",
" botocore: 1.4.70-py27_0\n",
" conda-env: 2.6.0-0 \n",
" jmespath: 0.9.0-py27_0 \n",
" s3transfer: 0.1.9-py27_0 \n",
"\n",
"The following packages will be UPDATED:\n",
"\n",
" conda: 4.2.9-py27_0 --> 4.2.12-py27_0\n",
"\n",
"Fetching packages ...\n",
"conda-env-2.6. 100% |################################| Time: 0:00:00 767.61 kB/s\n",
"jmespath-0.9.0 100% |################################| Time: 0:00:00 2.95 MB/s\n",
"conda-4.2.12-p 100% |################################| Time: 0:00:00 5.49 MB/s\n",
"botocore-1.4.7 100% |################################| Time: 0:00:00 3.71 MB/s\n",
"s3transfer-0.1 100% |################################| Time: 0:00:00 891.22 kB/s\n",
"boto3-1.4.1-py 100% |################################| Time: 0:00:00 725.56 kB/s\n",
"Extracting packages ...\n",
"[ COMPLETE ]|###################################################| 100%\n",
"Unlinking packages ...\n",
"[ COMPLETE ]|###################################################| 100%\n",
"Linking packages ...\n",
"[ COMPLETE ]|###################################################| 100%\n",
"dbus post-link :: /etc/machine-id not found ..\n",
"dbus post-link :: .. using /proc/sys/kernel/random/boot_id\n"
]
}
],
"source": [
"!conda install boto3 --yes"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import logging\n",
"logging.basicConfig(level=logging.INFO)\n",
"log = logging.getLogger(__name__)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We create the pyspark datatype for representing the crash data in spark. This is a slightly modified version of [peterbe/crash-report-struct-code](https://github.com/peterbe/crash-report-struct-code). "
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from pyspark.sql.types import *\n",
"\n",
"def create_struct(schema):\n",
" \"\"\" Take a JSON schema and return a pyspark StructType of equivalent structure. \"\"\"\n",
" \n",
" replace_definitions(schema, schema['definitions'])\n",
" assert '$ref' not in str(schema), 're-write didnt work'\n",
" \n",
" struct = StructType()\n",
" for row in get_rows(schema):\n",
" struct.add(row)\n",
"\n",
" return struct\n",
"\n",
"def replace_definitions(schema, definitions):\n",
" \"\"\" Replace references in the JSON schema with their definitions.\"\"\"\n",
"\n",
" if 'properties' in schema:\n",
" for prop, meta in schema['properties'].items():\n",
" replace_definitions(meta, definitions)\n",
" elif 'items' in schema:\n",
" if '$ref' in schema['items']:\n",
" ref = schema['items']['$ref'].split('/')[-1]\n",
" schema['items'] = definitions[ref]\n",
" replace_definitions(schema['items'], definitions)\n",
" else:\n",
" replace_definitions(schema['items'], definitions)\n",
" elif '$ref' in str(schema):\n",
" err_msg = \"Reference not found for schema: {}\".format(str(schema))\n",
" log.error(err_msg)\n",
" raise ValueError(err_msg)\n",
"\n",
"def get_rows(schema):\n",
" \"\"\" Map the fields in a JSON schema to corresponding data structures in pyspark.\"\"\"\n",
" \n",
" if 'properties' not in schema:\n",
" err_msg = \"Invalid JSON schema: properties field is missing.\"\n",
" log.error(err_msg)\n",
" raise ValueError(err_msg)\n",
" \n",
" for prop in sorted(schema['properties']):\n",
" meta = schema['properties'][prop]\n",
" if 'string' in meta['type']:\n",
" logging.debug(\"{!r} allows the type to be String AND Integer\".format(prop))\n",
" yield StructField(prop, StringType(), 'null' in meta['type'])\n",
" elif 'integer' in meta['type']:\n",
" yield StructField(prop, IntegerType(), 'null' in meta['type'])\n",
" elif 'boolean' in meta['type']:\n",
" yield StructField(prop, BooleanType(), 'null' in meta['type'])\n",
" elif meta['type'] == 'array' and 'items' not in meta:\n",
" # Assuming strings in the array\n",
" yield StructField(prop, ArrayType(StringType(), False), True)\n",
" elif meta['type'] == 'array' and 'items' in meta:\n",
" struct = StructType()\n",
" for row in get_rows(meta['items']):\n",
" struct.add(row)\n",
" yield StructField(prop, ArrayType(struct), True)\n",
" elif meta['type'] == 'object':\n",
" struct = StructType()\n",
" for row in get_rows(meta):\n",
" struct.add(row)\n",
" yield StructField(prop, struct, True)\n",
" else:\n",
" err_msg = \"Invalid JSON schema: {}\".format(str(meta)[:100])\n",
" log.error(err_msg)\n",
" raise ValueError(err_msg)"
]
},
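{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a quick sanity check (not part of the original import job), the helper can be exercised on a tiny hand-written schema. The field names below are made up purely for illustration."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Minimal sketch: a toy schema (hypothetical field names) with an empty\n",
"# definitions block, to confirm the JSON types map to the expected Spark types.\n",
"toy_schema = {\n",
"    'definitions': {},\n",
"    'properties': {\n",
"        'product': {'type': ['string', 'null']},\n",
"        'uptime': {'type': ['integer', 'null']},\n",
"        'is_startup_crash': {'type': ['boolean', 'null']}\n",
"    }\n",
"}\n",
"# Expect nullable string, integer, and boolean fields, sorted by name.\n",
"print(create_struct(toy_schema))"
]
},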
{
"cell_type": "markdown",
"metadata": {},
"source": [
"First fetch from the primary source in s3 as per [bug 1312006](https://bugzilla.mozilla.org/show_bug.cgi?id=1312006). We fall back to the github location if this is not available."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"INFO:__main__:Fetching latest crash data schema from s3://org-mozilla-telemetry-crashes/crash_report.json\n",
"INFO:botocore.vendored.requests.packages.urllib3.connectionpool:Starting new HTTP connection (1): 169.254.169.254\n",
"INFO:botocore.vendored.requests.packages.urllib3.connectionpool:Starting new HTTP connection (1): 169.254.169.254\n",
"INFO:botocore.credentials:Found credentials from IAM Role: telemetry-spark-cloudformation-TelemetrySparkRole-RHL50R5U270K\n",
"INFO:botocore.vendored.requests.packages.urllib3.connectionpool:Starting new HTTPS connection (1): org-mozilla-telemetry-crashes.s3.amazonaws.com\n",
"WARNING:__main__:Could not fetch schema from s3://org-mozilla-telemetry-crashes/crash_report.json: An error occurred (404) when calling the HeadObject operation: Not Found\n",
"Fetching crash data schema from https://raw.githubusercontent.com/mozilla/socorro/master/socorro/schemas/crash_report.json\n"
]
}
],
"source": [
"import boto3\n",
"import botocore\n",
"import json\n",
"import tempfile\n",
"import urllib2\n",
"\n",
"region = \"us-west-2\"\n",
"bucket = \"org-mozilla-telemetry-crashes\"\n",
"key = \"crash_report.json\"\n",
"fallback_url = \"https://raw.githubusercontent.com/mozilla/socorro/master/socorro/schemas/crash_report.json\"\n",
"\n",
"try:\n",
" log.info(\"Fetching latest crash data schema from s3://{}/{}\".format(bucket, key))\n",
" s3 = boto3.client('s3', region_name=region)\n",
" # download schema to memory via a file like object\n",
" resp = tempfile.TemporaryFile()\n",
" s3.download_fileobj(bucket, key, resp)\n",
" resp.seek(0)\n",
"except botocore.exceptions.ClientError as e:\n",
" log.warning((\"Could not fetch schema from s3://{}/{}: {}\\n\"\n",
" \"Fetching crash data schema from {}\")\n",
" .format(bucket, key, e, fallback_url))\n",
" resp = urllib2.urlopen(fallback_url)\n",
"\n",
"schema_data = json.load(resp)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Read crash data as json, convert it to parquet"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"crash_schema = create_struct(schema_data)"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from datetime import datetime as dt, timedelta, date\n",
"from pyspark.sql import SQLContext\n",
"\n",
"# default to v0, since the first versioned schema will start with 1\n",
"version = schema_data.get('$target_version', 0)\n",
"\n",
"def daterange(start_date, end_date):\n",
" for n in range(int((end_date - start_date).days) + 1):\n",
" yield (end_date - timedelta(n)).strftime(\"%Y%m%d\")\n",
"\n",
"def import_day(d):\n",
" source_s3path = \"s3://org-mozilla-telemetry-crashes/v1/crash_report\"\n",
" dest_s3path = \"s3://telemetry-test-bucket/socorro_crash/\"\n",
" num_partitions = 10\n",
" log.info(\"Processing {}, started at {}\".format(d, dt.utcnow()))\n",
" cur_source_s3path = \"{}/{}\".format(source_s3path, d)\n",
" cur_dest_s3path = \"{}/v{}/crash_date={}\".format(dest_s3path, version, d)\n",
" df = sqlContext.read.json(cur_source_s3path, schema=crash_schema)\n",
" df.repartition(num_partitions).write.parquet(cur_dest_s3path, mode=\"overwrite\")\n",
"\n",
"def backfill(start_date_yyyymmdd):\n",
" start_date = dt.strptime(start_date_yyyymmdd, \"%Y%m%d\")\n",
" end_date = dt.utcnow() - timedelta(1) # yesterday\n",
" for d in daterange(start_date, end_date):\n",
" try:\n",
" import_day(d)\n",
" except Exception as e:\n",
" log.error(e)"
]
},
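{
"cell_type": "markdown",
"metadata": {},
"source": [
"A quick sanity check (not part of the original job): `daterange` walks backwards from the end date, yielding one `YYYYMMDD` string per day, inclusive of both endpoints."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Sketch: enumerate a three-day window to confirm the ordering and format\n",
"# that backfill() will feed into import_day(). No S3 access is involved.\n",
"print(list(daterange(dt(2016, 11, 1), dt(2016, 11, 3))))\n",
"# Expected: ['20161103', '20161102', '20161101']"
]
},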
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Run this notebook on November 2, 2016"
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"INFO:__main__:Processing 20161102, started at 2016-11-08 19:07:18.015029\n"
]
}
],
"source": [
"yesterday = dt.strftime(dt.utcnow() - timedelta(1), \"%Y%m%d\")\n",
"import_day(\"20161102\")\n",
"\n",
"#backfill(\"20160902\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Load the crash data back into memory and see if we can query on the json_dump.modules field."
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"dest_s3path = \"s3://telemetry-test-bucket/socorro_crash/v1/crash_date=20161102\"\n",
"df = sqlContext.read.parquet(dest_s3path)"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"DataFrame[modules: array<struct<base_addr:string,code_id:string,debug_file:string,debug_id:string,end_addr:string,filename:string,loaded_symbols:boolean,missing_symbols:boolean,symbol_disk_cache_hit:boolean,symbol_url:string,version:string>>]"
]
},
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df.select(\"json_dump.modules\")"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"321770"
]
},
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df.select(\"json_dump.modules\").count()"
]
}
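,
{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a further illustration (not part of the original run), the nested module records can be flattened with `explode` and queried column by column; the column names come from the schema shown above."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from pyspark.sql.functions import explode\n",
"\n",
"# Sketch: one row per loaded module, keeping a couple of columns from the\n",
"# struct printed above (filename, missing_symbols).\n",
"modules = df.select(explode(\"json_dump.modules\").alias(\"module\"))\n",
"modules.select(\"module.filename\", \"module.missing_symbols\").show(5)"
]
}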
],
"metadata": {
"anaconda-cloud": {},
"kernelspec": {
"display_name": "Python [default]",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.12"
}
},
"nbformat": 4,
"nbformat_minor": 0
}