{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# A Simple Intro & Tutorial for Spark: Users Logins Example\n",
"### Stephan Warren - stephanwarren at ymail\n",
"Here are some examples of how to use Spark to examine successful logins from a simple log. The goal is to get a light idea of what Spark can do. \n",
"\n",
"Pre-requisite: Prior knowledge of what a Resilient Distributed Dataset (RDD) means. A good reference article by Vishnu Viswanath is available at http://datalakes.com/blogs/rdds-simplified/ \n",
"\n",
"\n",
"### The Data Set\n",
"\n",
"Since Spark is to be used, let's start by \"paralellize-ing\" our data. This means data will be submitted, as a list, to a \"spark context\" for manipulations. More specifically, the data will be distributed across spark nodes as an RDD. \n",
"\n",
"Note that a 'case case' helps to identify each login as a data point; a data point is a simple login item."
]
},
{
"cell_type": "code",
"execution_count": 196,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"case class Login (loginId: Long, userId: Long, loginDate: String, name: String)\n",
"val allLogins = sc.parallelize(List(\n",
" Login( 0, 50, \"2016-02-01\", \"Joe Apple\"),\n",
" Login( 1, 50, \"2016-02-01\", \"Joe Apple\"),\n",
" Login( 2, 55, \"2016-02-01\", \"Jane Apple\"),\n",
" Login( 3, 60, \"2016-02-01\", \"Bob Banana\"),\n",
" Login( 4, 50, \"2016-02-02\", \"Joe Apple\"),\n",
" Login( 5, 55, \"2016-02-02\", \"Jane Apple\"),\n",
" Login( 6, 65, \"2016-02-02\", \"Pete Pear\"),\n",
" Login( 7, 50, \"2016-02-03\", \"Joe Apple\"),\n",
" Login( 8, 55, \"2016-02-03\", \"Jane Apple\"),\n",
" Login( 9, 70, \"2016-02-03\", \"Candy Coconut\"),\n",
" Login(10, 55, \"2016-02-03\", \"Jane Apple\"),\n",
" Login(11, 60, \"2016-02-03\", \"Bob Banana\"),\n",
" Login(12, 65, \"2016-02-03\", \"Pete Pear\"),\n",
" Login(13, 70, \"2016-02-03\", \"Candy Coconut\"),\n",
" Login(14, 70, \"2016-02-04\", \"Candy Coconut\")\n",
"))\n",
"\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"It can be noted that there are other several ways to get paralleized RDDs. The spark context can load a directory's set of text files line-by-line with sc.textFile() or the whole file (not line-by-line) with sc.wholeTextFile(). Another means for loading data, from sequenced data, into an RDD is to use sequenceFile(). \n",
"\n",
"\n",
"\n",
"## Warm Up Exercises\n",
"\n",
"There are a few simple methods to help explore data. Here are some of these methods."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### How many total logins are there?\n",
"\n",
"The count method shows how many login items are in this list. Sanity check passes."
]
},
{
"cell_type": "code",
"execution_count": 197,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"15"
]
},
"execution_count": 197,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"allLogins.count"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"### What are the first data points?\n",
"\n",
"The take method shows pulls the 1st 3 items in this list. "
]
},
{
"cell_type": "code",
"execution_count": 198,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"Array(Login(0,50,2016-02-01,Joe Apple), Login(1,50,2016-02-01,Joe Apple), Login(2,55,2016-02-01,Jane Apple))"
]
},
"execution_count": 198,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"allLogins.take(3)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### What are (all) the data points?\n",
"\n",
"The collect method shows pulls the all the items in this list. The Spark Shell may only display the first 20 items ... so it's not always possible to see all the data."
]
},
{
"cell_type": "code",
"execution_count": 199,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"Array(Login(0,50,2016-02-01,Joe Apple), Login(1,50,2016-02-01,Joe Apple), Login(2,55,2016-02-01,Jane Apple), Login(3,60,2016-02-01,Bob Banana), Login(4,50,2016-02-02,Joe Apple), Login(5,55,2016-02-02,Jane Apple), Login(6,65,2016-02-02,Pete Pear), Login(7,50,2016-02-03,Joe Apple), Login(8,55,2016-02-03,Jane Apple), Login(9,70,2016-02-03,Candy Coconut), Login(10,55,2016-02-03,Jane Apple), Login(11,60,2016-02-03,Bob Banana), Login(12,65,2016-02-03,Pete Pear), Login(13,70,2016-02-03,Candy Coconut), Login(14,70,2016-02-04,Candy Coconut))"
]
},
"execution_count": 199,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"allLogins.collect"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### How can a sample of the data points be made?\n",
"\n",
"The sample method pulls a sampling of login items are in this list. The method may resample previously sampled items. The first parameter, true, means overwrite / remove duplicate samples. The latter parameter is the ratio of samples compared to the full data size. Note: A bit easier on the eyes, methos can be moved to cover multiple lines. "
]
},
{
"cell_type": "code",
"execution_count": 200,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"Array(Login(1,50,2016-02-01,Joe Apple), Login(6,65,2016-02-02,Pete Pear), Login(8,55,2016-02-03,Jane Apple))"
]
},
"execution_count": 200,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"allLogins.\n",
" sample(true, 0.3).\n",
" collect"
]
},
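{
"cell_type": "markdown",
"metadata": {},
"source": [
"Sidebar (a hedged sketch): passing false as the first parameter samples *without* replacement, so each login appears at most once, and an optional third argument seeds the sampler for reproducible results. (The seed value 42 is an arbitrary assumption.)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"// withReplacement = false: each login can be sampled at most once.\n",
"// The optional seed (42 is arbitrary) makes the sample reproducible.\n",
"allLogins.\n",
"    sample(false, 0.3, 42L).\n",
"    collect"
]
},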
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### How can each login be printed?\n",
"\n",
"The println method helps print list of logins. Is this really a Spark context function? "
]
},
{
"cell_type": "code",
"execution_count": 201,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Login(0,50,2016-02-01,Joe Apple)\n",
"Login(1,50,2016-02-01,Joe Apple)\n",
"Login(2,55,2016-02-01,Jane Apple)\n",
"Login(3,60,2016-02-01,Bob Banana)\n",
"Login(4,50,2016-02-02,Joe Apple)\n",
"Login(5,55,2016-02-02,Jane Apple)\n",
"Login(6,65,2016-02-02,Pete Pear)\n",
"Login(7,50,2016-02-03,Joe Apple)\n",
"Login(8,55,2016-02-03,Jane Apple)\n",
"Login(9,70,2016-02-03,Candy Coconut)\n",
"Login(10,55,2016-02-03,Jane Apple)\n",
"Login(11,60,2016-02-03,Bob Banana)\n",
"Login(12,65,2016-02-03,Pete Pear)\n",
"Login(13,70,2016-02-03,Candy Coconut)\n",
"Login(14,70,2016-02-04,Candy Coconut)\n"
]
}
],
"source": [
"allLogins.\n",
" collect.\n",
" foreach(println)"
]
},
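{
"cell_type": "markdown",
"metadata": {},
"source": [
"Sidebar: collect pulls the entire RDD back to the driver, which can be costly on real data. A gentler sketch for a quick peek is to print just a handful of items with take."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"// take(n) returns only the first n elements to the driver,\n",
"// so a large RDD is not pulled back in full just for a peek.\n",
"allLogins.\n",
"    take(5).\n",
"    foreach(println)"
]
},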
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Is there a way to sort login names?\n",
"\n",
"The sortby method sorts logins. Using the underscore charcter means take the default item. (This is a Scala syntax 'trick'.)"
]
},
{
"cell_type": "code",
"execution_count": 202,
"metadata": {
"collapsed": false,
"scrolled": false
},
"outputs": [
{
"data": {
"text/plain": [
"Array(Login(3,60,2016-02-01,Bob Banana), Login(11,60,2016-02-03,Bob Banana), Login(9,70,2016-02-03,Candy Coconut), Login(13,70,2016-02-03,Candy Coconut), Login(14,70,2016-02-04,Candy Coconut), Login(2,55,2016-02-01,Jane Apple), Login(5,55,2016-02-02,Jane Apple), Login(8,55,2016-02-03,Jane Apple), Login(10,55,2016-02-03,Jane Apple), Login(0,50,2016-02-01,Joe Apple), Login(1,50,2016-02-01,Joe Apple), Login(4,50,2016-02-02,Joe Apple), Login(7,50,2016-02-03,Joe Apple), Login(6,65,2016-02-02,Pete Pear), Login(12,65,2016-02-03,Pete Pear))"
]
},
"execution_count": 202,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"allLogins.\n",
" sortBy(_.name, true).\n",
" collect"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"Sidebar: The following is the same effective code using a longer explicit syntax version (without the underscore character)."
]
},
{
"cell_type": "code",
"execution_count": 203,
"metadata": {
"collapsed": false,
"scrolled": false
},
"outputs": [
{
"data": {
"text/plain": [
"Array(Login(3,60,2016-02-01,Bob Banana), Login(11,60,2016-02-03,Bob Banana), Login(9,70,2016-02-03,Candy Coconut), Login(13,70,2016-02-03,Candy Coconut), Login(14,70,2016-02-04,Candy Coconut), Login(2,55,2016-02-01,Jane Apple), Login(5,55,2016-02-02,Jane Apple), Login(8,55,2016-02-03,Jane Apple), Login(10,55,2016-02-03,Jane Apple), Login(0,50,2016-02-01,Joe Apple), Login(1,50,2016-02-01,Joe Apple), Login(4,50,2016-02-02,Joe Apple), Login(7,50,2016-02-03,Joe Apple), Login(6,65,2016-02-02,Pete Pear), Login(12,65,2016-02-03,Pete Pear))"
]
},
"execution_count": 203,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"allLogins.\n",
" sortBy(login => login.name, true).\n",
" collect"
]
},
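{
"cell_type": "markdown",
"metadata": {},
"source": [
"Sidebar (sketch): the second argument to sortBy is the ascending flag, so passing false sorts in descending order."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"// false = sort by name in descending order.\n",
"allLogins.\n",
"    sortBy(_.name, false).\n",
"    collect"
]
},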
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Sidebar (continued): Which way is better? Sometimes it's better to understand code, do shortcuts or explicit syntax solve that issue?\n",
"\n",
"\n",
"### The last example is cluttered -- Is there a way to to only see login names?\n",
"\n",
"The map method allows the adding or removal of login data to each data point. Putting the map together with print is useful for formatting the data."
]
},
{
"cell_type": "code",
"execution_count": 204,
"metadata": {
"collapsed": false,
"scrolled": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Bob Banana\n",
"Bob Banana\n",
"Candy Coconut\n",
"Candy Coconut\n",
"Candy Coconut\n",
"Jane Apple\n",
"Jane Apple\n",
"Jane Apple\n",
"Jane Apple\n",
"Joe Apple\n",
"Joe Apple\n",
"Joe Apple\n",
"Joe Apple\n",
"Pete Pear\n",
"Pete Pear\n"
]
}
],
"source": [
"allLogins.\n",
" sortBy(_.name, true).\n",
" map(_.name).\n",
" collect.\n",
" foreach(println)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### That's still cluttered with duplicates -- Is there a way to to only see unique login names?\n",
"\n",
"The distinct method allows the remove duplicated data. Here is a bit of a cleaner approach."
]
},
{
"cell_type": "code",
"execution_count": 205,
"metadata": {
"collapsed": false,
"scrolled": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Candy Coconut\n",
"Joe Apple\n",
"Pete Pear\n",
"Bob Banana\n",
"Jane Apple\n"
]
}
],
"source": [
"allLogins.\n",
" sortBy(_.name, true).\n",
" map(_.name).\n",
" distinct.\n",
" collect.\n",
" foreach(println)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
"\n",
"\n",
"## Putting it Together: Getting Some Useful Login Information\n",
"\n",
"Now that some preliminary items have been covered, it's time to take a deeper look at processing the Spark example login data."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Which logins occurred on 2106-02-01?\n",
"\n",
"The filter method will selects wanted (or drops unwanted) items from the RDD. Here, filter is applied to keep logins on the date of interest. Collect will gather lazy RDD data by forcing an operation. "
]
},
{
"cell_type": "code",
"execution_count": 206,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"Array(Login(0,50,2016-02-01,Joe Apple), Login(1,50,2016-02-01,Joe Apple), Login(2,55,2016-02-01,Jane Apple), Login(3,60,2016-02-01,Bob Banana))"
]
},
"execution_count": 206,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"allLogins.\n",
" filter(_.loginDate == \"2016-02-01\").\n",
" collect"
]
},
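{
"cell_type": "markdown",
"metadata": {},
"source": [
"Sidebar (a minimal sketch of the laziness just mentioned): a transformation such as filter returns a new RDD immediately, and no data is actually scanned until an action like count or collect runs."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"// filter returns instantly -- it only records the transformation.\n",
"val feb1Logins = allLogins.filter(_.loginDate == \"2016-02-01\")\n",
"// count is an action; this is where Spark actually does the work.\n",
"feb1Logins.count"
]
},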
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Which users, by name, logged-in at least once on 2106-02-01?\n",
"The map methods, as seen before, will change, or transform, each data point. In the following, items such as userId, loginID, loginDate are dropped from each data point and only name is maintained. By using distinct, only unique names are maintained; duplicates are dropped. "
]
},
{
"cell_type": "code",
"execution_count": 207,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"Array(Jane Apple, Bob Banana, Joe Apple)"
]
},
"execution_count": 207,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"allLogins.\n",
" filter(_.loginDate == \"2016-02-01\").\n",
" map(_.name).\n",
" distinct.\n",
" collect"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Which users, by ID, logged-in at least once on 2106-02-01?\n",
"\n",
"The same techique can be used to find userIDs. In this example, the user names cannot be guaranteed to be unique. Therefore, using the user IDs makes more sense than using the user names. Surely, there are several persons named 'Bob Banana' logging in. :)"
]
},
{
"cell_type": "code",
"execution_count": 208,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"Array(50, 60, 55)"
]
},
"execution_count": 208,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"allLogins.\n",
" filter(_.loginDate == \"2016-02-01\").\n",
" map(_.userId).\n",
" distinct.\n",
" collect"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Which users logged-in at least once between 2106-02-01 and 2016-02-03?\n",
"\n",
"An simple change to the filter method allows 3 dates to remain in the RDD. Fortunatey the date format is easy to use in this case. "
]
},
{
"cell_type": "code",
"execution_count": 209,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"Array(65, 50, 60, 70, 55)"
]
},
"execution_count": 209,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"allLogins.\n",
" filter(login => (\"2016-02-01\" <= login.loginDate) && (login.loginDate <= \"2016-02-03\")).\n",
" map(_.userId).\n",
" distinct.\n",
" collect"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Which users logged-in at least once per day between 2106-02-01 and 2016-02-03 and on what day? \n",
"\n",
"It would be nice to know the dates as well. Changing the map method to emit a tuple helps show the user and the date."
]
},
{
"cell_type": "code",
"execution_count": 210,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"Array((50,2016-02-03), (60,2016-02-03), (65,2016-02-03), (55,2016-02-01), (65,2016-02-02), (50,2016-02-01), (70,2016-02-03), (60,2016-02-01), (55,2016-02-03), (50,2016-02-02), (55,2016-02-02))"
]
},
"execution_count": 210,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"allLogins.\n",
" filter(login => (\"2016-02-01\" <= login.loginDate) && (login.loginDate <= \"2016-02-03\")).\n",
" map(login => (login.userId, login.loginDate)).\n",
" distinct.\n",
" collect"
]
},
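{
"cell_type": "markdown",
"metadata": {},
"source": [
"Sidebar: distinct gives no meaningful ordering. As a sketch, sortByKey (available on key/value RDDs) can order the pairs by user ID to make the result easier to read."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"// sortByKey orders the (userId, loginDate) pairs by userId.\n",
"allLogins.\n",
"    filter(login => (\"2016-02-01\" <= login.loginDate) && (login.loginDate <= \"2016-02-03\")).\n",
"    map(login => (login.userId, login.loginDate)).\n",
"    distinct.\n",
"    sortByKey().\n",
"    collect"
]
},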
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Which users logged in at least once per day between 2106-02-01 and 2016-02-03?\n",
"\n",
"Changing the map to (1) remove the date, leaving just the user ID, and (2) emit a '1' provides a list of users (by IDs) that have logged in at least once on any of those three days. (The '1' is more like a true that the user logged in at least once and isn't really how many times the user logged in each day.) The produced list is very similar to the list previous step with the dates removed and the '1' added. This step is just an interim step. (See the following step.)"
]
},
{
"cell_type": "code",
"execution_count": 211,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"Array((50,1), (60,1), (65,1), (55,1), (65,1), (50,1), (70,1), (60,1), (55,1), (50,1), (55,1))"
]
},
"execution_count": 211,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"allLogins.\n",
" filter(login => (\"2016-02-01\" <= login.loginDate) && (login.loginDate <= \"2016-02-03\")).\n",
" map(login => (login.userId, login.loginDate)).\n",
" distinct.\n",
" map(x => (x._1, 1)).\n",
" collect"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### How many days did each users log in at least once per day between 2106-02-01 and 2016-02-03?\n",
"\n",
"Building on the step above, use reduceByKey to add each day logged-in for each user(by ID)."
]
},
{
"cell_type": "code",
"execution_count": 212,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"Array((65,2), (50,3), (60,2), (70,1), (55,3))"
]
},
"execution_count": 212,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"allLogins.\n",
" filter(login => (\"2016-02-01\" <= login.loginDate) && (login.loginDate <= \"2016-02-03\")).\n",
" map(login => (login.userId, login.loginDate)).\n",
" distinct.\n",
" map(x => (x._1, 1)).\n",
" reduceByKey(_+_).\n",
" collect"
]
},
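{
"cell_type": "markdown",
"metadata": {},
"source": [
"Sidebar: reduceByKey(_+_) is shorthand for the explicit lambda reduceByKey((a, b) => a + b). As a hedged alternative sketch, the countByKey action tallies the (userId, loginDate) pairs per key directly and returns a local Map, so the map-to-'1' step is not needed."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"// countByKey is an action on key/value RDDs: it counts the values per key\n",
"// and returns a local Map(userId -> number of distinct login dates).\n",
"allLogins.\n",
"    filter(login => (\"2016-02-01\" <= login.loginDate) && (login.loginDate <= \"2016-02-03\")).\n",
"    map(login => (login.userId, login.loginDate)).\n",
"    distinct.\n",
"    countByKey"
]
},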
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### How many users logged-in at least once every day between 2106-02-01 and 2016-02-03?\n",
"\n",
"Ok ... Since the requirement is for 3 days of logging in, use the filter method to look for '3' and count how many users met this criteria."
]
},
{
"cell_type": "code",
"execution_count": 213,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"2"
]
},
"execution_count": 213,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"allLogins.\n",
" filter(login => (\"2016-02-01\" <= login.loginDate) && (login.loginDate <= \"2016-02-03\")).\n",
" map(login => (login.userId, login.loginDate)).\n",
" distinct.\n",
" map(x => (x._1, 1)).\n",
" reduceByKey(_+_).\n",
" filter(_._2 == 3).\n",
" count\n"
]
},
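{
"cell_type": "markdown",
"metadata": {},
"source": [
"Sidebar (sketch): to see *which* users met the criterion rather than just how many, swap count for keys (the user IDs) followed by collect."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"// keys extracts the userId from each (userId, count) pair that survived the filter.\n",
"allLogins.\n",
"    filter(login => (\"2016-02-01\" <= login.loginDate) && (login.loginDate <= \"2016-02-03\")).\n",
"    map(login => (login.userId, login.loginDate)).\n",
"    distinct.\n",
"    map(x => (x._1, 1)).\n",
"    reduceByKey(_+_).\n",
"    filter(_._2 == 3).\n",
"    keys.\n",
"    collect"
]
},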
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Summary\n",
"\n",
"I hope the this example and information are a helpful introduction to Spark. Spark has many more features -- like streaming, machine learning, data frames (an alternative to Hive & Pig) and Graphx.\n",
"\n",
"### Further Reading:\n",
"\n",
"There is a part 2 of Vishnu's RDD blog that provides insights on how some of the methods shown above are carried out in RDDs. This can be found at: https://www.linkedin.com/pulse/spark-rdds-simplified-part-2-vishnu-viswanath\n",
"\n",
"Various articles exist on the web. One book that is rather easy to follow is Big Data Analysis with Spark by Mohammed Guller.\n",
"\n",
"### Coursework:\n",
"Some courses exist on Spark and Big Data. The number of courses and disparate content make finding courses difficult. I found the Galvanize (formerly Zipfian Academy) Data Engineering course helpful in my quest.\n",
"\n",
"### How Did I Make This Document?\n",
"\n",
"I used Asim Jalis's blog, https://github.com/asimjalis/apache-toree-quickstart, to install Toree and write this article.\n",
"\n",
"\n",
"### Contact:\n",
"You can contact me at stephanwarren on ymail.com\n",
"\n"
]
}
],
"metadata": {
"celltoolbar": "Raw Cell Format",
"kernelspec": {
"display_name": "Toree",
"language": "",
"name": "toree"
},
"language_info": {
"name": "scala"
}
},
"nbformat": 4,
"nbformat_minor": 0
}