
@mlmiller
Created December 16, 2014 19:25
{
"metadata": {
"name": ""
},
"nbformat": 3,
"nbformat_minor": 0,
"worksheets": [
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We started the notebook via `ipython notebook --profile=pyspark`"
]
},
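{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a hedged example of what the `pyspark` IPython profile might contain (the paths are assumptions; `/root/installs/spark-1.1.1` is taken from a warning later in this notebook), a startup script such as `~/.ipython/profile_pyspark/startup/00-pyspark-setup.py` can put PySpark on the Python path. The cell below is a sketch of that file, not something that needs to run here:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# Hypothetical contents of ~/.ipython/profile_pyspark/startup/00-pyspark-setup.py\n",
"# (paths are assumptions; adjust SPARK_HOME to your installation)\n",
"import glob\n",
"import os\n",
"import sys\n",
"\n",
"spark_home = os.environ.get('SPARK_HOME', '/root/installs/spark-1.1.1')\n",
"sys.path.insert(0, os.path.join(spark_home, 'python'))\n",
"# the bundled py4j zip name varies by Spark release, so glob for it\n",
"for zip_path in glob.glob(os.path.join(spark_home, 'python/lib/py4j-*-src.zip')):\n",
"    sys.path.insert(0, zip_path)"
],
"language": "python",
"metadata": {},
"outputs": []
},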
{
"cell_type": "code",
"collapsed": false,
"input": [
"import pprint\n",
"from pyspark import SparkContext\n",
"from pyspark.sql import SQLContext, Row"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 1
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"sc = SparkContext(\"local\", \"App Name\")\n",
"sqlContext = SQLContext(sc)"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stderr",
"text": [
"14/12/16 13:46:37 WARN Utils: Your hostname, sparktest-2 resolves to a loopback address: 127.0.1.1; using 104.236.159.134 instead (on interface eth0)\n",
"14/12/16 13:46:37 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address\n",
"14/12/16 13:46:38 INFO Slf4jLogger: Slf4jLogger started\n"
]
}
],
"prompt_number": 6
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"sc"
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 8,
"text": [
"<pyspark.context.SparkContext at 0x7f109a360390>"
]
}
],
"prompt_number": 8
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"from cloudant import CloudantSpark\n",
"cloudant = CloudantSpark(sc,'https://meritagemike.cloudant.com','yelp_review')"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"{u'update_seq': u'20033-g1AAAAD1eJzLYWBgYMlgTmFQSElKzi9KdUhJstTLTS3KLElMT9VLzskvTUnMK9HLSy3JAapkSmRIsv___39WEgODcBFImyJcm6EpHn1JDkAyqR6mtRdNqwkerXksQJKhAUgBde8Ha_9GvIMhug9AdEMs98kCABhwUEI', u'disk_size': 17539914, u'sizes': {u'active': 10257894, u'external': 13040483, u'file': 17539914}, u'purge_seq': 0, u'doc_count': 20001, u'compact_running': False, u'db_name': u'yelp_review', u'doc_del_count': 0, u'instance_start_time': u'0', u'other': {u'data_size': 13040483}, u'disk_format_version': 5}\n",
"\t--parralelize chunk:\t"
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"0\n",
"\t--parralelize chunk:\t1\n",
"\t--parralelize chunk:\t"
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"2\n",
"\t--parralelize chunk:\t3\n",
"\t--parralelize chunk:\t4\n",
"\t--parralelize chunk:\t"
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"5\n",
"\t--parralelize chunk:\t6\n",
"\t--parralelize chunk:\t7\n",
"\t--parralelize chunk:\t"
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"8\n",
"\t--parralelize chunk:\t9\n",
"\t--parralelize chunk:\t10\n",
"\t--parralelize chunk:\t"
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"11\n",
"\t--parralelize chunk:\t12\n",
"\t--parralelize chunk:\t13\n",
"\t--parralelize chunk:\t14\n",
"\t--parralelize chunk:\t"
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"15\n",
"\t--parralelize chunk:\t16\n",
"\t--parralelize chunk:\t17\n",
"\t--parralelize chunk:\t18\n",
"\t--parralelize chunk:\t19\n",
"Found:\t20001\t documents"
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"\n"
]
}
],
"prompt_number": 10
},
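{
"cell_type": "markdown",
"metadata": {},
"source": [
"The `cloudant` helper module imported above is not included in this gist. Purely as an illustrative, unexecuted sketch (not the actual `CloudantSpark` implementation), a helper along these lines could page through the database's `_all_docs` endpoint and parallelize each page into a single RDD, which would match the chunked output above. The `requests` usage, paging strategy, and class/parameter names are assumptions:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# Illustrative sketch only -- not the actual CloudantSpark module\n",
"import requests\n",
"\n",
"class SimpleCloudantRDD(object):\n",
"    def __init__(self, sc, account_url, db_name, chunk_size=1000):\n",
"        db_url = '%s/%s' % (account_url, db_name)\n",
"        info = requests.get(db_url).json()  # database metadata, as printed above\n",
"        print info\n",
"        chunks = []\n",
"        for i, skip in enumerate(range(0, info['doc_count'], chunk_size)):\n",
"            print '\\t--parallelize chunk:\\t', i\n",
"            resp = requests.get('%s/_all_docs' % db_url,\n",
"                                params={'include_docs': 'true',\n",
"                                        'limit': chunk_size,\n",
"                                        'skip': skip}).json()\n",
"            chunks.append(sc.parallelize([row['doc'] for row in resp['rows']]))\n",
"        self.rdd = sc.union(chunks)\n",
"        print 'Found:\\t%d\\t documents' % self.rdd.count()"
],
"language": "python",
"metadata": {},
"outputs": []
},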
{
"cell_type": "code",
"collapsed": false,
"input": [
"allTypes = cloudant.rdd"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 11
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Now we move into Spark land"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"#Let's take a peek at the first document\n",
"print '\\n----- First Document ----\\n'\n",
"pprint.pprint(allTypes.first(), width=1)\n",
"print '----- First Document ----\\n'"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"\n",
"----- First Document ----\n",
"\n",
"{"
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"u'_id': u'a1671f2fecaaa46989459736674c6ea1',\n",
" u'_rev': u'1-f503277c92cde3da88da7ede397f4d31',\n",
" u'business_id': u'SKLw05kEIlZcpTD5pqma8Q',\n",
" u'date': u'2014-04-16',\n",
" u'review_id': u'dOr5E63wDvGKSpEesNSycw',\n",
" u'stars': 2,\n",
" u'text': u\"Imperial Garden has been getting progressively worse over the last few years. The waitstaff has always been attentive and professional, and the wait time for food is reasonably short. The restaurant itself and the bathrooms are clean and spacious. The ambiance is relaxed and pleasant, and noise levels are comfortably low.\\n\\nBut none of these things can save the mediocre-to-bad food. That is, if you get your food at all. On more than one occasion, they have forgotten to include entire entrees or side dishes when I've picked up take-out. I now double-check all of my orders before leaving. The egg rolls taste like they came out of a box in the freezer. They are greasy and heavy, with very little flavor until you douse them in sweet-and-sour sauce that's so sweet it might as well be syrup. The same problems plague the entirely unremarkable crab rangoon. I've had the hot and sour soup, and disliked the taste quite a bit. Not spicy enough, and far too sour. I usually order the ginger beef stir-fry. There's hardly any ginger flavor, and the sauce is thick and also fairly sweet. If they halved the amount of sugar in everything, it would be a good start. I've had more success with the sizzling black pepper beef, which has a punch of black pepper and a good balance of meat and vegetables. On a recent trip, a friend got the beef lo mein. It had a nice quantity of beef, but an anemic quantity of vegetables. It was also very greasy and watery, and tasted extremely bland. The only reason I go here as often as I do is that my grandmother (the poster child for stereotypical Wisconsin taste buds, bless her heart) loves the three delicacies sizzling rice.\\n\\nI did discover one saving grace: if you dump sriracha on the food, it blends very well with the food and elevates all of the flavors. Too bad they don't have any of it at the restaurant, it could save many a dish. Bottom line: there's much better, more authentic, more balanced, and less heavy Chinese food available in Madison at comparable prices. If you're looking for good Chinese in town, check out Hong Kong Cafe on Regent or Orient House on South Park instead.\",\n",
" u'type': u'review',\n",
" u'user_id': u'AEIKxjduqqu545E3T2lHog',\n",
" u'votes': {u'cool': 1,\n",
" u'funny': 0,\n",
" u'useful': 1}}\n",
"----- First Document ----\n",
"\n"
]
},
{
"output_type": "stream",
"stream": "stderr",
"text": [
"14/12/16 13:47:25 WARN TaskSetManager: Stage 0 contains a task of very large size (907 KB). The maximum recommended task size is 100 KB.\n"
]
}
],
"prompt_number": 12
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"#let's see how many documents are in the rdd\n",
"rdd_doc_count = allTypes.count()\n",
"print 'docs in rdd:\\t', rdd_doc_count"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"docs in rdd:\t20001\n"
]
},
{
"output_type": "stream",
"stream": "stderr",
"text": [
"14/12/16 13:47:39 WARN TaskSetManager: Stage 1 contains a task of very large size (907 KB). The maximum recommended task size is 100 KB.\n"
]
}
],
"prompt_number": 13
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now let's interpret the schema (magic!) and compare to the structure of the first JSON document fetched from Cloudant"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"reviewDocs = sqlContext.inferSchema(allTypes)\n",
"reviewDocs.printSchema()"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"root\n",
" |-- _id: string (nullable = true)\n",
" |-- _rev: string (nullable = true)\n",
" |-- business_id: string (nullable = true)\n",
" |-- date: string (nullable = true)\n",
" |-- review_id: string (nullable = true)\n",
" |-- stars: integer (nullable = true)\n",
" |-- text: string (nullable = true)\n",
" |-- type: string (nullable = true)\n",
" |-- user_id: string (nullable = true)\n",
" |-- votes: map (nullable = true)\n",
" | |-- key: string\n",
" | |-- value: integer (valueContainsNull = true)\n",
"\n"
]
},
{
"output_type": "stream",
"stream": "stderr",
"text": [
"14/12/16 13:47:54 WARN TaskSetManager: Stage 2 contains a task of very large size (907 KB). The maximum recommended task size is 100 KB.\n",
"/root/installs/spark-1.1.1/python/pyspark/sql.py:1039: UserWarning: Using RDD of dict to inferSchema is deprecated,please use pyspark.Row instead\n",
" warnings.warn(\"Using RDD of dict to inferSchema is deprecated,\"\n",
"14/12/16 13:47:54 WARN TaskSetManager: Stage 3 contains a task of very large size (907 KB). The maximum recommended task size is 100 KB.\n"
]
}
],
"prompt_number": 14
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"OK, that's awesome. The interpreted schema matches the structure of the first document"
]
},
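{
"cell_type": "markdown",
"metadata": {},
"source": [
"The stderr output above warns that inferring a schema from an RDD of dicts is deprecated. Here is a hedged, unexecuted sketch of the suggested alternative: convert each document dict to a `pyspark.sql.Row` (already imported in the first cell) before inference."
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# Sketch only (not executed): build Rows from the document dicts,\n",
"# as suggested by the deprecation warning above\n",
"rowDocs = allTypes.map(lambda d: Row(**d))\n",
"reviewDocsFromRows = sqlContext.inferSchema(rowDocs)\n",
"reviewDocsFromRows.printSchema()"
],
"language": "python",
"metadata": {},
"outputs": []
},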
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Now let's register a table and start with some SQL!"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"reviewDocs.registerAsTable(\"reviews\")"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 15
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Note that the registered table name comes in as a string in the sql query. You can also give it a manual schema if you don't trust the inference. Lots of details [in this presentation from Databricks](http://spark-summit.org/wp-content/uploads/2014/07/Easy-json-Data-Manipulation-Yin-Huai.pdf)"
]
},
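{
"cell_type": "markdown",
"metadata": {},
"source": [
"A hedged, unexecuted sketch of supplying a manual schema with `applySchema` instead of relying on inference. The field list is illustrative (a subset of the review fields), not a complete schema:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# Sketch only (not executed): explicit schema for a few review fields\n",
"from pyspark.sql import StructType, StructField, StringType, IntegerType\n",
"\n",
"manual_schema = StructType([\n",
"    StructField(\"_id\", StringType(), True),\n",
"    StructField(\"business_id\", StringType(), True),\n",
"    StructField(\"stars\", IntegerType(), True),\n",
"    StructField(\"text\", StringType(), True)\n",
"])\n",
"\n",
"# applySchema expects an RDD of tuples/lists in the same order as the schema\n",
"review_tuples = allTypes.map(lambda d: (d['_id'], d['business_id'], d['stars'], d['text']))\n",
"manualDocs = sqlContext.applySchema(review_tuples, manual_schema)\n",
"manualDocs.printSchema()"
],
"language": "python",
"metadata": {},
"outputs": []
},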
{
"cell_type": "code",
"collapsed": false,
"input": [
"goodReviews = sqlContext.sql(\"SELECT text FROM reviews WHERE stars>=5\")"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 16
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Note that it's a lazy evaluation, nothing happens until we peek at the results"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"for review in goodReviews.take(10):\n",
" print review"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"Row(text=u'Best Chinese food in the area. Great service and calming atmosphere.')\n",
"Row(text=u'Hands down the best Chinese food in Madison. The chicken chow mein has been a favorite since I was a child. Try the egg rolls and the jasmine or chrysanthemum tea. You really can never go wrong here! The decor is beautiful, and makes you almost feel like you could be in a restaurant in China.')\n",
"Row(text=u'Always consistently delicious! Classic preparations of Americanized Chinese. The cashew chicken is great!')\n",
"Row(text=u'Lettuce wraps 3x better than PF Changs. If Yelp reviewers consider PF changs 3 stars, then this gets 5.\\n\\nJust to clear up for those who want authentic Chinese. Go to ICHIBAN NOW!!!')\n",
"Row(text=u'Yum! Party of 7, everyone ordered their own dish and then shared with all. Started with pot stickers and shrimp ragoon. Grilled pot stickers were really good, could have just eaten those and been happy. Appetizers came with a variety of dipping sauces. Best dishes sampled were crispy orange beef, almond chicken and generals lamb. Lamb was way tender, excellent choice. Service was prompt and friendly, great atmosphere. Ended meal with delicious mango ice cream. Left happy and very full!')\n",
"Row(text=u'Grilled potstickers, moo shu pork, cashew chicken. Yum!!')\n",
"Row(text=u'I think some of the best Chinese food in Madison. A little pricey but a few dishes always feeds me for at least two additional meals.')\n",
"Row(text=u\"We visit family in Madison at least twice a year and always make sure to have dinner here. The shrimp sizzling rice soup is absolutely to die for, and every entree I've had has been fabulous. Hands down the best Chinese food I have ever had.\")\n",
"Row(text=u\"This place is wonderful. One, Almost anything on the menu can be made gluten free. Two, They have about 7 different types of loose teas to choose from. Three, The food is GREAT!\\n\\nMy favorite dishes are the Three Delicacies Sizzling Rice Soup and the Tofu Lettuce Wraps. I am not lying when I say that I have NEVER had a dish I didn't like here. The food is never greasy, never oversauced and never overcooked. The seafood and veggies always have a really fresh taste... I seriously have no complaints at all.\\n\\nI miss you.\")\n",
"Row(text=u\"Crandalls has moved to state street, and is now serving authentic Peruvian food! We went there for dinner tonight, and it was fabulous. I highly recommend the cheese empanadas, solterito (fava bean and corn salad), and the chaufa de quinoa (like fried rice but with quinoa)! They make chicha morada, which is a drink made from purple corn, and is super refreshing.\\n\\nCrandalls is owned by a very sweet family from Peru, and I'm so happy to see that they are now showcasing their roots with their new restaurant!!\\n\\nThey do still have their famous fish fry, and it looked like the lunch menu has cheese curds and sandwiches as well as ceviche.\")\n"
]
},
{
"output_type": "stream",
"stream": "stderr",
"text": [
"14/12/16 13:51:07 WARN TaskSetManager: Stage 4 contains a task of very large size (907 KB). The maximum recommended task size is 100 KB.\n"
]
}
],
"prompt_number": 17
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Summary \n",
"Let's review what we've accomplished. \n",
"\n",
"1. Created a python module that we can use to import doucments from python into spark rdds.\n",
"2. Successfully performed schema discovery on those documents\n",
"3. Registered the schematized documents as a \"table\" in Spark's SQLContext\n",
"4. Performed a successful SQL query on our dataset\n",
"\n",
"# Todo\n",
"\n",
"1. Using the tricks from the Safari Spark book [chapter 4](https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html), de-duplicate the document collection to make sure that we have only the most recent revision for every unique document ID. There's a standard pattern for this in spark.\n",
"2. Extend the logic around schema discovery to work for collections of documents with more than one type. Where possible, we will filter them on doc.type into separate rdds. Where that doesn't work, we could use Spark's MLib to perform basic clustering on type to discover the total number of schemas and then filter appropriately."
]
},
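{
"cell_type": "markdown",
"metadata": {},
"source": [
"A hedged, unexecuted sketch of the de-duplication pattern mentioned in the first todo item: key each document by `_id` and reduce to the revision with the highest generation number (the integer prefix of a Cloudant/CouchDB `_rev`). It also shows the simple `doc.type` filter mentioned in the second item. Names are illustrative:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# Sketch only (not executed): keep the newest revision of each document.\n",
"# A Cloudant/CouchDB _rev looks like '<generation>-<hash>'; the generation\n",
"# number orders revisions of the same document.\n",
"def rev_generation(doc):\n",
"    return int(doc['_rev'].split('-')[0])\n",
"\n",
"newest_docs = (allTypes\n",
"    .map(lambda doc: (doc['_id'], doc))\n",
"    .reduceByKey(lambda a, b: a if rev_generation(a) >= rev_generation(b) else b)\n",
"    .values())\n",
"\n",
"# Simple type-based split for multi-type collections (todo item 2)\n",
"review_docs = newest_docs.filter(lambda doc: doc.get('type') == 'review')"
],
"language": "python",
"metadata": {},
"outputs": []
},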
{
"cell_type": "code",
"collapsed": false,
"input": [],
"language": "python",
"metadata": {},
"outputs": []
}
],
"metadata": {}
}
]
}