@rmoff
Created July 13, 2016 06:44
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Instantiate the BDD Shell context"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"execfile('ipython/00-bdd-shell-init.py')"
]
},
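{
"cell_type": "markdown",
"metadata": {},
"source": [
"Optional sanity check that the init script has made the Spark entry points available. This assumes it exposes `sc` (the SparkContext) alongside the `sqlContext` that is used further down."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Assumes the BDD Shell init script exposes `sc` and `sqlContext`\n",
"print sc.version\n",
"print sqlContext"
]
},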
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Load Pandas python library"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"import pandas as pd"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Import CSV\n",
"\n",
"Note that this is going to be done in memory, so don't go running it on _huge_ files. \n",
"\n",
"(Data source: http://www.ncdc.noaa.gov/qclcd/QCLCD?prior=N)"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"df = pd.read_csv('/u02/custom/us_weather/daily/201401daily.txt')"
]
},
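{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a sketch of an alternative for files too large to hold in memory, pandas can stream the file in chunks instead. The chunk size and the `process` step below are placeholders, not part of the original workflow:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Sketch only, not run here: stream the CSV in chunks rather than loading it all at once.\n",
"# The chunk size of 50000 rows is arbitrary, and process() is a placeholder for per-chunk handling.\n",
"# for chunk in pd.read_csv('/u02/custom/us_weather/daily/201401daily.txt', chunksize=50000):\n",
"#     process(chunk)"
]
},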
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Preview content"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>WBAN</th>\n",
" <th>YearMonthDay</th>\n",
" <th>Tmax</th>\n",
" <th>TmaxFlag</th>\n",
" <th>Tmin</th>\n",
" <th>TminFlag</th>\n",
" <th>Tavg</th>\n",
" <th>TavgFlag</th>\n",
" <th>Depart</th>\n",
" <th>DepartFlag</th>\n",
" <th>...</th>\n",
" <th>AvgSpeed</th>\n",
" <th>AvgSpeedFlag</th>\n",
" <th>Max5Speed</th>\n",
" <th>Max5SpeedFlag</th>\n",
" <th>Max5Dir</th>\n",
" <th>Max5DirFlag</th>\n",
" <th>Max2Speed</th>\n",
" <th>Max2SpeedFlag</th>\n",
" <th>Max2Dir</th>\n",
" <th>Max2DirFlag</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>3013</td>\n",
" <td>20140101</td>\n",
" <td>43</td>\n",
" <td></td>\n",
" <td>20</td>\n",
" <td></td>\n",
" <td>32</td>\n",
" <td></td>\n",
" <td>M</td>\n",
" <td></td>\n",
" <td>...</td>\n",
" <td>8.6</td>\n",
" <td></td>\n",
" <td>36</td>\n",
" <td></td>\n",
" <td>020</td>\n",
" <td></td>\n",
" <td>29</td>\n",
" <td></td>\n",
" <td>020</td>\n",
" <td></td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>3013</td>\n",
" <td>20140102</td>\n",
" <td>47</td>\n",
" <td></td>\n",
" <td>11</td>\n",
" <td></td>\n",
" <td>29</td>\n",
" <td></td>\n",
" <td>M</td>\n",
" <td></td>\n",
" <td>...</td>\n",
" <td>3.8</td>\n",
" <td></td>\n",
" <td>13</td>\n",
" <td></td>\n",
" <td>150</td>\n",
" <td></td>\n",
" <td>12</td>\n",
" <td></td>\n",
" <td>060</td>\n",
" <td></td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>3013</td>\n",
" <td>20140103</td>\n",
" <td>65</td>\n",
" <td></td>\n",
" <td>17</td>\n",
" <td></td>\n",
" <td>41</td>\n",
" <td></td>\n",
" <td>M</td>\n",
" <td></td>\n",
" <td>...</td>\n",
" <td>8.6</td>\n",
" <td></td>\n",
" <td>20</td>\n",
" <td></td>\n",
" <td>260</td>\n",
" <td></td>\n",
" <td>17</td>\n",
" <td></td>\n",
" <td>270</td>\n",
" <td></td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>3013</td>\n",
" <td>20140104</td>\n",
" <td>45</td>\n",
" <td></td>\n",
" <td>12</td>\n",
" <td></td>\n",
" <td>29</td>\n",
" <td></td>\n",
" <td>M</td>\n",
" <td></td>\n",
" <td>...</td>\n",
" <td>11.2</td>\n",
" <td></td>\n",
" <td>26</td>\n",
" <td></td>\n",
" <td>030</td>\n",
" <td></td>\n",
" <td>22</td>\n",
" <td></td>\n",
" <td>020</td>\n",
" <td></td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>3013</td>\n",
" <td>20140105</td>\n",
" <td>15</td>\n",
" <td></td>\n",
" <td>-12</td>\n",
" <td></td>\n",
" <td>2</td>\n",
" <td></td>\n",
" <td>M</td>\n",
" <td></td>\n",
" <td>...</td>\n",
" <td>5.1</td>\n",
" <td></td>\n",
" <td>17</td>\n",
" <td></td>\n",
" <td>050</td>\n",
" <td></td>\n",
" <td>13</td>\n",
" <td></td>\n",
" <td>030</td>\n",
" <td></td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"<p>5 rows × 50 columns</p>\n",
"</div>"
],
"text/plain": [
" WBAN YearMonthDay Tmax TmaxFlag Tmin TminFlag Tavg TavgFlag Depart \\\n",
"0 3013 20140101 43 20 32 M \n",
"1 3013 20140102 47 11 29 M \n",
"2 3013 20140103 65 17 41 M \n",
"3 3013 20140104 45 12 29 M \n",
"4 3013 20140105 15 -12 2 M \n",
"\n",
" DepartFlag ... AvgSpeed AvgSpeedFlag Max5Speed Max5SpeedFlag \\\n",
"0 ... 8.6 36 \n",
"1 ... 3.8 13 \n",
"2 ... 8.6 20 \n",
"3 ... 11.2 26 \n",
"4 ... 5.1 17 \n",
"\n",
" Max5Dir Max5DirFlag Max2Speed Max2SpeedFlag Max2Dir Max2DirFlag \n",
"0 020 29 020 \n",
"1 150 12 060 \n",
"2 260 17 270 \n",
"3 030 22 020 \n",
"4 050 13 030 \n",
"\n",
"[5 rows x 50 columns]"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df.head()"
]
},
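{
"cell_type": "markdown",
"metadata": {},
"source": [
"It can also be worth checking the row/column count and the column types pandas has inferred, since these drive the Spark schema in the next step:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Dimensions and inferred dtypes of the pandas dataframe\n",
"print df.shape\n",
"print df.dtypes"
]
},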
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Convert to a Spark dataframe\n",
"\n",
"So that we can then write it to Hive."
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"spark_df = sqlContext.createDataFrame(df)"
]
},
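{
"cell_type": "markdown",
"metadata": {},
"source": [
"Optionally, inspect the schema that Spark has inferred from the pandas dataframe before writing it out:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Show the schema Spark inferred during the conversion\n",
"spark_df.printSchema()"
]
},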
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Write the Spark dataframe as a Hive table"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"tablename = 'us_weather_daily'\n",
"qualified_tablename='default.' + tablename\n",
"spark_df.write.mode('Overwrite').saveAsTable(qualified_tablename)"
]
},
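{
"cell_type": "markdown",
"metadata": {},
"source": [
"A quick check that the table has landed in Hive: read the row count back out."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Read the row count back from the newly written Hive table\n",
"sqlContext.sql('SELECT COUNT(*) FROM ' + qualified_tablename).show()"
]
},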
{
"cell_type": "markdown",
"metadata": {
"collapsed": true
},
"source": [
"# Invoke DP CLI to add the new Hive table to the BDD Catalog\n",
"\n",
"*For this to work, you need :* \n",
"\n",
"* *amend `spark-defaults.conf` to **comment out/delete** the following line:*\n",
"\n",
" spark.serializer=org.apache.spark.serializer.KryoSerializer\n",
" \n",
"* *update `/etc/hadoop/conf/yarn-site.xml` and add a configuration for `yarn.nodemanager.resource.memory-mb` which I set to `16000`. Without this, the `data_processing_CLI` job could be seen in YARN as stuck at ACCEPTED.*"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from subprocess import call\n",
"call([\"/u01/bdd/v1.2.0/BDD-1.2.0.31.813/dataprocessing/edp_cli/data_processing_CLI\",\"--table\",tablename])\n",
"\n",
"# If you don't want to run the import automagically, uncomment the following to get the statement to run\n",
"# for when you do want to run the import\n",
"# print '/u01/bdd/v1.2.0/BDD-1.2.0.31.813/dataprocessing/edp_cli/data_processing_CLI --table %s' % tablename"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.11"
}
},
"nbformat": 4,
"nbformat_minor": 0
}
rmoff commented Jul 13, 2016

Solution to the data_processing_CLI problem: amend spark-defaults.conf to comment out/delete the following line:

spark.serializer=org.apache.spark.serializer.KryoSerializer
