@sujitpal
Created March 8, 2014 23:57
Presentation to demonstrate using Hadoop Streaming and Python to perform group/collect and join/exclude operations on database data.
{
"metadata": {
"name": ""
},
"nbformat": 3,
"nbformat_minor": 0,
"worksheets": [
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Hadoop Streaming with Python on Amazon EMR\n",
"\n",
"## Job #1:\n",
"\n",
"Our input is a flat file in tab-separated (TSV) format. It is extracted from a RDBMS using the following SQL command:\n",
"\n",
" select c.CUI, c.STR, s.TUI\n",
" from MRCONSO c, MRSTY s\n",
" where c.LAT = 'ENG'\n",
" and c.CUI = s.CUI\n",
" into outfile '/tmp/cuistr.csv'\n",
" fields terminated by '\\t'\n",
" lines terminated by '\\n';\n",
"\n",
"The resulting file looks like this:\n",
"\n",
" C0070474\tpersulfides\tT104\n",
" C0678518\tcongener\tT104\n",
" C0678518\tcongeners\tT104\n",
" C0684279\tIron Compounds, Unspecified\tT104\n",
" C0684279\tIron Compounds\tT104\n",
" C0684279\tCompounds, Iron\tT104\n",
" C0684279\tIron compound (substance)\tT104\n",
" C0684279\tIron compound\tT104\n",
" C0684279\tIron compound, NOS\tT104\n",
" C0684298\tnitrogen compound\tT104\n",
"\n",
"First column is an ID, the second is a descriptive name or synonym for this item, and the third column is a category in which this item belongs. An item can have multiple synonyms and multiple categories. Our objective is to group the records by the ID, and for each group, collect the synonyms and categories. So the first three lines in the input above will now look like this:\n",
"\n",
" C0070474\t['persulfides']\t['T104']\n",
" C0678518\t['congener', 'congeners']\t['T104']\n",
"\n",
"The synonyms and categories are collected into a list and then serialized to a string representation.\n",
"\n",
"We will do this using [Hadoop Streaming](http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/).\n",
"\n",
"Hadoop Streaming jobs consist of scripts representing a Mapper and a Reducer which communicate with Hadoop via STDIN.\n",
"\n",
"Here is the code for our Mapper:\n"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"#!/usr/bin/env python\n",
"\n",
"import sys\n",
"\n",
"for line in sys.stdin:\n",
" line = line.strip()\n",
" print line\n"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The Mapper reads the input file line by line, removes the trailing newline, and sends it off to the reducer. Since our input is tab separated, the Hadoop framework expects tab separated key-value pairs, the first column from splitting the line on tab is the key we want to group by, so we can avoid having any logic in the Mapper. Such a Mapper is called an Identity or do-nothing Mapper.\n",
"\n",
"The code for our Reducer is shown below:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"#!/usr/bin/env python\n",
"from operator import itemgetter\n",
"import sys\n",
"\n",
"uniq_syns = set()\n",
"uniq_stys = set()\n",
"curr_cui = None\n",
"cui = None\n",
"for line in sys.stdin:\n",
" line = line.strip()\n",
" cui, syn, sty = line.split(\"\\t\")\n",
" # take advantage of sorted nature of input\n",
" if curr_cui == cui:\n",
" uniq_syns.add(syn)\n",
" uniq_stys.add(sty)\n",
" else:\n",
" if curr_cui != None:\n",
" print \"%s\\t%s\\t%s\" % (curr_cui, list(uniq_syns), list(uniq_stys))\n",
" uniq_syns = set([syn])\n",
" uniq_stys = set([sty])\n",
" curr_cui = cui\n",
"# take care of last line if applicable\n",
"if curr_cui == cui:\n",
" print \"%s\\t%s\\t%s\" % (curr_cui, list(uniq_syns), list(uniq_stys))\n"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The Reducer reads the lines emitted by the Mapper. Before these lines reach the Reducer, Hadoop sorts them by the key (in our case the first column in the tab separated line). \n",
"\n",
"Our reducer code is written to take advantage of the fact that the input is sorted by key. It watches for changes in the key to trigger reduction (in our case accumulation of the second and third column into lists.\n",
"\n",
"To test this job locally before running on Amazon EMR, I have created a small file and I know what the expected output is as well. My test file looks like this:\n",
"\n",
" 123456\tSome Disease\tT123\n",
" 123456\tSome Disease\tT123\n",
" 123456\tAnother name for it\tT234\n",
" 234567\tSome other disease\tT234\n",
" 345678\tYet another disease\tT123\n",
"\n",
"I run this on the Unix command line using the sequence:\n",
"\n",
" cat cuistr_test.csv | ./agg_mapper.py | sort | ./agg_reducer.py > cuistr_agg.csv\n",
" \n",
"The output matches my expectation => tests pass!\n",
"\n",
" 123456\t['Another name for it', 'Some Disease']\t['T123', 'T234']\n",
" 234567\t['Some other disease']\t['T234']\n",
" 345678\t['Yet another disease']\t['T123']\n",
"\n",
"To run this on Amazon EMR follow these steps:\n",
"\n",
"1. Create a folder \"python_mr_test\" in your S3 bucket.\n",
"2. Create a sub-folder \"input\" under your folder \"python_mr_test\"\n",
"3. Upload the file \"cuistr_1000.csv\" to the \"input\" subfolder.\n",
"4. Upload agg_mapper.py and agg_reducer.py to the \"python_mr_test\" subfolder.\n",
"\n",
"Your S3 should now look something like this:\n",
"\n",
" YOUR_S3_BUCKET\n",
" \u251c\u2500\u2500 python_mr_test\n",
" \u2502\u00a0\u00a0 \u251c\u2500\u2500 agg_mapper.py\n",
" \u2502\u00a0\u00a0 \u251c\u2500\u2500 agg_reducer.py\n",
" \u2502\u00a0\u00a0 \u2514\u2500\u2500 input\n",
" \u2502\u00a0\u00a0 \u251c\u2500\u2500 cuistr_1000.csv\n",
"\n",
"Open up your Amazon EMR console, then create a Hadoop Streaming job. Everything is the same as running a JAR file except the last step where you customize the job step. For the job step, choose __Streaming Program__, then specify the path to mapper and reducer.\n",
"\n",
"* Path to mapper: s3n://YOUR_S3_BUCKET/python_mr_test/agg_mapper.py\n",
"* Path to reducer: s3n://YOUR_S3_BUCKET/python_mr_test/agg_reducer.py\n",
"* Input path: s3n://YOUR_S3_BUCKET/python_mr_test/inputs\n",
"* Output path: s3n://YOUR_S3_BUCKET/python_mr_test/outputs\n",
"\n",
"The output of this job can be found in the outputs folder."
]
},
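{
"cell_type": "markdown",
"metadata": {},
"source": [
"As an aside, the same Streaming step can also be submitted programmatically using the boto library (the usual Python SDK for AWS at the time), assuming your AWS credentials are configured. The sketch below is only illustrative; the region, log path and step name are placeholders, so check the boto documentation before using it:\n",
"\n",
"    import boto.emr\n",
"    from boto.emr.step import StreamingStep\n",
"\n",
"    conn = boto.emr.connect_to_region(\"us-east-1\")\n",
"\n",
"    step = StreamingStep(name=\"Group cuistr by CUI\",\n",
"        mapper=\"s3n://YOUR_S3_BUCKET/python_mr_test/agg_mapper.py\",\n",
"        reducer=\"s3n://YOUR_S3_BUCKET/python_mr_test/agg_reducer.py\",\n",
"        input=\"s3n://YOUR_S3_BUCKET/python_mr_test/inputs\",\n",
"        output=\"s3n://YOUR_S3_BUCKET/python_mr_test/outputs\")\n",
"\n",
"    # log_uri is a placeholder; point it at any S3 path you like\n",
"    jobflow_id = conn.run_jobflow(name=\"python_mr_test\",\n",
"        log_uri=\"s3n://YOUR_S3_BUCKET/python_mr_test/logs\",\n",
"        steps=[step])\n",
"\n",
"This submits the same step as the console flow described above."
]
},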
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Job #2\n",
"\n",
"As before, our input is a tab separated file generated from the following SQL command:\n",
"\n",
" select CUI1, CUI2, RELA from MRREL\n",
" where CUI1 != CUI2\n",
" into outfile \"/tmp/cuirel.csv\"\n",
" fields terminated by \"\\t\"\n",
" lines terminated by \"\\n\";\n",
"\n",
"The resulting file looks like this:\n",
"\n",
" C0005790\tC0005778\tmeasured_by\n",
" C1255279\tC3537249\tmeasured_by\n",
" C1255446\tC0002520\tmeasured_by\n",
" C1255552\tC0596019\tmeasured_by\n",
" C1254417\tC0004611\tmeasured_by\n",
" C1254418\tC0004611\tmeasured_by\n",
" C1254394\tC0004611\tmeasured_by\n",
" C1254416\tC0004611\tmeasured_by\n",
" C1254377\tC0004611\tmeasured_by\n",
" C1254387\tC0004611\tmeasured_by\n",
"\n",
"First column is an ID, second column is another ID and the third column is the relationship between the two IDs. Our objective is to remove the rows for which an ID does not occur in the previous aggregated (our reference) file. This involves joining the file twice with the CUI field in the reference file, first time against the first column, second time against the second column. At each step, we will retain the row only if the key is available in the reference file. In the first step, rows which don't have their first ID will be eliminated, and in the second step, rows which don't have their second ID in the reference file will be eliminated.\n",
"\n",
"Here is the code for the Mapper:\n"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"#!/usr/bin/env python\n",
"\n",
"import sys\n",
"\n",
"cui_index = 0\n",
"for line in sys.stdin:\n",
" line = line.strip()\n",
" cols = line.split(\"\\t\")\n",
" if cols[1].startswith(\"[\"):\n",
" # output of agg_* job\n",
" print \"%s\\tXXX\" % (cols[0])\n",
" else:\n",
" # from cuirels (either first or second iteration)\n",
" cols = line.split(\"\\t\")\n",
" print \"%s\\t%s\" % (cols[cui_index], line)\n"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"For the first run, the cui_index is set to 0. For the second run we will set it to 1. The mapper's input contains the output from the previous job (the aggregated file) and the cuirels.csv file we just described. The mapper looks at the input to decide which file is being read (the aggregated file contains lists in the second and third columns, hence the startswith check. If it is data from the aggregated file, it extracts the key and sets \"XXX\" to the value and emits it. If it is data from the relations file, it extracts the key and sets the original line itself as the value and emits it.\n",
"\n",
"This data is sorted by Hadoop and sent to the Reducer, whose code is shown below:\n"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"#!/usr/bin/env python\n",
"\n",
"from operator import itemgetter\n",
"import sys\n",
"\n",
"vals = []\n",
"curr_cui = None\n",
"cui = None\n",
"include = False\n",
"for line in sys.stdin:\n",
" line = line.strip()\n",
" (cui, payload) = line.split(\"\\t\", 1)\n",
" # input is sorted, so we watch for change in cui to reduce\n",
" if curr_cui == cui:\n",
" if payload == \"XXX\":\n",
" include = True\n",
" else:\n",
" vals.append(payload)\n",
" else:\n",
" if curr_cui != None:\n",
" if include:\n",
" for val in vals:\n",
" print val\n",
" if payload == \"XXX\":\n",
" include = True\n",
" vals = []\n",
" else:\n",
" include = False\n",
" vals = [payload]\n",
" curr_cui = cui\n",
"# take care of last line\n",
"if curr_cui == cui:\n",
" if include:\n",
" for val in vals:\n",
" print val\n",
" "
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Just like the previous job, we take advantage of the fact that the input is sorted by key, so we watch for changes in the key in order to do our grouping. In addition to grouping relationship records (ie those for which the value is not \"XXX\"), we also check to see if we found a value \"XXX\" in our set. Finding this indicates that the key is available in the reference file, so we can include these records in the output, otherwise we discard them (this is what the logic \"if include:...\" does above.\n",
"\n",
"As before, we want to check this locally, so we run it through our tiny test files. The test file for this job looks like this:\n",
"\n",
" 123456\t345678\trel1\n",
" 234567\t123456\trel2\n",
" 234567\t456789\trel3\n",
" 456789\t567890\trel4\n",
" 456789\t123456\trel5\n",
"\n",
"On running our code using this test file like so:\n",
"\n",
" cat cuistr_agg.csv cuirel_test.csv | ./filt_mapper0.py | sort | ./filt_reducer.py > cuirel_left.csv\n",
"\n",
"Which returns the data shown below. As you can see, we have gotten rid of the 456789 ID on the 4th and 5th row of our input data.\n",
"\n",
" 123456\t345678\trel1\n",
" 234567\t123456\trel2\n",
" 234567\t456789\trel3\n",
"\n",
"Now we clone filt_mapper0.py to filt_mapper1.py by changing cui_index from 0 to 1, so it now points to the second column (array indexes in Python are 0-based). We now use the cuirel_left.csv as our input file to produce the final output file.\n",
"\n",
" cat cuistr_agg.csv cuirel_left.csv | ./filt_mapper1.py | sort | ./filt_reducer.py > cuirel_right.csv\n",
"\n",
"produces the output below. Notice that the third row in the previous output had the \"bad\" ID 456789 in the second column, so our second run got rid of that one.\n",
"\n",
" 234567\t123456\trel2\n",
" 123456\t345678\trel1\n",
"\n",
"We are now ready to submit this job to Amazon EMR. Follow these steps to submit the data and code.\n",
"\n",
"1. Upload filt_mapper0.py, filt_mapper1.py and filt_reducer.py to the python_mr_test folder on S3.\n",
"2. Upload the file cuirel_5000.csv to the python_mr_test/outputs folder.\n",
"\n",
"Your S3 should now look something like this:\n",
"\n",
" YOUR_S3_BUCKET\n",
" \u251c\u2500\u2500 python_mr_test\n",
" \u2502\u00a0\u00a0 \u251c\u2500\u2500 agg_mapper.py\n",
" \u2502\u00a0\u00a0 \u251c\u2500\u2500 agg_reducer.py\n",
" \u2502\u00a0\u00a0 \u251c\u2500\u2500 filt_mapper0.py\n",
" \u2502\u00a0\u00a0 \u251c\u2500\u2500 filt_mapper1.py\n",
" \u2502\u00a0\u00a0 \u251c\u2500\u2500 filt_reducer.py\n",
" \u2502\u00a0\u00a0 \u251c\u2500\u2500 inputs\n",
" \u2502\u00a0\u00a0 \u2502 \u2514\u2500\u2500 cuistr.csv\n",
" \u2502\u00a0\u00a0 \u251c\u2500\u2500 outputs\n",
" \u2502\u00a0\u00a0 \u251c\u2500\u2500 part-r-00000\n",
" \u2502\u00a0\u00a0 \u251c\u2500\u2500 part-r-00001\n",
" \u2502\u00a0\u00a0 \u251c\u2500\u2500 ...\n",
" \u2502\u00a0\u00a0 \u2514\u2500\u2500 cuirel.csv\n",
"\n",
"\n",
"Create a second Hadoop Streaming job. Set the following properties:\n",
"\n",
"* Path to mapper: s3n://YOUR_S3_BUCKET/python_mr_test/filt_mapper0.py\n",
"* Path to reducer: s3n://YOUR_S3_BUCKET/python_mr_test/filt_reducer.py\n",
"* Input path: s3n://YOUR_S3_BUCKET/python_mr_test/outputs\n",
"* Output path: s3n://YOUR_S3_BUCKET/python_mr_test/outputs2\n",
"\n",
"This step removes the bad IDs from the first column of cuirel.csv, and writes out the partially clean file into the outputs2 subdirectory. We now run an almost identical second job to remove the bad IDs from the second column of the file, the only change is the mapper, and the input and output directories. \n",
"\n",
"We will now use the outputs2 directory as the input directory, so we have to download the part-r files from the outputs directory, rename them and upload it into outputs2. At this point, your S3 would look something like this:\n",
"\n",
" YOUR_S3_BUCKET\n",
" \u251c\u2500\u2500 python_mr_test\n",
" \u2502\u00a0\u00a0 \u251c\u2500\u2500 agg_mapper.py\n",
" \u2502\u00a0\u00a0 \u251c\u2500\u2500 agg_reducer.py\n",
" \u2502\u00a0\u00a0 \u251c\u2500\u2500 filt_mapper0.py\n",
" \u2502\u00a0\u00a0 \u251c\u2500\u2500 filt_mapper1.py\n",
" \u2502\u00a0\u00a0 \u251c\u2500\u2500 filt_reducer.py\n",
" \u2502\u00a0\u00a0 \u251c\u2500\u2500 inputs\n",
" \u2502\u00a0\u00a0 \u2502 \u2514\u2500\u2500 cuistr.csv\n",
" \u2502\u00a0\u00a0 \u251c\u2500\u2500 outputs\n",
" \u2502\u00a0\u00a0 \u2502 \u251c\u2500\u2500 part-r-00000\n",
" \u2502\u00a0\u00a0 \u2502 \u251c\u2500\u2500 part-r-00001\n",
" \u2502\u00a0\u00a0 \u2502 \u251c\u2500\u2500 ...\n",
" \u2502\u00a0\u00a0 \u2502 \u2514\u2500\u2500 cuirel.csv\n",
" \u2502\u00a0\u00a0 \u251c\u2500\u2500 outputs2\n",
" \u2502\u00a0\u00a0 \u2502 \u251c\u2500\u2500 part-r-00000\n",
" \u2502\u00a0\u00a0 \u2502 \u251c\u2500\u2500 part-r-00001\n",
" \u2502\u00a0\u00a0 \u2502 \u251c\u2500\u2500 ...\n",
" \u2502\u00a0\u00a0 \u2502 \u251c\u2500\u2500 cuistr-agg-00001\n",
" \u2502\u00a0\u00a0 \u2502 \u251c\u2500\u2500 cuistr-agg-00002\n",
" \u2502\u00a0\u00a0 \u2502 \u251c\u2500\u2500 ...\n",
"\n",
"We now create our final Hadoop Streaming job with the following properties:\n",
"\n",
"* Path to mapper: s3n://YOUR_S3_BUCKET/python_mr_test/filt_mapper1.py\n",
"* Path to reducer: s3n://YOUR_S3_BUCKET/python_mr_test/filt_reducer.py\n",
"* Input path: s3n://YOUR_S3_BUCKET/python_mr_test/outputs2\n",
"* Output path: s3n://YOUR_S3_BUCKET/python_mr_test/outputs3\n",
"\n",
"Outputs for the bad ID removal jobs can be found in the outputs3 folder."
]
}
],
"metadata": {}
}
]
}