{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<h1>1. Creating an RDD</h1>"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We start by creating the 3 RDDs from the different datasets from Amazon product reviews. The data has been kindly provided to us by Dr Julian McAuley and here is a reference of the original papers:\n",
"<ul>\n",
"<li>Image-based recommendations on styles and substitutes , J. McAuley, C. Targett, J. Shi, A. van den Hengel, SIGIR, 2015</li>\n",
"<li>Inferring networks of substitutable and complementary products, J. McAuley, R. Pandey, J. Leskovec, Knowledge Discovery and Data Mining, 2015</li>\n",
"</ul>\n",
"\n",
"Note that it does not move the data at this stage due to the lazy evaluation nature."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"fashion = sc.textFile('Data/Reviews/fashion.json')\n",
"electronics = sc.textFile('Data/Reviews/electronics.json')\n",
"sports = sc.textFile('Data/Reviews/sports.json')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Note that it does not move the data at this stage due to the lazy evaluation nature. We do some data exploration."
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"fashion has 10000 rows, electronics 10000 rows and sports 10000 rows\n",
"fashion first row:\n"
]
},
{
"data": {
"text/plain": [
"u'{\"reviewerID\": \"A2XVJBSRI3SWDI\", \"asin\": \"0000031887\", \"reviewerName\": \"abigail\", \"helpful\": [0, 0], \"reviewText\": \"Perfect red tutu for the price. I baught it as part of my daughters Halloween costume and it looked great on her.\", \"overall\": 5.0, \"summary\": \"Nice tutu\", \"unixReviewTime\": 1383523200, \"reviewTime\": \"11 4, 2013\"}'"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"print \"fashion has {0} rows, electronics {1} rows and sports {2} rows\".format(fashion.count(), electronics.count(), sports.count())\n",
"print \"fashion first row:\"\n",
"fashion.first()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We can union them."
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"{\"reviewerID\": \"A2XVJBSRI3SWDI\", \"asin\": \"0000031887\", \"reviewerName\": \"abigail\", \"helpful\": [0, 0], \"reviewText\": \"Perfect red tutu for the price. I baught it as part of my daughters Halloween costume and it looked great on her.\", \"overall\": 5.0, \"summary\": \"Nice tutu\", \"unixReviewTime\": 1383523200, \"reviewTime\": \"11 4, 2013\"}\n"
]
}
],
"source": [
"union_of_rdds = fashion.union(electronics).union(sports)\n",
"print union_of_rdds.first()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We can now parse the file using the json library."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"{u'asin': u'0000031887',\n",
" u'helpful': [0, 0],\n",
" u'overall': 5.0,\n",
" u'reviewText': u'Perfect red tutu for the price. I baught it as part of my daughters Halloween costume and it looked great on her.',\n",
" u'reviewTime': u'11 4, 2013',\n",
" u'reviewerID': u'A2XVJBSRI3SWDI',\n",
" u'reviewerName': u'abigail',\n",
" u'summary': u'Nice tutu',\n",
" u'unixReviewTime': 1383523200}"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import json\n",
"parsed_fashion = fashion.map(lambda x: json.loads(x))\n",
"parsed_fashion.first()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Another way of loading files is by using a list of comma-separated file paths or a wildcard."
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"3"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"data = sc.textFile('Data/Reviews/fashion.json,Data/Reviews/electronics.json,Data/Reviews/sports.json').map(lambda x: json.loads(x))\n",
"\n",
"# QUESTION: How many partitions does the rdd have?\n",
"data.getNumPartitions()"
]
},
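{
"cell_type": "markdown",
"metadata": {},
"source": [
"For example, the same three files could be loaded with a wildcard instead of an explicit list. A sketch (not executed here), assuming the directory contains only these JSON files:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# equivalent to the comma-separated list above, assuming only these files match\n",
"wildcard_data = sc.textFile('Data/Reviews/*.json').map(lambda x: json.loads(x))\n",
"wildcard_data.getNumPartitions()"
]
},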
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now let's imagine we want to know the number of lines in each partition. For that, we need to access the data in each single partition and run operations on them instead of on each row.\n",
"\n",
"For this, we will use mapPartitionsWithIndex which takes a partition index and an iterator over the data as arguments. Each function in the API is documented in: https://spark.apache.org/docs/1.3.1/api/python/pyspark.html#pyspark.RDD"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"[(0, 10000), (1, 10000), (2, 10000)]"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"indexed_data = data.mapPartitionsWithIndex(lambda splitIndex, it: [(splitIndex, len([x for x in it]))])\n",
"indexed_data.collect()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<h1>2. Reducers</h1>"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The next thing we have been tasked to do is to get the total number of reviews per product."
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Max number of reviews is 2033, min number of reviews is 1\n"
]
}
],
"source": [
"product_num = data.map(lambda x: (x['asin'], 1)).reduceByKey(lambda x,y: x+y)\n",
"# The rdd product_num will contain (product_asin, total_number_reviews)\n",
"\n",
"# What are the maximum and minimum number of reviews?\n",
"max_num = product_num.map(lambda x: x[1]).max()\n",
"min_num = product_num.map(lambda x: x[1]).min()\n",
"\n",
"print \"Max number of reviews is {0}, min number of reviews is {1}\".format(max_num, min_num)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"![Alt text](Images/reducebykey.png)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**EXERCISE**: what is the max score for each product?"
]
},
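{
"cell_type": "markdown",
"metadata": {},
"source": [
"One way to approach this exercise (a sketch, not executed here; the variable name is illustrative): since the built-in max takes two arguments, it can serve directly as the reduce function."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# keep the highest 'overall' score seen for each product\n",
"max_score_per_product = data.map(lambda x: (x['asin'], x['overall'])).reduceByKey(max)\n",
"max_score_per_product.take(5)"
]
},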
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<h1>3. Joining multiple sources</h1>"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We want to join the product reviews by users to the product metadata."
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"{u'asin': u'0000037214', u'title': u'Purple Sequin Tiny Dancer Tutu Ballet Dance Fairy Princess Costume Accessory', u'price': 6.99, u'imUrl': u'http://ecx.images-amazon.com/images/I/31mCncNuAZL.jpg', u'related': {u'also_viewed': [u'B00JO8II76', u'B00DGN4R1Q', u'B00E1YRI4C']}, u'salesRank': {u'Clothing': 1233557}, u'brand': u'Big Dreams', u'categories': [[u'Clothing, Shoes & Jewelry', u'Girls'], [u'Clothing, Shoes & Jewelry', u'Novelty, Costumes & More', u'Costumes & Accessories', u'More Accessories', u'Kids & Baby']]}\n"
]
}
],
"source": [
"product_metadata = sc.textFile('Data/Products/sample_metadata.json').map(lambda x: json.loads(x))\n",
"print product_metadata.first()"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"def flatten_categories(line):\n",
" old_cats = line['categories']\n",
" line['categories'] = [item for sublist in old_cats for item in sublist]\n",
" return line\n",
"\n",
"product_metadata = product_metadata.map(lambda x: flatten_categories(x))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We want to join the review data to the metadata about the product. We can use the 'asin' for that, which is a unique identifier for each product. In order to do a join, we need to turn each structure into key-value pairs."
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"We are joining 30000 product reviews to 2469 rows of metadata information about the products\n",
"First row of key_val_data:\n",
"(u'0000031887', {u'reviewerID': u'A2XVJBSRI3SWDI', u'asin': u'0000031887', u'reviewerName': u'abigail', u'helpful': [0, 0], u'reviewText': u'Perfect red tutu for the price. I baught it as part of my daughters Halloween costume and it looked great on her.', u'overall': 5.0, u'summary': u'Nice tutu', u'unixReviewTime': 1383523200, u'reviewTime': u'11 4, 2013'})\n"
]
}
],
"source": [
"key_val_data = data.map(lambda x: (x['asin'], x))\n",
"key_val_metadata = product_metadata.map(lambda x: (x['asin'], x))\n",
"\n",
"print \"We are joining {0} product reviews to {1} rows of metadata information about the products\".format(key_val_data.count(),key_val_metadata.count())\n",
"print \"First row of key_val_data:\"\n",
"print key_val_data.first()"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"number partitions key_val_data: 3\n",
"number partitions key_val_metadata: 2\n"
]
},
{
"data": {
"text/plain": [
"(u'8179050874',\n",
" ({u'asin': u'8179050874',\n",
" u'helpful': [0, 0],\n",
" u'overall': 1.0,\n",
" u'reviewText': u\"I bought this item because of the description that is for the Blackberry bold, to my surprise is for the curve it doesn't fit the screen there is like one inch of screen not protected by the screen, also it reflects sunlight making the screen virtually unusable when outdoors, and looks ugly..\",\n",
" u'reviewTime': u'05 17, 2009',\n",
" u'reviewerID': u'A1IQJSHCMW69O5',\n",
" u'reviewerName': u'Jose Perez',\n",
" u'summary': u'This is not for Bold is for Curve',\n",
" u'unixReviewTime': 1242518400},\n",
" {u'asin': u'8179050874',\n",
" u'categories': [u'Electronics',\n",
" u'Computers & Accessories',\n",
" u'Laptop & Netbook Computer Accessories',\n",
" u'Batteries'],\n",
" u'imUrl': u'http://ecx.images-amazon.com/images/I/41f2QHnWYNL._SY300_.jpg',\n",
" u'salesRank': {u'Electronics': 324466},\n",
" u'title': u'PRIVACY Screen Saver for your BLACKBERRY Bold 9000 ! Shield and Prevent others from viewing your information while protecting your phone!'}))"
]
},
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"print \"number partitions key_val_data: \", \n",
"print key_val_data.getNumPartitions()\n",
"print \"number partitions key_val_metadata: \", \n",
"print key_val_metadata.getNumPartitions()\n",
"\n",
"joined = key_val_data.join(key_val_metadata)\n",
"joined.first()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"What is the number of output partitions of the join? To understand this, the best is to refer back to the Pyspark source code: https://github.com/apache/spark/blob/branch-1.3/python/pyspark/join.py"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"There are 5 partitions\n"
]
},
{
"data": {
"text/plain": [
"[(u'8179050874',\n",
" ({u'asin': u'8179050874',\n",
" u'helpful': [0, 0],\n",
" u'overall': 1.0,\n",
" u'reviewText': u\"I bought this item because of the description that is for the Blackberry bold, to my surprise is for the curve it doesn't fit the screen there is like one inch of screen not protected by the screen, also it reflects sunlight making the screen virtually unusable when outdoors, and looks ugly..\",\n",
" u'reviewTime': u'05 17, 2009',\n",
" u'reviewerID': u'A1IQJSHCMW69O5',\n",
" u'reviewerName': u'Jose Perez',\n",
" u'summary': u'This is not for Bold is for Curve',\n",
" u'unixReviewTime': 1242518400},\n",
" {u'asin': u'8179050874',\n",
" u'categories': [u'Electronics',\n",
" u'Computers & Accessories',\n",
" u'Laptop & Netbook Computer Accessories',\n",
" u'Batteries'],\n",
" u'imUrl': u'http://ecx.images-amazon.com/images/I/41f2QHnWYNL._SY300_.jpg',\n",
" u'salesRank': {u'Electronics': 324466},\n",
" u'title': u'PRIVACY Screen Saver for your BLACKBERRY Bold 9000 ! Shield and Prevent others from viewing your information while protecting your phone!'})),\n",
" (u'8179050874',\n",
" ({u'asin': u'8179050874',\n",
" u'helpful': [0, 0],\n",
" u'overall': 1.0,\n",
" u'reviewText': u'Despite being sold specifically for the Blackberry Bold 9000, it simply doesn\\'t fit a Blackberry Bold.The screen protector is about a third of a millimetre too wide. As a result, the chrome trim around the outside of the Blackberry prevents it from lying flat on the edges of the screen so it does not attach to the screen properly: there is always a 2-3 millimetres of \"air margin\" down either one or both sides.The problems are therefore:1. It looks ugly2. It will fill with dust3. Case-mate support have been messing me around for over a month now and I\\'m beginning to suspect they are just hoping that I\\'ll go away and stop annoying them. In other words, the tech support is as useless as the product...',\n",
" u'reviewTime': u'03 26, 2009',\n",
" u'reviewerID': u'A2HC8YQVZ4HMF5',\n",
" u'reviewerName': u'Wowbagger the Infinitely Prolonged',\n",
" u'summary': u\"Doesn't even fit the screen...\",\n",
" u'unixReviewTime': 1238025600},\n",
" {u'asin': u'8179050874',\n",
" u'categories': [u'Electronics',\n",
" u'Computers & Accessories',\n",
" u'Laptop & Netbook Computer Accessories',\n",
" u'Batteries'],\n",
" u'imUrl': u'http://ecx.images-amazon.com/images/I/41f2QHnWYNL._SY300_.jpg',\n",
" u'salesRank': {u'Electronics': 324466},\n",
" u'title': u'PRIVACY Screen Saver for your BLACKBERRY Bold 9000 ! Shield and Prevent others from viewing your information while protecting your phone!'}))]"
]
},
"execution_count": 12,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# QUESTION: what is the number of partitions of the joined dataset?\n",
"\n",
"print \"There are {0} partitions\".format(joined.getNumPartitions())\n",
"\n",
"joined.take(2)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"To make it easier to manipulate, we will change the structure of the joined rdd to be a single dictionary."
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"[{u'asin': u'8179050874',\n",
" u'categories': [u'Electronics',\n",
" u'Computers & Accessories',\n",
" u'Laptop & Netbook Computer Accessories',\n",
" u'Batteries'],\n",
" u'helpful': [0, 0],\n",
" u'imUrl': u'http://ecx.images-amazon.com/images/I/41f2QHnWYNL._SY300_.jpg',\n",
" u'overall': 1.0,\n",
" u'reviewText': u\"I bought this item because of the description that is for the Blackberry bold, to my surprise is for the curve it doesn't fit the screen there is like one inch of screen not protected by the screen, also it reflects sunlight making the screen virtually unusable when outdoors, and looks ugly..\",\n",
" u'reviewTime': u'05 17, 2009',\n",
" u'reviewerID': u'A1IQJSHCMW69O5',\n",
" u'reviewerName': u'Jose Perez',\n",
" u'salesRank': {u'Electronics': 324466},\n",
" u'summary': u'This is not for Bold is for Curve',\n",
" u'title': u'PRIVACY Screen Saver for your BLACKBERRY Bold 9000 ! Shield and Prevent others from viewing your information while protecting your phone!',\n",
" u'unixReviewTime': 1242518400},\n",
" {u'asin': u'8179050874',\n",
" u'categories': [u'Electronics',\n",
" u'Computers & Accessories',\n",
" u'Laptop & Netbook Computer Accessories',\n",
" u'Batteries'],\n",
" u'helpful': [0, 0],\n",
" u'imUrl': u'http://ecx.images-amazon.com/images/I/41f2QHnWYNL._SY300_.jpg',\n",
" u'overall': 1.0,\n",
" u'reviewText': u'Despite being sold specifically for the Blackberry Bold 9000, it simply doesn\\'t fit a Blackberry Bold.The screen protector is about a third of a millimetre too wide. As a result, the chrome trim around the outside of the Blackberry prevents it from lying flat on the edges of the screen so it does not attach to the screen properly: there is always a 2-3 millimetres of \"air margin\" down either one or both sides.The problems are therefore:1. It looks ugly2. It will fill with dust3. Case-mate support have been messing me around for over a month now and I\\'m beginning to suspect they are just hoping that I\\'ll go away and stop annoying them. In other words, the tech support is as useless as the product...',\n",
" u'reviewTime': u'03 26, 2009',\n",
" u'reviewerID': u'A2HC8YQVZ4HMF5',\n",
" u'reviewerName': u'Wowbagger the Infinitely Prolonged',\n",
" u'salesRank': {u'Electronics': 324466},\n",
" u'summary': u\"Doesn't even fit the screen...\",\n",
" u'title': u'PRIVACY Screen Saver for your BLACKBERRY Bold 9000 ! Shield and Prevent others from viewing your information while protecting your phone!',\n",
" u'unixReviewTime': 1238025600}]"
]
},
"execution_count": 13,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"def merge_dictionaries(metadata_line, review_line):\n",
" new_dict = review_line\n",
" new_dict.update(metadata_line)\n",
" return new_dict\n",
"\n",
"nice_joined = joined.map(lambda x: merge_dictionaries(x[1][0], x[1][1]))\n",
"nice_joined.take(2)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"A couple of questions to probe your understanding of Spark"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Testing Spark understanding\n",
"\n",
"# QUESTION: if I run this, what will it print?\n",
"def change_title(line):\n",
" line['title'] = 'this is the title'\n",
" return line\n",
"\n",
"categories = nice_joined.map(lambda x: change_title(x))"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"this is the title\n"
]
}
],
"source": [
"# ANSWER:\n",
"print categories.map(lambda x: x['title']).first()"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"u'PRIVACY Screen Saver for your BLACKBERRY Bold 9000 ! Shield and Prevent others from viewing your information while protecting your phone!'"
]
},
"execution_count": 16,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# QUESTION: if I run this, what will be the title of the first row?\n",
"nice_joined.map(lambda x: x['title']).first()"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"BEFORE\n",
"the categories in the first 2 fields are: \n"
]
},
{
"data": {
"text/plain": [
"[[u'Electronics',\n",
" u'Computers & Accessories',\n",
" u'Laptop & Netbook Computer Accessories',\n",
" u'Batteries'],\n",
" [u'Electronics',\n",
" u'Computers & Accessories',\n",
" u'Laptop & Netbook Computer Accessories',\n",
" u'Batteries']]"
]
},
"execution_count": 17,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# QUESTION: if I run this, what will it print?\n",
"def get_first_category(line):\n",
" line['categories'] = line['categories'][0]\n",
" return line\n",
"\n",
"print \"BEFORE\"\n",
"print \"the categories in the first 2 fields are: \"\n",
"nice_joined.map(lambda x: x['categories']).take(2)"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"A x['title']).first()FTER\n"
]
},
{
"data": {
"text/plain": [
"[u'Electronics', u'E']"
]
},
"execution_count": 18,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# QUESTION: if I run this, what will it print?\n",
"print \"A x['title']).first()FTER\"\n",
"nice_joined.map(lambda x: get_first_category(x)).map(lambda x: x['categories']).take(2)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"What if we cache nice_joined first?"
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"AFTER CACHING\n"
]
},
{
"data": {
"text/plain": [
"[u'Electronics', u'Electronics']"
]
},
"execution_count": 19,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"nice_joined.cache()\n",
"nice_joined.count()\n",
"\n",
"print \"AFTER CACHING\"\n",
"nice_joined.map(lambda x: get_first_category(x)).map(lambda x: x['categories']).take(2)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<h1>4. GroupByKey</h1>"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now that we have joined two data sources, we can start doing some ad-hoc analysis of the data! Let's start by counting the number of reviews per category. The categories are encoded as a list of categories, so we need to count 1 for each 'sub-category'."
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"30000"
]
},
"execution_count": 20,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"nice_joined.first()\n",
"nice_joined.cache()\n",
"nice_joined.count()"
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"all_categories.take(5): [u'Electronics', u'Computers & Accessories', u'Laptop & Netbook Computer Accessories', u'Batteries', u'Electronics']\n",
"\n",
"There are 925 categories.\n"
]
}
],
"source": [
"# We want to get the distinct number of categories\n",
"all_categories = nice_joined.flatMap(lambda x: x['categories'])\n",
"print \"all_categories.take(5): \",\n",
"print all_categories.take(5)\n",
"num_categories = all_categories.distinct().count()\n",
"print\n",
"\n",
"print \"There are {0} categories.\".format(num_categories)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We are going to take the categories in each review and count them as being reviewed once."
]
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[(u'Screen Protectors', 10), (u'Jazz', 166), (u'Stands', 6), (u'Touch Screen Tablet Accessories', 320), (u'Bike Baskets', 1), (u'Thongs', 1), (u'Rink Equipment', 199), (u'Gun Safes & Cabinets', 3), (u'Soft Boxes', 2), (u'Video Games & Accessories', 17)]\n"
]
}
],
"source": [
"category_count = nice_joined.flatMap(lambda x: [(y,1) for y in x['categories']])\n",
"category_total_count = category_count.reduceByKey(lambda x,y: x+y)\n",
"print category_total_count.take(10)"
]
},
{
"cell_type": "code",
"execution_count": 23,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"The top 5 categories are:\n",
"[(u'Clothing, Shoes & Jewelry', 23983), (u'Sports & Outdoors', 10443), (u'Electronics', 10367), (u'Novelty, Costumes & More', 6384), (u'Men', 4880)]\n"
]
}
],
"source": [
"sorted_categories = sorted(category_total_count.collect(), key=lambda x: x[1], reverse=True)\n",
"print \"The top 5 categories are:\"\n",
"print sorted_categories[:5]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Next, we have been tasked to get the average product review length for each category. We can solve this using groupByKey!"
]
},
{
"cell_type": "code",
"execution_count": 24,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"After the flatMap: (u'Electronics', 293)\n",
"After the groupByKey: (u'Screen Protectors', [191, 135, 135, 782, 782, 782, 446, 446, 446, 85])\n",
"\n",
"grouped_category_review.first(): (u'Screen Protectors', 423.0)\n",
"The top 10 categories are: [(u'Photos', 7570.0), (u'Bags, Packs & Accessories', 6411.0), (u'Rifles', 5079.888888888889), (u'Motets', 3404.0), (u'Free-Weight Racks', 3404.0), (u'Weight Racks', 3404.0), (u'Magnificats', 3404.0), (u'Sonatinas', 3239.2), (u'Sonatas', 3239.2), (u'Rugby', 3156.0)]\n"
]
}
],
"source": [
"category_review = nice_joined.flatMap(lambda x: [(y, len(x['reviewText'])) for y in x['categories']])\n",
"print \"After the flatMap: \" + str(category_review.first())\n",
"print \"After the groupByKey: \" + str(category_review.groupByKey().map(lambda x: (x[0], list(x[1]))).first())\n",
"print\n",
"\n",
"grouped_category_review = category_review.groupByKey().map(lambda x: (x[0], sum(x[1])/float(len(x[1]))))\n",
"print \"grouped_category_review.first(): \" + str(grouped_category_review.first())\n",
"\n",
"### Now we can sort the categories by average product review length\n",
"print \"The top 10 categories are: \" + str(sorted(grouped_category_review.collect(), key=lambda x: x[1], reverse=True)[:10])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**EXERCISE**: Do the same thing, but this time you are not allowed to use groupByKey()!"
]
},
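{
"cell_type": "markdown",
"metadata": {},
"source": [
"A sketch of one possible approach (not executed here; variable names are illustrative): carry (sum, count) pairs through reduceByKey and divide at the end, which avoids materializing the full list of lengths per category the way groupByKey does."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# (category, (review_length, 1)) pairs, one per category per review\n",
"category_len = nice_joined.flatMap(lambda x: [(y, (len(x['reviewText']), 1)) for y in x['categories']])\n",
"# add up lengths and counts per category in a single shuffle\n",
"sums_counts = category_len.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))\n",
"avg_review_length = sums_counts.map(lambda x: (x[0], x[1][0] / float(x[1][1])))\n",
"avg_review_length.take(5)"
]
},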
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<h1>Optional: Data skewness</h1>"
]
},
{
"cell_type": "code",
"execution_count": 25,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"***Creating the large rdd***\n",
"first 5 items:[(0, 0), (1, 0), (1, 1), (2, 0), (2, 1)]\n",
"num rows: 5171502\n",
"num partitions: 16\n",
"The distribution of elements per partition is [(0, 1), (1, 2), (2, 7), (3, 20), (4, 54), (5, 148), (6, 403), (7, 1096), (8, 2980), (9, 8103), (10, 22026), (11, 59874), (12, 162754), (13, 442413), (14, 1202604), (15, 3269017)]\n",
"\n",
"***Creating the small rdd***\n",
"first 5 items:[(0, 0), (1, 1), (2, 2), (3, 3), (4, 4)]\n",
"num rows: 16\n",
"num partitions: 16\n",
"The distribution of elements per partition is [(0, 1), (1, 1), (2, 1), (3, 1), (4, 1), (5, 1), (6, 1), (7, 1), (8, 1), (9, 1), (10, 1), (11, 1), (12, 1), (13, 1), (14, 1), (15, 1)]\n",
"\n",
"Joining them\n",
"The direct join takes 0:00:30.362042\n",
"The joined rdd has 32 partitions and 5171502 rows\n"
]
}
],
"source": [
"from math import exp\n",
"from datetime import datetime\n",
"\n",
"def get_part_index(splitIndex, iterator):\n",
" for it in iterator:\n",
" yield (splitIndex, it)\n",
" \n",
"def count_elements(splitIndex, iterator):\n",
" n = sum(1 for _ in iterator)\n",
" yield (splitIndex, n)\n",
" \n",
"print \"***Creating the large rdd***\"\n",
"num_parts = 16\n",
"# create the large skewed rdd\n",
"skewed_large_rdd = sc.parallelize(range(0,num_parts), num_parts).flatMap(lambda x: range(0, int(exp(x)))).mapPartitionsWithIndex(lambda ind, x: get_part_index(ind, x)).cache()\n",
"print \"first 5 items:\" + str(skewed_large_rdd.take(5))\n",
"print \"num rows: \" + str(skewed_large_rdd.count())\n",
"print \"num partitions: \" + str(skewed_large_rdd.getNumPartitions())\n",
"print \"The distribution of elements per partition is \" + str(skewed_large_rdd.mapPartitionsWithIndex(lambda ind, x: count_elements(ind, x)).collect())\n",
"print\n",
"\n",
"print \"***Creating the small rdd***\"\n",
"small_rdd = sc.parallelize(range(0,num_parts), num_parts).map(lambda x: (x, x))\n",
"print \"first 5 items:\" + str(small_rdd.take(5))\n",
"print \"num rows: \" + str(small_rdd.count())\n",
"print \"num partitions: \" + str(small_rdd.getNumPartitions())\n",
"print \"The distribution of elements per partition is \" + str(small_rdd.mapPartitionsWithIndex(lambda ind, x: count_elements(ind, x)).collect())\n",
"\n",
"print\n",
"\n",
"print \"Joining them\"\n",
"t0 = datetime.now()\n",
"result = skewed_large_rdd.leftOuterJoin(small_rdd)\n",
"result.count() \n",
"print \"The direct join takes %s\"%(str(datetime.now() - t0))\n",
"print \"The joined rdd has {0} partitions and {1} rows\".format(result.getNumPartitions(), result.count())"
]
},
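{
"cell_type": "markdown",
"metadata": {},
"source": [
"Because small_rdd easily fits in memory, one common way to sidestep the skewed shuffle is a map-side (broadcast) join: collect the small side to the driver, broadcast it, and look keys up in each task. A sketch (not executed here; timings will vary) that reproduces the leftOuterJoin output shape:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# collect the small side as a dict on the driver and ship one read-only copy per executor\n",
"small_map_b = sc.broadcast(dict(small_rdd.collect()))\n",
"\n",
"t0 = datetime.now()\n",
"# no shuffle: each (key, value) of the large rdd is matched locally\n",
"broadcast_joined = skewed_large_rdd.map(lambda kv: (kv[0], (kv[1], small_map_b.value.get(kv[0]))))\n",
"broadcast_joined.count()\n",
"print \"The broadcast join takes %s\" % (str(datetime.now() - t0))"
]
},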
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<h1>Optional: Integrating Spark with popular Python libraries</h1>"
]
},
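{
"cell_type": "markdown",
"metadata": {},
"source": [
"A common pattern for scoring data with a pre-trained model: unpickle the model once on the driver, broadcast it so that each executor receives a single read-only copy, and call its predict method inside a map. The cell below assumes a scikit-learn text classifier has been pickled to Data/classifiers/classifier.pkl."
]
},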
{
"cell_type": "code",
"execution_count": 26,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"('Perfect red tutu for the price. I baught it as part of my daughters Halloween costume and it looked great on her.',\n",
" 'fashion')"
]
},
"execution_count": 26,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import sklearn\n",
"import pickle\n",
"\n",
"model = pickle.load(open('Data/classifiers/classifier.pkl', 'r'))\n",
"model_b = sc.broadcast(model)\n",
"fashion.map(lambda x: eval(x)['reviewText']).map(lambda x: (x, model_b.value.predict([x])[0])).first()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<h1>Spark.. A View from the trenches: Part 2</h1>"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<h1>Introduction</h1>\n",
"\n",
"This is the latter part of the tutorial. The main focus will be on Spark DataFrames and Spark SQL."
]
},
{
"cell_type": "code",
"execution_count": 27,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"number of reviews : 30000\n",
"sample row : \n",
"{\"reviewerID\": \"AKM1MP6P0OYPR\", \"asin\": \"0132793040\", \"reviewerName\": \"Vicki Gibson \\\"momo4\\\"\", \"helpful\": [1, 1], \"reviewText\": \"Corey Barker does a great job of explaining Blend Modes in this DVD. All of the Kelby training videos are great but pricey to buy individually. If you really want bang for your buck just subscribe to Kelby Training online.\", \"overall\": 5.0, \"summary\": \"Very thorough\", \"unixReviewTime\": 1365811200, \"reviewTime\": \"04 13, 2013\"}\n"
]
}
],
"source": [
"review_filepaths = 'Data/Reviews/*'\n",
"textRDD = sc.textFile(review_filepaths)\n",
"\n",
"print 'number of reviews : {0}'.format(textRDD.count())\n",
"\n",
"print 'sample row : \\n{0}'.format(textRDD.first())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<h1>5. Loading Data into a DataFrame</h1>\n",
"\n",
"A DataFrame requires schema. There are two main functions that can be used to assign schema into an RDD. \n",
"+ Inferring Schema : This functions infers the schema of the RDD by observing it\n",
"+ Applying Schema : This function applies a manually defined schema an RDD"
]
},
{
"cell_type": "code",
"execution_count": 28,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"<pyspark.sql.context.SQLContext object at 0x10951d590>\n"
]
}
],
"source": [
"# You need SQL context do \n",
"from pyspark.sql import SQLContext\n",
"\n",
"# Instantiate SQL Context\n",
"sqc = SQLContext(sc)\n",
"\n",
"print sqc"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<h2>Inferring the Schema Using Reflection</h2>"
]
},
{
"cell_type": "code",
"execution_count": 29,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"Row(asin=u'0132793040', helpful=[1, 1], overall=5.0, reviewText=u'Corey Barker does a great job of explaining Blend Modes in this DVD. All of the Kelby training videos are great but pricey to buy individually. If you really want bang for your buck just subscribe to Kelby Training online.', reviewTime=u'04 13, 2013', reviewerID=u'AKM1MP6P0OYPR', reviewerName=u'Vicki Gibson \"momo4\"', summary=u'Very thorough', unixReviewTime=1365811200)"
]
},
"execution_count": 29,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"inferredDF = sqc.jsonFile(review_filepaths)\n",
"inferredDF.first()"
]
},
{
"cell_type": "code",
"execution_count": 30,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"root\n",
" |-- asin: string (nullable = true)\n",
" |-- helpful: array (nullable = true)\n",
" | |-- element: long (containsNull = true)\n",
" |-- overall: double (nullable = true)\n",
" |-- reviewText: string (nullable = true)\n",
" |-- reviewTime: string (nullable = true)\n",
" |-- reviewerID: string (nullable = true)\n",
" |-- reviewerName: string (nullable = true)\n",
" |-- summary: string (nullable = true)\n",
" |-- unixReviewTime: long (nullable = true)\n",
"\n"
]
}
],
"source": [
"inferredDF.printSchema()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<h2>Manually Specifying the Schema</h2>\n",
"\n",
"The Documentation about different data types can be found at [Spark SQL DataTypes section](https://spark.apache.org/docs/latest/sql-programming-guide.html#data-types \"Spark SQL DataTypes Documentation\") \n",
"\n",
"<h3>EXERCISE (3 mins)</h3>\n",
"+ Let's add the fields \"summary\" ::string and \"unixReviewTime\" ::long to the schema. "
]
},
{
"cell_type": "code",
"execution_count": 31,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"StructType(List(StructField(reviewerID,StringType,true),StructField(asin,StringType,true),StructField(reviewerName,StringType,true),StructField(helpful,ArrayType(IntegerType,true),true),StructField(reviewText,StringType,true),StructField(reviewTime,StringType,true),StructField(overall,DoubleType,true),StructField(summary,StringType,true),StructField(unixReviewTime,LongType,true)))\n"
]
}
],
"source": [
"# Export the modules\n",
"from pyspark.sql.types import *\n",
"\n",
"# Define Schema\n",
"REVIEWS_SCHEMA_DEF = StructType([\n",
" StructField('reviewerID', StringType(), True),\n",
" StructField('asin', StringType(), True),\n",
" StructField('reviewerName', StringType(), True),\n",
" StructField('helpful', ArrayType(\n",
" IntegerType(), True), \n",
" True),\n",
" StructField('reviewText', StringType(), True),\n",
" StructField('reviewTime', StringType(), True),\n",
" StructField('overall', DoubleType(), True),\n",
" StructField('summary', StringType(), True),\n",
" StructField('unixReviewTime', LongType(), True)\n",
" ])\n",
"\n",
"print REVIEWS_SCHEMA_DEF"
]
},
{
"cell_type": "code",
"execution_count": 32,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"Row(reviewerID=u'AKM1MP6P0OYPR', asin=u'0132793040', reviewerName=u'Vicki Gibson \"momo4\"', helpful=[1, 1], reviewText=u'Corey Barker does a great job of explaining Blend Modes in this DVD. All of the Kelby training videos are great but pricey to buy individually. If you really want bang for your buck just subscribe to Kelby Training online.', reviewTime=u'04 13, 2013', overall=5.0, summary=u'Very thorough', unixReviewTime=1365811200)"
]
},
"execution_count": 32,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Using a handcrafted schema with to create a DataFrame\n",
"appliedDF = sqlContext.jsonFile(review_filepaths,schema=REVIEWS_SCHEMA_DEF)\n",
"appliedDF.first()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<h1>6. DataFrame operations</h1>\n",
"\n",
"Spark DataFrame API allow you to do multiple operations on the Data. The primary advantage of using the DataFrame API is that you can do data transoformations with the high level API without having to use Python. Using the high level API has its advantages which will be explained later in the tutorial.\n",
"\n",
"DataFrame API have functionality similar to that of Core RDD API. For example: \n",
"+ map : foreach, Select\n",
"+ mapPartition : foreachPartition\n",
"+ filter : filter\n",
"+ groupByKey, reduceByKey : groupBy \n",
"\n",
"<h2>6.1. Selecting Columns</h2>\n",
"\n",
"You can use SELECT statement to select columns from your dataframe\n",
"\n",
"<h3>EXERCISE (3 mins)</h3>\n",
"\n",
"+ Divide the numerator in 'helpful' field by the denominator in 'helpful' field to get the fraction "
]
},
{
"cell_type": "code",
"execution_count": 33,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"asin overall reviewText helpful reviewerID unixReviewTime\n",
"0132793040 5.0 Corey Barker does... 1.0 AKM1MP6P0OYPR 1365811200 \n",
"0321732944 5.0 While many beginn... null A2CX7LUOHB2NDG 1341100800 \n",
"0439886341 1.0 It never worked. ... 1.0 A2NWSAGRHCP8N5 1367193600 \n",
"0439886341 3.0 Some of the funct... 1.0 A2WNBOD3WNDNKT 1374451200 \n",
"0439886341 1.0 Do not waste your... 1.0 A1GI0U4ZRJA8WN 1334707200 \n",
"0511189877 5.0 Dog got the old r... null A1QGNMC6O1VW39 1397433600 \n",
"0511189877 2.0 This remote, for ... 1.0 A3J3BRHTDRFJ2G 1397433600 \n",
"0511189877 5.0 We had an old Tim... 0.0 A2TY0BTJOTENPG 1395878400 \n",
"0511189877 5.0 This unit works j... null A34ATBPOK6HCHY 1395532800 \n",
"0511189877 5.0 It is an exact du... null A89DO69P0XZ27 1395446400 \n",
"0511189877 5.0 Works on my t.v. ... 0.0 AZYNQZ94U6VDB 1401321600 \n",
"0528881469 5.0 Love it has every... null A1DA3W4GTFXP6O 1405641600 \n",
"0528881469 1.0 I have owned two ... null A29LPQQDG7LD5J 1352073600 \n",
"0528881469 5.0 We got this GPS f... null AO94DHGC771SJ 1370131200 \n",
"0528881469 1.0 I'm a professiona... 0.8 AMO214LNFCEI4 1290643200 \n",
"0528881469 4.0 This is a great t... 0.9545454545454546 A28B1G1MSJ6OO1 1280016000 \n",
"0528881469 3.0 Well, what can I ... 0.9555555555555556 A3N7T0DY83Y4IG 1283990400 \n",
"0528881469 2.0 Not going to writ... 0.9 A1H8PY3QHMQQA0 1290556800 \n",
"0528881469 2.0 My brother is a t... 0.71875 A2CPBQ5W4OGBX 1277078400 \n",
"0528881469 4.0 This unit is a fa... 1.0 A265MKAR2WEH3Y 1294790400 \n"
]
}
],
"source": [
"columnDF = appliedDF.select(appliedDF.asin,\n",
" appliedDF.overall,\n",
" appliedDF.reviewText,\n",
" appliedDF.helpful[0]/appliedDF.helpful[1],\n",
" appliedDF.reviewerID,\n",
" appliedDF.unixReviewTime).\\\n",
" withColumnRenamed('(helpful[0] / helpful[1])','helpful')\n",
"columnDF.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<h2>6.2. Missing Values</h2>\n",
"\n",
"Similar to Pandas, DataFrames come equipped with functions to address missing data.\n",
"+ dropna function: can be used to remove observations with missing values\n",
"+ fillna function: can be used to fill missing values with a default value"
]
},
{
"cell_type": "code",
"execution_count": 34,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"asin overall reviewText helpful reviewerID unixReviewTime\n",
"0132793040 5.0 Corey Barker does... 1.0 AKM1MP6P0OYPR 1365811200 \n",
"0321732944 5.0 While many beginn... 0.0 A2CX7LUOHB2NDG 1341100800 \n",
"0439886341 1.0 It never worked. ... 1.0 A2NWSAGRHCP8N5 1367193600 \n",
"0439886341 3.0 Some of the funct... 1.0 A2WNBOD3WNDNKT 1374451200 \n",
"0439886341 1.0 Do not waste your... 1.0 A1GI0U4ZRJA8WN 1334707200 \n",
"0511189877 5.0 Dog got the old r... 0.0 A1QGNMC6O1VW39 1397433600 \n",
"0511189877 2.0 This remote, for ... 1.0 A3J3BRHTDRFJ2G 1397433600 \n",
"0511189877 5.0 We had an old Tim... 0.0 A2TY0BTJOTENPG 1395878400 \n",
"0511189877 5.0 This unit works j... 0.0 A34ATBPOK6HCHY 1395532800 \n",
"0511189877 5.0 It is an exact du... 0.0 A89DO69P0XZ27 1395446400 \n",
"0511189877 5.0 Works on my t.v. ... 0.0 AZYNQZ94U6VDB 1401321600 \n",
"0528881469 5.0 Love it has every... 0.0 A1DA3W4GTFXP6O 1405641600 \n",
"0528881469 1.0 I have owned two ... 0.0 A29LPQQDG7LD5J 1352073600 \n",
"0528881469 5.0 We got this GPS f... 0.0 AO94DHGC771SJ 1370131200 \n",
"0528881469 1.0 I'm a professiona... 0.8 AMO214LNFCEI4 1290643200 \n",
"0528881469 4.0 This is a great t... 0.9545454545454546 A28B1G1MSJ6OO1 1280016000 \n",
"0528881469 3.0 Well, what can I ... 0.9555555555555556 A3N7T0DY83Y4IG 1283990400 \n",
"0528881469 2.0 Not going to writ... 0.9 A1H8PY3QHMQQA0 1290556800 \n",
"0528881469 2.0 My brother is a t... 0.71875 A2CPBQ5W4OGBX 1277078400 \n",
"0528881469 4.0 This unit is a fa... 1.0 A265MKAR2WEH3Y 1294790400 \n"
]
}
],
"source": [
"# get null observations out\n",
"densedDF=columnDF.dropna(subset=[\"overall\"]).fillna(0.0,subset=[\"helpful\"]) \n",
"densedDF.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<h2>6.3. Filtering rows</h2>\n",
"\n",
"Filtering lets you select rows based on arguments. The implementation pattern is similar to filtering RDDs, But simpler. \n",
"\n",
"<h3>EXERCISE (3 mins)</h3>\n",
"\n",
"+ Let's filter all the reviews that have an overall score greater than or equal to 3.0 "
]
},
{
"cell_type": "code",
"execution_count": 35,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"asin overall reviewText helpful reviewerID unixReviewTime\n",
"0132793040 5.0 Corey Barker does... 1.0 AKM1MP6P0OYPR 1365811200 \n",
"0321732944 5.0 While many beginn... 0.0 A2CX7LUOHB2NDG 1341100800 \n",
"0439886341 3.0 Some of the funct... 1.0 A2WNBOD3WNDNKT 1374451200 \n",
"0511189877 5.0 Dog got the old r... 0.0 A1QGNMC6O1VW39 1397433600 \n",
"0511189877 5.0 We had an old Tim... 0.0 A2TY0BTJOTENPG 1395878400 \n",
"0511189877 5.0 This unit works j... 0.0 A34ATBPOK6HCHY 1395532800 \n",
"0511189877 5.0 It is an exact du... 0.0 A89DO69P0XZ27 1395446400 \n",
"0511189877 5.0 Works on my t.v. ... 0.0 AZYNQZ94U6VDB 1401321600 \n",
"0528881469 5.0 Love it has every... 0.0 A1DA3W4GTFXP6O 1405641600 \n",
"0528881469 5.0 We got this GPS f... 0.0 AO94DHGC771SJ 1370131200 \n",
"0528881469 4.0 This is a great t... 0.9545454545454546 A28B1G1MSJ6OO1 1280016000 \n",
"0528881469 3.0 Well, what can I ... 0.9555555555555556 A3N7T0DY83Y4IG 1283990400 \n",
"0528881469 4.0 This unit is a fa... 1.0 A265MKAR2WEH3Y 1294790400 \n",
"0528881469 5.0 I did a lot of co... 1.0 A37K02NKUIT68K 1293235200 \n",
"0528881469 4.0 I purchased this ... 0.5 A2AW1SSVUIYV9Y 1289001600 \n",
"0528881469 5.0 EXCELLENT. BEST T... 0.7142857142857143 A2AEHUKOV014BP 1284249600 \n",
"0528881469 4.0 Well as one of th... 1.0 A2O8FIJR9EBU56 1278547200 \n",
"0528881469 4.0 Was fast and what... 0.0 AYTBGUX49LF3W 1398470400 \n",
"0528881469 5.0 We had the GPS fo... 0.0 A1E4WG8HRWWK4R 1390867200 \n",
"0528881469 5.0 Back in the old d... 0.5 A2AOEW5UGXFOOQ 1294790400 \n"
]
}
],
"source": [
"filteredDF=densedDF.filter(densedDF.overall>=3)\n",
"filteredDF.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<h2>6.4. Grouping by overall scores</h2>\n",
"\n",
"Grouping is equivalent to the groupByKey in the core RDD API. You can transform the grouped values using a summary action such as:\n",
"+ count\n",
"+ sum\n",
"+ average\n",
"+ max and so on ..."
]
},
{
"cell_type": "code",
"execution_count": 36,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"overall count\n",
"3.0 2128 \n",
"5.0 18503\n",
"4.0 5324 \n"
]
}
],
"source": [
"grouped = filteredDF.groupBy(\"overall\").count()\n",
"grouped.show()"
]
},
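{
"cell_type": "markdown",
"metadata": {},
"source": [
"The other summary actions work the same way. For example (a sketch, not executed here), the average helpfulness fraction per score:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# average of the 'helpful' fraction within each 'overall' rating group\n",
"filteredDF.groupBy(\"overall\").avg(\"helpful\").show()"
]
},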
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<h2>6.5. Joining DataFrames together</h2>\n",
"\n",
"You can join two DataFrames together by using a common key column."
]
},
{
"cell_type": "code",
"execution_count": 37,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"u'{\"asin\": \"0000037214\", \"title\": \"Purple Sequin Tiny Dancer Tutu Ballet Dance Fairy Princess Costume Accessory\", \"price\": 6.9900000000000002, \"imUrl\": \"http://ecx.images-amazon.com/images/I/31mCncNuAZL.jpg\", \"related\": {\"also_viewed\": [\"B00JO8II76\", \"B00DGN4R1Q\", \"B00E1YRI4C\"]}, \"salesRank\": {\"Clothing\": 1233557}, \"brand\": \"Big Dreams\", \"categories\": [[\"Clothing, Shoes & Jewelry\", \"Girls\"], [\"Clothing, Shoes & Jewelry\", \"Novelty, Costumes & More\", \"Costumes & Accessories\", \"More Accessories\", \"Kids & Baby\"]]}'"
]
},
"execution_count": 37,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"product_filepaths = 'Data/Products/*'\n",
"productRDD = sc.textFile(product_filepaths)\n",
"productRDD.first()"
]
},
{
"cell_type": "code",
"execution_count": 38,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"asin title price categories \n",
"0000037214 Purple Sequin Tin... 6.99 ArrayBuffer(Array...\n",
"0000032069 Adult Ballet Tutu... 7.89 ArrayBuffer(Array...\n",
"0000031909 Girls Ballet Tutu... 7.0 ArrayBuffer(Array...\n",
"0000032034 Adult Ballet Tutu... 7.87 ArrayBuffer(Array...\n",
"0000031852 Girls Ballet Tutu... 3.17 ArrayBuffer(Array...\n",
"0000032050 Adult Ballet Tutu... 12.85 ArrayBuffer(Array...\n",
"0000031887 Ballet Dress-Up F... 6.79 ArrayBuffer(Array...\n",
"0000031895 Girls Ballet Tutu... 2.99 ArrayBuffer(Array...\n",
"0123456479 SHINING IMAGE HUG... 64.98 ArrayBuffer(Array...\n",
"0132793040 Kelby Training DV... null ArrayBuffer(Array...\n",
"0188477284 Klean Kanteen Cla... null ArrayBuffer(Array...\n",
"0321732944 Kelby Training DV... null ArrayBuffer(Array...\n",
"0439886341 Digital Organizer... 8.15 ArrayBuffer(Array...\n",
"0456844570 RiZ Women's Beaut... null ArrayBuffer(Array...\n",
"0456808574 Lantin White Viso... null ArrayBuffer(Array...\n",
"0456830197 NVC Unisex Light ... null ArrayBuffer(Array...\n",
"0456856293 Kismeth Eyewear C... null ArrayBuffer(Array...\n",
"0456840532 Max-MPH Black - L... null ArrayBuffer(Array...\n",
"0456787283 FX1 Small Adult A... null ArrayBuffer(Array...\n",
"0456838384 Riz Small Unisex ... null ArrayBuffer(Array...\n"
]
}
],
"source": [
"# Load Dataset2 : Amazon Product information\n",
"# First, define Schema for second Dataset\n",
"PRODUCTS_SCHEMA_DEF = StructType([\n",
" StructField('asin', StringType(), True),\n",
" StructField('title', StringType(), True),\n",
" StructField('price', DoubleType(), True),\n",
" StructField('categories', ArrayType(ArrayType(\n",
" StringType(), True),True),True)\n",
" ])\n",
"\n",
"# Load the dataset\n",
"productDF = sqc.jsonFile(product_filepaths,PRODUCTS_SCHEMA_DEF)\n",
"productDF.show()\n",
"# productDF.first()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"*QUESTION*: What do you think will happen if we remove some fields from this schema?\n",
"\n",
"1. The schema fails\n",
"2. The schema works fine\n",
"\n",
"ANSWER??? \n",
"\n",
"Now lets join the two datasets"
]
},
{
"cell_type": "code",
"execution_count": 39,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"25566L"
]
},
"execution_count": 39,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"enrichedReviews = filteredDF.join(productDF, productDF.asin==filteredDF.asin).dropna(subset=\"title\")\n",
"enrichedReviews.count()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"When you join two RDDs, you have to restructure the data into (k,V) pairs where the key is the join key. This may involve two additional map transformations. This is not necessary in DataFrames. "
]
},
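{
"cell_type": "markdown",
"metadata": {},
"source": [
"To make the contrast concrete, a sketch (not executed here) using names defined earlier in this notebook:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# RDD join: both sides must first be restructured into (key, value) pairs\n",
"rdd_joined = data.map(lambda x: (x['asin'], x)).join(product_metadata.map(lambda x: (x['asin'], x)))\n",
"\n",
"# DataFrame join: just state the join condition on named columns\n",
"df_joined = filteredDF.join(productDF, filteredDF.asin == productDF.asin)"
]
},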
{
"cell_type": "code",
"execution_count": 40,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"DataFrame[asin: string, overall: double, reviewText: string, helpful: double, reviewerID: string, unixReviewTime: bigint, asin: string, title: string, price: double, categories: array<array<string>>]"
]
},
"execution_count": 40,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"enrichedReviews"
]
},
{
"cell_type": "code",
"execution_count": 41,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"asin overall reviewText helpful reviewerID unixReviewTime asin title price categories \n",
"9983782030 5.0 I purchased this ... 0.0 A2G6OTE5JOZHOM 1320192000 9983782030 PREMIUM USB Adapt... null ArrayBuffer(Array...\n",
"9983782030 5.0 Item arrived as d... 1.0 A2AEZK0K9CFVE6 1290211200 9983782030 PREMIUM USB Adapt... null ArrayBuffer(Array...\n",
"998498480X 5.0 The charger that ... 0.0 ABB3GDPNS9TVW 1365292800 998498480X Garmin Nuvi 855 S... 3.77 ArrayBuffer(Array...\n",
"998498480X 3.0 I got this charge... 0.0 A32NKAW0Q7A9IQ 1290729600 998498480X Garmin Nuvi 855 S... 3.77 ArrayBuffer(Array...\n",
"998498480X 5.0 Garmin Nuvi 800 s... 0.0 A1GPVV2XWNGSDX 1303776000 998498480X Garmin Nuvi 855 S... 3.77 ArrayBuffer(Array...\n",
"998498480X 4.0 I plan on using m... 0.0 AGZ5E33UY5L7S 1290038400 998498480X Garmin Nuvi 855 S... 3.77 ArrayBuffer(Array...\n",
"998498480X 5.0 Sometimes I need ... 0.0 A2XOVBAMQ8J0YV 1325808000 998498480X Garmin Nuvi 855 S... 3.77 ArrayBuffer(Array...\n",
"998498480X 4.0 Turns on the GPS ... 0.0 A1VCXIYTPCE6W0 1395964800 998498480X Garmin Nuvi 855 S... 3.77 ArrayBuffer(Array...\n",
"998498480X 4.0 This plug works w... 0.0 A13KD5E4V2VPTO 1363996800 998498480X Garmin Nuvi 855 S... 3.77 ArrayBuffer(Array...\n",
"998498480X 4.0 My GPS device did... 1.0 A3AGNJNG7CFVZK 1275782400 998498480X Garmin Nuvi 855 S... 3.77 ArrayBuffer(Array...\n",
"998498480X 5.0 It worked perfect... 0.0 A1B48D0I1M9HBI 1388534400 998498480X Garmin Nuvi 855 S... 3.77 ArrayBuffer(Array...\n",
"998498480X 3.0 Did not seem to m... 0.0 A1996ALUR26ETP 1388275200 998498480X Garmin Nuvi 855 S... 3.77 ArrayBuffer(Array...\n",
"998498480X 5.0 We bought this fo... 0.0 A24JJYOP041KMM 1383436800 998498480X Garmin Nuvi 855 S... 3.77 ArrayBuffer(Array...\n",
"998498480X 5.0 It's a wall wart.... 1.0 A1DOFBSJT3SSKZ 1269561600 998498480X Garmin Nuvi 855 S... 3.77 ArrayBuffer(Array...\n",
"998498480X 5.0 The charger was r... 0.0 ADTXKJ76N9S6W 1272326400 998498480X Garmin Nuvi 855 S... 3.77 ArrayBuffer(Array...\n",
"998498480X 5.0 This GPS is great... 0.0 A3D1ZRIDPWDM1V 1287100800 998498480X Garmin Nuvi 855 S... 3.77 ArrayBuffer(Array...\n",
"998498480X 5.0 I used it once. W... 0.0 A3R2M5ABAYL63P 1291507200 998498480X Garmin Nuvi 855 S... 3.77 ArrayBuffer(Array...\n",
"998498480X 3.0 I really liked th... 0.0 A1QT8GM0WW02Z5 1287878400 998498480X Garmin Nuvi 855 S... 3.77 ArrayBuffer(Array...\n",
"998498480X 5.0 The thing I liked... 0.0 A3RYHP2OYFJ3YC 1397779200 998498480X Garmin Nuvi 855 S... 3.77 ArrayBuffer(Array...\n",
"998498480X 4.0 I really don't ha... 0.0 A261321MIS0YPB 1290729600 998498480X Garmin Nuvi 855 S... 3.77 ArrayBuffer(Array...\n"
]
}
],
"source": [
"enrichedReviews.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<h1>7. Saving your DataFrame</h1> \n",
"\n",
"Now that we have done some operations on the data, we can save the file for later use. Standard data formats are a great way to opening up valuable data to your entire organization. Spark DataFrames can be saved in many different formats including and not limited to JSON, parquet, Hive and etc... "
]
},
{
"cell_type": "code",
"execution_count": 42,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Saved as parquet successfully\n"
]
}
],
"source": [
"try:\n",
" columnDF.saveAsParquetFile('Data/Outputs/reviews_filtered.parquet')\n",
"except:\n",
" pass\n",
"\n",
"print \"Saved as parquet successfully\""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<h1>8. Using Spark SQL</h1>\n",
"\n",
"Spark DataFrames also allow you to use Spark SQL to query from Petabytes of data. Spark comes with a SQL like query language which can be used to query from Distributed DataFrames. A key advantage of using Spark SQL is that the [Catelyst query optimizer](https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html \"Catelyst\") under the hood transforms your SQL query to run it most efficiently. \n",
"\n",
"<h2>8.1. Example Queries</h2>\n",
"\n",
"Spark SQL can leverage the same functionality as the DataFrame API provides. In fact, it provides more functionality via SQL capabilities and HQL capabilities that are available to Spark SQL environment. \n",
"\n",
"For the sake of time constrains, I will explain different functions available in Spark SQL environment by using examples that use multiple functions. This will benefit by:\n",
"+ Covering many functions that are possible via spark SQL\n",
"+ Giving an understanding about how to pipe multiple functions together\n"
]
},
{
"cell_type": "code",
"execution_count": 43,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"There are 30000 reviews about 2469 products\n"
]
}
],
"source": [
"# Read the reviews parquet file\n",
"reviewsDF = sqc.parquetFile('Data/Outputs/reviews_filtered.parquet')\n",
"\n",
"# Register the DataFrames to be used in sql\n",
"reviewsDF.registerAsTable(\"reviews\")\n",
"productDF.registerAsTable(\"products\")\n",
"\n",
"print 'There are {0} reviews about {1} products'.format(reviewsDF.count(),productDF.count())"
]
},
{
"cell_type": "code",
"execution_count": 44,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"asin overall reviewText price\n",
"0123456479 5.0 Very simple...I l... 64.98\n",
"0123456479 5.0 This was my gran... 64.98\n",
"0123456479 4.0 I was looking for... 64.98\n",
"0123456479 5.0 I absolutely LOVE... 64.98\n",
"0123456479 5.0 This was a gift f... 64.98\n",
"0123456479 4.0 I love the produc... 64.98\n",
"0123456479 5.0 This was everythi... 64.98\n",
"0123456479 5.0 This jewelry box ... 64.98\n",
"0123456479 5.0 I have ordered th... 64.98\n",
"0123456479 4.0 This is a nice je... 64.98\n",
"0123456479 5.0 The minute I saw ... 64.98\n",
"0123456479 5.0 This jewelry box ... 64.98\n",
"0123456479 4.0 I love pink, and ... 64.98\n",
"0123456479 5.0 This was a great ... 64.98\n",
"0123456479 4.0 This is probably ... 64.98\n",
"0123456479 5.0 Love this Jewelry... 64.98\n",
"0123456479 5.0 I got this jewelr... 64.98\n",
"0123456479 5.0 This products is ... 64.98\n",
"0123456479 3.0 I already own thi... 64.98\n",
"0123456479 5.0 I love this jewel... 64.98\n"
]
}
],
"source": [
"sql_query = \"\"\"SELECT reviews.asin, overall, reviewText, price\n",
" FROM reviews JOIN products ON reviews.asin=products.asin\n",
" WHERE price > 50.00\n",
"\"\"\"\n",
"\n",
"result = sqc.sql(sql_query)\n",
"result.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<h1>Optional: User Defined Functions</h1>\n",
"\n",
"Spark SQL also provides the functionality similar to User Defined Functions (UDF) offering in Hive. Spark uses registerFunction() function to register python functions in SQLContext."
]
},
{
"cell_type": "code",
"execution_count": 45,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"asin reviewText cleaned \n",
"0123456479 Very simple...I l... ArrayBuffer(very ...\n",
"0123456479 This was my gran... ArrayBuffer(this ...\n",
"0123456479 I was looking for... ArrayBuffer(i was...\n",
"0123456479 I absolutely LOVE... ArrayBuffer(i abs...\n",
"0123456479 This was a gift f... ArrayBuffer(this ...\n",
"0123456479 I love the produc... ArrayBuffer(i lov...\n",
"0123456479 This was everythi... ArrayBuffer(this ...\n",
"0123456479 This jewelry box ... ArrayBuffer(this ...\n",
"0123456479 I have ordered th... ArrayBuffer(i hav...\n",
"0123456479 This is a nice je... ArrayBuffer(this ...\n",
"0123456479 The minute I saw ... ArrayBuffer(the m...\n",
"0123456479 This jewelry box ... ArrayBuffer(this ...\n",
"0123456479 I love pink, and ... ArrayBuffer(i lov...\n",
"0123456479 This was a great ... ArrayBuffer(this ...\n",
"0123456479 This is probably ... ArrayBuffer(this ...\n",
"0123456479 Love this Jewelry... ArrayBuffer(love ...\n",
"0123456479 I got this jewelr... ArrayBuffer(i got...\n",
"0123456479 This products is ... ArrayBuffer(this ...\n",
"0123456479 I already own thi... ArrayBuffer(i alr...\n",
"0123456479 I love this jewel... ArrayBuffer(i lov...\n"
]
}
],
"source": [
"import re\n",
"\n",
"def transform_review(review):\n",
" x1 = re.sub('[^0-9a-zA-Z\\s]+','',review)\n",
" return [x1.lower()]\n",
"\n",
"result.registerAsTable(\"result\")\n",
"sqc.registerFunction(\"to_lowercase\", lambda x:transform_review(x),returnType=ArrayType(StringType(), True))\n",
"\n",
"sql_query_transform = \"\"\"SELECT asin, reviewText, to_lowercase(reviewText) as cleaned\n",
" FROM result\n",
"\"\"\"\n",
"\n",
"result_transform = sqc.sql(sql_query_transform)\n",
"result_transform.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<h1>Optional : Mix and Match!!</h1>\n",
"\n",
"You can also mix DataFrames, RDDs and SparkSQL to make it work for you. \n",
"\n",
"<h2>Scenario</h2>\n",
"\n",
"We want to investigate the average rating of reviews in terms of the categories they belong to. In order to do this, we:\n",
"+ query the needed data using DataFrames API\n",
"+ classify the reviews into different categories using core RDD API\n",
"+ query the avearage rating for each category using Spark SQL"
]
},
{
"cell_type": "code",
"execution_count": 46,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"asin review category\n",
"0123456479 Very simple...I l... fashion \n",
"0123456479 This was my gran... fashion \n",
"0123456479 I was looking for... fashion \n",
"0123456479 I absolutely LOVE... fashion \n",
"0123456479 This was a gift f... fashion \n",
"0123456479 I love the produc... fashion \n",
"0123456479 This was everythi... fashion \n",
"0123456479 This jewelry box ... fashion \n",
"0123456479 I have ordered th... fashion \n",
"0123456479 This is a nice je... fashion \n",
"0123456479 The minute I saw ... fashion \n",
"0123456479 This jewelry box ... fashion \n",
"0123456479 I love pink, and ... fashion \n",
"0123456479 This was a great ... sports \n",
"0123456479 This is probably ... fashion \n",
"0123456479 Love this Jewelry... fashion \n",
"0123456479 I got this jewelr... fashion \n",
"0123456479 This products is ... fashion \n",
"0123456479 I already own thi... fashion \n",
"0123456479 I love this jewel... fashion \n"
]
}
],
"source": [
"import sklearn\n",
"import pickle\n",
"\n",
"from pyspark.sql import Row\n",
"\n",
"model = pickle.load(open('Data/classifiers/classifier.pkl', 'r'))\n",
"classifier_b = sc.broadcast(model)\n",
"\n",
"# fashion.map(lambda x: eval(x)['reviewText']).map(lambda x: (x, model_b.value.predict([x])[0])).first()\n",
"classifiedRDD = result_transform.map(lambda row: \n",
" (row.asin,row.reviewText,str(classifier_b.value.predict(row.cleaned)[0]))\n",
" )\n",
"\n",
"classifiedRDD.first()\n",
"\n",
"CLASSIFIED_SCHEMA = StructType([\n",
" StructField('asin', StringType(), True),\n",
" StructField('review', StringType(), True),\n",
" StructField('category', StringType(), True)\n",
" ])\n",
"\n",
"classifiedDF = sqc.createDataFrame(classifiedRDD,CLASSIFIED_SCHEMA)\n",
"\n",
"classifiedDF.show()"
]
},
{
"cell_type": "code",
"execution_count": 47,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"category avgRating \n",
"fashion 4.004614104227351 \n",
"sports 4.284457321452837 \n",
"electronics 3.7669524119042914\n"
]
}
],
"source": [
"classifiedDF.registerAsTable('enrichedReviews')\n",
"\n",
"sql_query_test = \"\"\"SELECT category, avg(overall) as avgRating\n",
" FROM reviews \n",
" JOIN products ON reviews.asin=products.asin \n",
" JOIN enrichedReviews ON products.asin=enrichedReviews.asin\n",
" WHERE price > 50.0\n",
" GROUP BY enrichedReviews.category\n",
"\"\"\"\n",
"\n",
"resultTest = sqc.sql(sql_query_test)\n",
"resultTest.show()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.7"
}
},
"nbformat": 4,
"nbformat_minor": 0
}