@pramodvspk
Last active October 8, 2016 19:03
Analysis of streaming music data using Apache Spark
{"cells":[{"cell_type":"markdown","source":["# Music Analysis using Apache Spark \nIn this notebook, I will be demonstrating various concepts of Apache Spark such as transformations and actions. I will be executing the examples using various APIs present in Apache Spark such as RDDs and DataFrames.\n\nThis example has been inspired by <a href=\"https://www.mapr.com/blog/real-time-user-profiles-spark-drill-and-mapr-db\"> this awesome blog post</a> on the MapR Academy blog. The data has been acquired from the <a href=\"https://github.com/mapr/mapr-demos/tree/master/spark_music_demo\">GitHub account</a> of MapR Academy.\n\n### Problem Statement\nIn a hypothetical music streaming website, customers are constantly connected to the service and are listening to the tracks. The data captured by these events forms the core dataset of our problem. There are various attributes associated with the customers and the tracks. \n\n### Datasets\n\n1. <a href=\"https://raw.githubusercontent.com/mapr/mapr-demos/master/spark_music_demo/tracks.csv\"><b>tracks.csv</b></a> - Contains the collection of the data where the customer is listening to a track, one event per line. The various attributes present in the dataset are\n 1. Event ID : The unique identifier of the event where the customer is listening to the track (Integer)\n 2. Customer ID : The customer ID of the customer listening to the track (Integer)\n 3. Track ID : The track ID of the track currently being played (Integer)\n 4. DateTime : The date and time at which the customer is listening to the track (String)\n 5. Mobile : 1 if the customer is listening to the track on a mobile device, else 0 (Integer)\n 6. Listening ZIP : The approximate ZIP location of the customer listening to the track (Integer)\n2. <a href=\"https://raw.githubusercontent.com/mapr/mapr-demos/master/spark_music_demo/cust.csv\"> <b>cust.csv</b></a> - Contains the details about the customer\n 1. Customer ID: The unique identifier of the customer (Integer)\n 2. 
Name, Gender, Address, Zip : The information associated with the customer (String, Integer, String, Integer)\n 3. Sign Date : The date the customer has been added to the service (String)\n 4. Level : The level of subscription of the customer 0, 1, 2 for Free, Silver and Gold, respectively (Integer)\n 5. Other fields: Which we are not interested in this example"],"metadata":{}},{"cell_type":"markdown","source":["### Downloading the datasets\n\nDownload the datasets using the shell command wget and the URL, save them into the tmp directory. The URL's for the datasets are\n1. tracks.csv : https://raw.githubusercontent.com/mapr/mapr-demos/master/spark_music_demo/tracks.csv\n2. cust.csv : https://raw.githubusercontent.com/mapr/mapr-demos/master/spark_music_demo/cust.csv"],"metadata":{}},{"cell_type":"code","source":["%sh\nwget -P /tmp \"https://raw.githubusercontent.com/mapr/mapr-demos/master/spark_music_demo/tracks.csv\"\nwget -P /tmp \"https://raw.githubusercontent.com/mapr/mapr-demos/master/spark_music_demo/cust.csv\""],"metadata":{},"outputs":[],"execution_count":3},{"cell_type":"markdown","source":["### Uploading the datasets into Databricks file system\n\nDatabricks file system is a distributed file system lying on top of Amazon S3. We will upload the data from the local file system into our DBFS. 
Below is a Python script which copies the data from the local file system into the datasets folder of DBFS of your cluster.\n\nNote: The local files are referenced using `file:/` and DBFS files are referenced using `dbfs:/`"],"metadata":{}},{"cell_type":"code","source":["localTrackFilePath = \"file:/tmp/tracks.csv\"\nlocalCustomerFilePath = \"file:/tmp/cust.csv\"\ndbutils.fs.mkdirs(\"dbfs:/datasets\")\ndbutils.fs.cp(localTrackFilePath, \"dbfs:/datasets/\")\ndbutils.fs.cp(localCustomerFilePath, \"dbfs:/datasets\")\n#Displaying the files present in the DBFS datasets folder of your cluster\ndisplay(dbutils.fs.ls(\"dbfs:/datasets\"))"],"metadata":{},"outputs":[],"execution_count":5},{"cell_type":"markdown","source":["### Creating named tuples in Python\n\nThe downloaded datasets are in CSV format and the rows in a CSV file have no inherent structure. Named tuples in Python are lightweight immutable objects. Fields of a named tuple instance can be accessed by name, using object-like attribute access, or by position, using the standard tuple syntax. This makes the code more readable and more Pythonic.\n\nBelow is the creation of named tuples for the Track dataset."],"metadata":{}},{"cell_type":"code","source":["from collections import namedtuple\n\ntrack_fields = ('event_id', 'customer_id', 'track_id', 'date_time', 'mobile', 'zip')\nTrack = namedtuple('Track', track_fields, verbose=True)"],"metadata":{},"outputs":[],"execution_count":7},{"cell_type":"markdown","source":["### Defining the problem statement\n\nWe would be performing analysis on the music dataset using the following questions\n1. Number of unique tracks listened to by a user.\n2. Number of mobile tracks listened to by a user.\n3. Percentage of tracks listened to by the users during different times of the day.\n4. Most popular track and its name.\n\nThe analysis would be done by Apache Spark actions and transformations. Actions and transformations such as `map`, `filter`, `reduceByKey`, `reduce`, `collect` etc. 
would be covered in the example."],"metadata":{}},{"cell_type":"markdown","source":["#### Loading and creating the first RDD\n\n1. Load the data into an RDD using the Spark context `textFile` method.\n2. Define a Python method which parses an individual tracks.csv row and converts it into a named tuple.\n3. Perform a `map` transformation, which transforms `tracks_RDD` into an RDD containing `Track` named tuples.\n4. Persist the RDD in memory using the `persist` method, since it will be used in the coming examples."],"metadata":{}},{"cell_type":"code","source":["#Method to parse a tracks.csv row and store it as a Track named tuple\ndef parse_tracks(row):\n    track_row = row.split(\",\")\n    event_id = int(track_row[0])\n    customer_id = int(track_row[1])\n    track_id = int(track_row[2])\n    date_time = str(track_row[3])\n    mobile = int(track_row[4])\n    zip_code = str(track_row[5])\n    return Track(event_id,customer_id,track_id,date_time,mobile,zip_code)\n\n#Loading the tracks.csv file using the spark context \ntracks_RDD = sc.textFile(\"dbfs:/datasets/tracks.csv\")\n#Parsing the tracks into named tuples and persisting the tracks\ntracks_parsed = tracks_RDD.map(parse_tracks).persist()"],"metadata":{},"outputs":[],"execution_count":10},{"cell_type":"markdown","source":["### 1. Number of unique tracks listened by a user\n\n1. `map` through the tracks_parsed RDD and create a pair RDD with Customer ID as key and a list containing Track ID as the value.\n2. Apply a `reduceByKey` operation and combine all the Track IDs listened to by a customer into a list.\n3. Map through the values of the pair RDD and convert the list of all tracks listened to by the user into a set, which automatically removes the duplicates.\n4. Map through the values of the resulting pair RDD and find the length of the set of all tracks listened to by a user.\n5. 
Apply the `take(10)` action, which kicks off the evaluation of all the RDDs defined by the above transformations."],"metadata":{}},{"cell_type":"code","source":["tracks_pairRDD = tracks_parsed.map(lambda track: (track.customer_id,[track.track_id]))\ntracks_by_userRDD = tracks_pairRDD.reduceByKey(lambda a,b: a+b)\nunique_tracksRDD = tracks_by_userRDD.mapValues(lambda list_tracks: set(list_tracks))\nnumber_unique_tracksRDD = unique_tracksRDD.mapValues(lambda set_tracks: len(set_tracks))\n#Printing the number of unique tracks listened to by 10 users; it can be done for all users similarly with the collect() action \nfor customer_unique in number_unique_tracksRDD.take(10):\n    print \"Customer Id:\", customer_unique[0], \"has listened to\", customer_unique[1], \"unique tracks\""],"metadata":{},"outputs":[],"execution_count":12},{"cell_type":"markdown","source":["#### Displaying the customer names along with their number of unique tracks\n\nUsing the transformations and actions above, we were able to generate the number of unique tracks listened to by each customer ID, but what if we want to know the name associated with a customer ID?\n\nCreate a dictionary which maps customer IDs to customer names. But the dictionary resides locally on the driver program and the tasks are executed on the worker nodes. So the dictionary needs to be shipped to the worker nodes along with the tasks, which is very expensive computationally. 
So we use broadcast variables.\n\nAbout broadcast variables from <a href=\"http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables\">official documentation</a>\n> Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. Explicitly creating broadcast variables is only useful when tasks across multiple stages need the same data or when caching the data in deserialized form is important.\n\nSo, we start by loading the cust.csv file from the tmp folder of our local file system, create a dictionary mapping customer IDs to customer names and create a broadcast variable using the SparkContext's `broadcast` method. The value of the broadcast variable can be accessed using the `value` attribute. \n\nAfter creating the broadcast variable, perform the `take(10)` action on the number_unique_tracksRDD, and print the customer name using the `value` attribute"],"metadata":{}},{"cell_type":"code","source":["#Method to create a dictionary which maps Customer IDs to Customer names\ndef create_cust_dict():\n    custDict = {}\n    with open(\"/tmp/cust.csv\") as f:\n        for line in f:\n            fields = line.split(\",\")\n            #Skipping the header row\n            if(fields[0]!=\"CustID\"):\n                custId = int(fields[0])\n                cust_name = str(fields[1])\n                custDict[custId] = cust_name\n    return custDict\n\n# Creating a broadcast variable cust_names from the dictionary returned by the create_cust_dict method above\ncust_names = sc.broadcast(create_cust_dict())\n\n#Printing the Customer Names using the broadcast variable value attribute\nfor customer_unique in number_unique_tracksRDD.take(10):\n    print cust_names.value[customer_unique[0]], \"has listened to\", customer_unique[1], \"unique tracks\""],"metadata":{},"outputs":[],"execution_count":14},{"cell_type":"markdown","source":["### 2. Total number of mobile tracks listened by a customer\n\n1. 
Apply a `filter` transformation on the tracks_parsed RDD to keep only the tracks that were listened to on a mobile device (`mobile == 1`).\n2. Create a pair RDD which contains the Customer ID as the key and 1 as the value.\n3. Apply the `reduceByKey` transformation and find the sum of all mobile tracks listened to by the user.\n4. Apply the `take(10)` action and, using the broadcast variable, print the names and number of mobile tracks listened to by the customers."],"metadata":{}},{"cell_type":"code","source":["#Keeping only the events played on a mobile device (mobile is 1 for mobile, 0 otherwise)\nfiltered_mobile_RDD = tracks_parsed.filter(lambda track: track.mobile == 1)\nfiltered_pair_RDD = filtered_mobile_RDD.map(lambda track: (track.customer_id, 1))\nmobile_tracks_by_customer = filtered_pair_RDD.reduceByKey(lambda a,b: a+b)\n#Printing the customer Id, name and the number of mobile tracks listened by a customer\nfor mobile_track in mobile_tracks_by_customer.take(10):\n    print \"Customer Id:\", mobile_track[0], \"named:\", cust_names.value[mobile_track[0]], \"has listened to\", mobile_track[1], \"mobile tracks\""],"metadata":{},"outputs":[],"execution_count":16},{"cell_type":"markdown","source":["### 2. Total number of mobile tracks listened by a customer using Spark SQL\n\nSpark SQL allows creation of dataframes which have a schema associated with them. After the creation of a dataframe it can be registered as a temporary table. Using the sqlContext `sql` method, SQL queries can be executed on the temporary table. Spark SQL's Catalyst optimizer optimizes the queries.\n\nUsing Spark SQL in our example context\n1. Create a dataframe from the `tracks_parsed` RDD using the `createDataFrame` method.\n2. Register the dataframe as a temporary table using the `registerTempTable` method.\n3. Execute the query using the `sqlContext.sql` method, which returns the result as a dataframe.\n4. 
Apply the `take(10)` action and, using the broadcast variable, print the names and number of mobile tracks listened to by the customers."],"metadata":{}},{"cell_type":"code","source":["#Creating a dataframe using sqlContext\ntracksDF = sqlContext.createDataFrame(tracks_parsed)\n#Registering the dataframe as a temporary table\ntracksDF.registerTempTable(\"TracksTable\")\n#Execute the sql query using the sqlContext object (mobile is 1 for mobile, 0 otherwise)\nsql_results = sqlContext.sql(\"Select customer_id,count(track_id) as mobile_count from TracksTable where mobile=1 group by customer_id order by mobile_count DESC\")\n#Printing the customer Id, name and the number of mobile tracks listened by a customer\nfor result in sql_results.take(10):\n    print \"Customer Id:\", result.customer_id, \"named:\", cust_names.value[result.customer_id], \"has listened to\", result.mobile_count, \"mobile tracks\""],"metadata":{},"outputs":[],"execution_count":18},{"cell_type":"markdown","source":["### 3. Percentage of tracks listened by the users during different times of the day\n\nFinding the percentage of tracks during different times of day involves finding the total number of tracks listened to by a user and also the number of tracks listened to by the user during each time of the day. Using the `date_time` attribute of the `Track` named tuples in `tracks_parsed`, define a method to determine when in a day a particular track was played. \n\n1. Apply the `map` transformation to the `tracks_parsed` RDD to form a pair RDD which contains Customer ID and Date Time.\n2. Apply the `reduceByKey` transformation to form a list of all the Date Times at which the user has listened to a track.\n3. Implement a custom function, which takes as input all the Date Times at which the user has listened to tracks and returns the percentage of tracks listened to at each time of the day.\n4. 
Using the `take(10)` action and the broadcast variable, print the customer name and the percentage of songs listened to during each time of the day."],"metadata":{}},{"cell_type":"code","source":["from __future__ import division\n\ndef find_percent(numerator,denominator):\n    return (round((numerator/denominator)*100, 2))\n\ndef compute_stats(user_tracks):\n    morning_tracks = afternoon_tracks = evening_tracks = night_tracks = total_tracks = 0\n    for track in user_tracks:\n        total_tracks = total_tracks + 1\n        # The date time is in the format of 2014-12-01 09:54:09\n        date, time = track.split(\" \")\n        hour_of_day = int(time.split(\":\")[0])\n        # Night covers 22:00 to 04:59, so it is counted in two branches\n        if hour_of_day < 5:\n            night_tracks = night_tracks + 1\n        elif hour_of_day < 12:\n            morning_tracks = morning_tracks + 1\n        elif hour_of_day < 17:\n            afternoon_tracks = afternoon_tracks + 1\n        elif hour_of_day < 22:\n            evening_tracks = evening_tracks + 1\n        else:\n            night_tracks = night_tracks + 1\n    return (find_percent(morning_tracks,total_tracks), find_percent(afternoon_tracks,total_tracks), find_percent(evening_tracks,total_tracks), find_percent(night_tracks,total_tracks))\n\nuser_stats = tracks_parsed.map(lambda track: (track.customer_id, [track.date_time])).reduceByKey(lambda x,y:x+y).mapValues(lambda x: compute_stats(x))\n\nfor stat in user_stats.take(10):\n    print \"Customer:\", cust_names.value[stat[0]], \"has listened to\", stat[1][0], \"% of tracks in the morning,\", stat[1][1], \"% in the afternoon,\", stat[1][2], \"% in the evening, and\", stat[1][3], \"% in the night\""],"metadata":{},"outputs":[],"execution_count":20},{"cell_type":"markdown","source":["### 4. Most popular track and its name\n\nMost popular track refers to the track which has been played the most number of times by all the customers.\n\n1. Map through the `tracks_parsed` RDD and form a Pair RDD of `(track_id, 1)`.\n2. Apply the `reduceByKey` transformation to find the total number of times each track is played.\n3. 
Apply the `reduce` action, which compares play counts pairwise and finds the track which is played the most.\n4. Create a broadcast variable containing the names of the tracks keyed by track ID."],"metadata":{}},{"cell_type":"markdown","source":["#### Creating the Track names broadcast variable\n\nDownload the data from the MapR GitHub repo from <a href=\"https://raw.githubusercontent.com/mapr/mapr-demos/master/spark_music_demo/music.csv\">here</a>. Load it into the tmp folder of your local file system. Define a method which creates the dictionary behind the broadcast variable for Track names."],"metadata":{}},{"cell_type":"code","source":["%sh \nwget -P /tmp \"https://raw.githubusercontent.com/mapr/mapr-demos/master/spark_music_demo/music.csv\""],"metadata":{},"outputs":[],"execution_count":23},{"cell_type":"code","source":["# A method which creates a dictionary which maps Track Id to Track Name \ndef create_music_dict():\n    music_dict = {}\n    with open('/tmp/music.csv') as f:\n        for line in f:\n            fields = line.split(\",\")\n            # Skipping the header row\n            if (fields[0] !=\"TrackId\"):\n                track_id = int(fields[0])\n                title = str(fields[1])\n                artist = str(fields[2])\n                length = int(fields[3])\n                music_dict[track_id] = title\n    return music_dict\n\n#Creating the broadcast variable tracks_info\ntracks_info = sc.broadcast(create_music_dict())"],"metadata":{},"outputs":[],"execution_count":24},{"cell_type":"code","source":["track_counts = tracks_parsed.map(lambda track: (track.track_id,1)).reduceByKey(lambda a,b: a+b)\n#Keeping whichever of the two (track_id, count) pairs has the higher count\nmost_popular_track = track_counts.reduce(lambda track1, track2: track1 if track1[1]>track2[1] else track2)\nprint \"The most popular track is\", tracks_info.value[most_popular_track[0]], \"and is played\", most_popular_track[1] , \"times\""],"metadata":{},"outputs":[],"execution_count":25}],"metadata":{"name":"musicAnalysis","notebookId":4202532474383986},"nbformat":4,"nbformat_minor":0}
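
The pairwise comparison behind the most-popular-track step can be tried without a cluster. The following is a minimal sketch in plain Python, assuming a small made-up list of `(track_id, play_count)` pairs in place of the `track_counts` RDD; the helper name `more_popular` and the sample values are hypothetical and not part of the notebook. It uses `functools.reduce`, which folds a list with a two-argument function in the same spirit as the RDD `reduce` action.

```python
from functools import reduce

# Hypothetical (track_id, play_count) pairs standing in for the track_counts RDD
track_counts = [(101, 3), (202, 7), (303, 5)]


def more_popular(track1, track2):
    """Return whichever (track_id, play_count) pair has the higher count."""
    # Both branches must be reachable: returning track1 in either case would
    # simply yield the first element instead of the maximum.
    return track1 if track1[1] > track2[1] else track2


# Fold the list pairwise, keeping the pair with the highest count seen so far
most_popular_track = reduce(more_popular, track_counts)
```

The comparator has to return the second argument when it holds the larger count; otherwise the fold never moves away from the first element it sees.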