{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<b>\n",
"Amir Ziai<br>\n",
"amir@ischool.berkeley.edu<br>\n",
"UC Berkeley MIDS<br>\n",
"Machine learning at scale course<br>\n",
"Week 3 assignment<br>\n",
"September 22, 2015\n",
"</b>"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## HW3.0"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<b>merge sort:</b> \n",
"<br>\n",
"a parallel algorithm that divides the unsorted list into sublists of 1 elements and then merges sublists back together to produce a sorted list. \n",
"<br><br>\n",
"<b>usage in hadoop:</b>\n",
"<br>\n",
"Merge sort in Hadoop can be used to merge the sorted list of keys.\n",
"<br><br>\n",
"<b>combiner function in the context of hadoop:</b>\n",
"<br>\n",
"Combiner is an optional function that can improve the communication overhead in some cases by doing a reduce-type operation. Before the mapper writes out the key, value pair, the combiner collects the outputs by key in lists and flushes periodically by applying the reduce function. The output should be compatible to what the mapper would've outputted but the number of pairs that are shipped to the reducer are reduced.\n",
"<br><br>\n",
"<b>example of why it's useful</b>\n",
"<br>\n",
"In the classic word count example many (word, 1) pairs will be emitted by the mapper. The combiner can take some of the intermediary words and reduce them to a smaller number of pairs with the same key that are already aggregated before sending them to the reducer. For example if the same mapper has generated 1,000 (the, 1) pairs the combiner will package them all to (the, 1000) vs sending 1,000 pairs.\n",
"<br><br>\n",
"<b>Hadoop shuffle:</b>\n",
"<br>\n",
"Shuffle sits between map and reduce. It prepares the data, sorts it by key and sends it to reducer(s). A paritioner will decide which (key, value) pairs are going to which partition (by hashing the keys by default). \n",
"<br><br>\n",
"<b>Apriori algorithm:</b>\n",
"<br>\n",
"An algorithm for mining the frequent itemsets in a set of transactions. It is an iterative algorithm, first it finds items that satisfy a minimum frequency criteria and then moves on to finding pairs, triplets and so on. By leveraging the apriori property it can drastically reduce the covered space.\n",
"<br><br>\n",
"<b>Example in my domain:</b>\n",
"<br>\n",
"This can be applied to portfolios of stocks. Rules like TSLA=>AAPL can be mined as a result of applying this algorithm to a database of portfolios of stocks. The associative rules can be used for recommending investors to explore stocks that are frequently held together.\n",
"<br><br>\n",
"<b>Confidence and lift:</b>\n",
"<br>\n",
"For an associative rule A=>B confidence is frequency of A and B happening together over the frequency of A. It can be thought of as the conditional probability of A and B given A.\n",
"<br><br>\n",
"Lift for A => B is the ratio of confidence to support(B) / number of transactions (or posterior and prior). This measure normalizes confidence for support of B so if we have a B with a very high support the A => B rule may not be very interesting. A ratio higher than 1 is desirable."
]
},
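{
"cell_type": "markdown",
"metadata": {},
"source": [
"A small toy example (made-up baskets, not the assignment data) illustrating confidence and lift for a single rule:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# toy illustration of confidence and lift for the rule milk => bread\n",
"baskets = [\n",
"    {'milk', 'bread', 'eggs'},\n",
"    {'milk', 'bread'},\n",
"    {'milk', 'butter'},\n",
"    {'bread', 'butter'},\n",
"    {'milk', 'bread', 'butter'},\n",
"]\n",
"\n",
"n = float(len(baskets))\n",
"support_a = sum(1 for b in baskets if 'milk' in b)                    # baskets containing A\n",
"support_b = sum(1 for b in baskets if 'bread' in b)                   # baskets containing B\n",
"support_ab = sum(1 for b in baskets if 'milk' in b and 'bread' in b)  # baskets containing A and B\n",
"\n",
"confidence = support_ab / float(support_a)  # P(B | A)\n",
"lift = confidence / (support_b / n)         # confidence normalized by how common B is\n",
"\n",
"print 'confidence(milk => bread): %s' % confidence\n",
"print 'lift(milk => bread)      : %s' % lift"
]
},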
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## HW3.1"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Product Recommendations: The action or practice of selling additional products or services \n",
"to existing customers is called cross-selling. Giving product recommendation is \n",
"one of the examples of cross-selling that are frequently used by online retailers. \n",
"One simple method to give product recommendations is to recommend products that are frequently\n",
"browsed together by the customers.\n",
"\n",
"~~Suppose we want to recommend new products to the customer based on the products they\n",
"have already browsed on the online website. Write a program using the A-priori algorithm\n",
"to find products which are frequently browsed together. Fix the support to s = 100 \n",
"(i.e. product pairs need to occur together at least 100 times to be considered frequent) \n",
"and find itemsets of size 2 and 3.~~\n",
"\n",
"Use the online browsing behavior dataset at: \n",
"\n",
"https://www.dropbox.com/s/zlfyiwa70poqg74/ProductPurchaseData.txt?dl=0\n",
"\n",
"Each line in this dataset represents a browsing session of a customer. \n",
"On each line, each string of 8 characters represents the id of an item browsed during that session. \n",
"The items are separated by spaces.\n",
"\n",
"Do some exploratory data analysis of this dataset. \n",
"Report your findings such as number of unique products; largest basket, etc. using Hadoop Map-Reduce."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"import os\n",
"os.chdir('/Users/amir/Downloads/')"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"FRO11987 ELE17451 ELE89019 SNA90258 GRO99222 \r\n",
"GRO99222 GRO12298 FRO12685 ELE91550 SNA11465 ELE26917 ELE52966 FRO90334 SNA30755 ELE17451 FRO84225 SNA80192 \r\n",
"ELE17451 GRO73461 DAI22896 SNA99873 FRO86643 \r\n",
"ELE17451 ELE37798 FRO86643 GRO56989 ELE23393 SNA11465 \r\n",
"ELE17451 SNA69641 FRO86643 FRO78087 SNA11465 GRO39357 ELE28573 ELE11375 DAI54444 \r\n"
]
}
],
"source": [
"!head -5 ProductPurchaseData.txt"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Number of unique products"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Sequential processing"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"No of products : 12592\n",
"Biggest basket : 37\n",
"Smallest basket : 2\n",
"Average items : 12.2447509726\n"
]
}
],
"source": [
"products = set()\n",
"biggest = 0\n",
"smallest = 1000\n",
"cumulative_count = 0\n",
"count = 0\n",
"\n",
"with open('ProductPurchaseData.txt', 'rb') as f:\n",
" for line in f:\n",
" count += 1\n",
" items = line.split()\n",
" items_count = len(items)\n",
" biggest = max(biggest, items_count)\n",
" smallest = min(smallest, items_count)\n",
" cumulative_count += items_count\n",
" \n",
" for item in items:\n",
" products.add(item)\n",
" \n",
"print 'No of products : %s' % len(products)\n",
"print 'Biggest basket : %s' % biggest\n",
"print 'Smallest basket : %s' % smallest\n",
"print 'Average items : %s' % (cumulative_count / float(count))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Mapper"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Overwriting mapper.py\n"
]
}
],
"source": [
"%%writefile mapper.py\n",
"#!/usr/bin/python\n",
"import sys\n",
"import random\n",
"\n",
"for line in sys.stdin:\n",
" rnd = random.getrandbits(128) # generating a hash for each basket\n",
" for item in line.split():\n",
" print '%s\\t%s\\t%032x' % (item, 1, rnd)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Reducer"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Overwriting reducer.py\n"
]
}
],
"source": [
"%%writefile reducer.py\n",
"#!/usr/bin/python\n",
"import sys\n",
"import operator\n",
"\n",
"items = {}\n",
"baskets = {}\n",
"\n",
"for line in sys.stdin:\n",
" item, count, basket = line.split('\\t')\n",
" count = int(count)\n",
" \n",
" if basket in baskets:\n",
" baskets[basket] += 1\n",
" else:\n",
" baskets[basket] = 1\n",
" \n",
" if item in items:\n",
" items[item] += count\n",
" else:\n",
" items[item] = count\n",
" \n",
"baskets_max = max(baskets.iteritems(), key=operator.itemgetter(1))\n",
"baskets_min = min(baskets.iteritems(), key=operator.itemgetter(1))\n",
"print 'No of products : %s' % len(items)\n",
"print 'Biggest basket : %s' % baskets_max[1]\n",
"print 'Smallest basket : %s' % baskets_min[1]\n",
"print 'Average items : %s' % (sum(baskets.values()) / float(len(baskets)))"
]
},
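{
"cell_type": "markdown",
"metadata": {},
"source": [
"The streaming job can also be sanity-checked locally by piping the data through the mapper, a sort and the reducer (assuming mapper.py and reducer.py are in the working directory):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# simulate the Hadoop streaming pipeline locally: map | sort | reduce\n",
"!cat ProductPurchaseData.txt | python mapper.py | sort | python reducer.py"
]
},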
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Run"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"!hadoop jar /usr/local/Cellar/hadoop/2.7.1/libexec/share/hadoop/tools/lib/hadoop-streaming-2.7.1.jar -mapper mapper.py -reducer reducer.py -input /user/amir/hw3_1/ProductPurchaseData.txt -output /user/amir/hw3_1/output/"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"15/09/22 22:12:57 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n",
"No of products : 12592\t\n",
"Biggest basket : 37\t\n",
"Smallest basket : 2\t\n",
"Average items : 12.2447509726\t\n"
]
}
],
"source": [
"!hdfs dfs -cat /user/amir/hw3_1/output/part-00000"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
"\n",
"## HW3.2"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"(Computationally prohibitive but then again Hadoop can handle this)\n",
"\n",
"Note: for this part the writeup will require a specific rule ordering but the program need not sort the output.\n",
"\n",
"List the top 5 rules with corresponding confidence scores in decreasing order of confidence score \n",
"for frequent (100>count) itemsets of size 2. \n",
"A rule is of the form: \n",
"\n",
"(item1) ⇒ item2.\n",
"\n",
"Fix the ordering of the rule lexicographically (left to right), \n",
"and break ties in confidence (between rules, if any exist) \n",
"by taking the first ones in lexicographically increasing order. \n",
"Use Hadoop MapReduce to complete this part of the assignment; \n",
"use a single mapper and single reducer; use a combiner if you think it will help and justify. "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Sequential solution"
]
},
{
"cell_type": "code",
"execution_count": 73,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Took 15.3012888432 seconds\n",
"====================\n",
"Sorted item1 => item2 by confidence (desc)\n",
"(GRO38636 => FRO40251): 0.990654205607\n",
"(GRO85051 => FRO40251): 0.979406919275\n",
"(FRO92469 => FRO40251): 0.975265017668\n",
"(DAI43868 => SNA82528): 0.952702702703\n",
"(DAI23334 => DAI62779): 0.951048951049\n"
]
}
],
"source": [
"from collections import Counter\n",
"from itertools import combinations\n",
"import operator\n",
"import time\n",
"\n",
"min_support = 100\n",
"\n",
"items = Counter()\n",
"items2 = Counter()\n",
"\n",
"def dictionary_filter(dic, min_):\n",
" return dict((k, v) for k, v in dic.items() if v > min_)\n",
"\n",
"with open('ProductPurchaseData.txt') as f:\n",
" for basket in f:\n",
" for item in basket.split():\n",
" items[item] += 1\n",
"\n",
"items_filtered = dictionary_filter(items, min_support)\n",
"\n",
"start = time.time()\n",
"with open('ProductPurchaseData.txt', 'rb') as f:\n",
" for basket in f:\n",
" vals = set(basket.split())\n",
" vals_ = vals.intersection(items) # filter out\n",
" for combo in combinations(vals_, 2):\n",
" items2[combo] += 1\n",
" \n",
"print 'Took %s seconds' % (time.time() - start)\n",
" \n",
"items2_filtered = dictionary_filter(items2, min_support)\n",
"\n",
"rules = dict()\n",
"for k in items2_filtered:\n",
" rules[k] = items2_filtered[k] / float(items[k[0]])\n",
" rules[k[::-1]] = items2_filtered[k] / float(items[k[1]])\n",
" \n",
"print '=' * 20\n",
"print 'Sorted item1 => item2 by confidence (desc)'\n",
"for k in sorted(rules.items(), key=operator.itemgetter(1), reverse=True)[:5]:\n",
"    print '(%s => %s): %s' % (k[0][0], k[0][1], k[1])"
]
},
{
"cell_type": "code",
"execution_count": 78,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"import json\n",
"# will pass this dictionary to the mapper and reducer\n",
"# it contains the frequent 1-itemsets\n",
"with open('items.txt', 'w') as f:\n",
" json.dump(items_filtered, f)"
]
},
{
"cell_type": "code",
"execution_count": 80,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"15/09/22 23:16:15 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\r\n"
]
}
],
"source": [
"!hdfs dfs -put items.txt /user/amir/hw3_2/"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Mapper"
]
},
{
"cell_type": "code",
"execution_count": 106,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Overwriting mapper.py\n"
]
}
],
"source": [
"%%writefile mapper.py\n",
"#!/usr/bin/python\n",
"import sys\n",
"import json\n",
"from itertools import combinations\n",
"\n",
"with open('items1', 'r') as f:\n",
" items1 = json.load(f)\n",
"\n",
"for line in sys.stdin:\n",
" items = set(line.split())\n",
" items = items.intersection(items1) # filter for items with min_support\n",
" \n",
" for combo in combinations(items, 2): # emit all 2-pair combinations\n",
" print '%s\\t%s\\t%s' % (combo[0], combo[1], 1)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Reducer"
]
},
{
"cell_type": "code",
"execution_count": 116,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Overwriting reducer.py\n"
]
}
],
"source": [
"%%writefile reducer.py\n",
"#!/usr/bin/python\n",
"import sys\n",
"import json\n",
"import operator\n",
"from collections import Counter\n",
"items2 = Counter()\n",
"\n",
"with open('items1', 'r') as f:\n",
" items = json.load(f)\n",
"\n",
"min_support = 100\n",
"\n",
"def dictionary_filter(dic, min_):\n",
" return dict((k, v) for k, v in dic.items() if v > min_)\n",
"\n",
"for line in sys.stdin:\n",
" i1, i2, count = line.split('\\t')\n",
" items2[(i1, i2)] += 1\n",
" \n",
"items2_filtered = dictionary_filter(items2, min_support)\n",
"\n",
"rules = dict()\n",
"for k in items2_filtered:\n",
" rules[k] = items2_filtered[k] / float(items[k[0]])\n",
" rules[k[::-1]] = items2_filtered[k] / float(items[k[1]])\n",
" \n",
"print '=' * 20\n",
"print 'Sorted item1 => item2 by confidence (desc)'\n",
"for k in sorted(rules.items(), key=operator.itemgetter(1), reverse=True)[:5]:\n",
"    print '(%s => %s): %s' % (k[0][0], k[0][1], k[1])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"!hadoop jar /usr/local/Cellar/hadoop/2.7.1/libexec/share/hadoop/tools/lib/hadoop-streaming-2.7.1.jar \\\n",
"-files 'items.txt#items1' -mapper mapper.py -reducer reducer.py \\\n",
"-input /user/amir/hw3_1/ProductPurchaseData.txt -output /user/amir/hw3_2/output/"
]
},
{
"cell_type": "code",
"execution_count": 120,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"15/09/22 23:49:34 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n",
"====================\t\n",
"Sorted item1 => item2 by confidence (desc)\t\n",
"(GRO38636 => FRO40251): 0.96261682243\t\n",
"(GRO85051 => FRO40251): 0.957166392092\t\n",
"(DAI43868 => SNA82528): 0.939189189189\t\n",
"(DAI23334 => DAI62779): 0.93006993007\t\n",
"(FRO92469 => FRO40251): 0.883392226148\t\n"
]
}
],
"source": [
"!hdfs dfs -cat /user/amir/hw3_2/output/part-00000"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## ~~HW3.3~~"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"~~Benchmark your results using the pyFIM implementation of the Apriori algorithm\n",
"(Apriori - Association Rule Induction / Frequent Item Set Mining implemented by Christian Borgelt). \n",
"You can download pyFIM from here: http://www.borgelt.net/pyfim.html Comment on the results from both implementations (your Hadoop MapReduce of apriori versus pyFIM) \n",
"in terms of results and execution times.~~"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## HW3.4"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"(Conceptual Exercise)\n",
"\n",
"Suppose that you wished to perform the Apriori algorithm once again,\n",
"though this time now with the goal of listing the top 5 rules with corresponding confidence scores \n",
"in decreasing order of confidence score for itemsets of size 3 using Hadoop MapReduce.\n",
"A rule is now of the form: \n",
"\n",
"(item1, item2) ⇒ item3 \n",
"\n",
"Recall that the Apriori algorithm is iterative for increasing itemset size,\n",
"working off of the frequent itemsets of the previous size to explore \n",
"ONLY the NECESSARY subset of a large combinatorial space. \n",
"Describe how you might design a framework to perform this exercise.\n",
"\n",
"In particular, focus on the following:<br>\n",
"- map-reduce steps required\n",
"- enumeration of item sets and filtering for frequent candidates"
]
},
{
"cell_type": "markdown",
"metadata": {
"collapsed": true
},
"source": [
"### Answer"
]
},
{
"cell_type": "markdown",
"metadata": {
"collapsed": true
},
"source": [
"In the map stage 3 itemsets that did not satisfy the minimum support requirements of 2 item sets should be filtered out. In other words if (A, B, C) is a candidate itemset and either (A,B), (B,C) or (C,A) are not in the frequent 2-itemsets we should not emit (A,B,C). This requires access to the 2-itemset associative array at the map stage.\n",
"<br><br>\n",
"In the reduce step we can count the itemsets as before. Combinations that are less than min support should be filtered out. However for association rules and confidence calculations we now should calculate (1, 2) => 3, (2, 3) => 1, and (1, 3) => 2 to sort by highest confidence. This calculation also requires the 2-itemset associative array to lookup denominator values."
]
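},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"A rough sketch of what such a map stage could look like, following the HW3.2 pattern; the file name items2, its assumed format (a JSON list of lexicographically ordered frequent pairs) and the script name mapper3.py are hypothetical:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"%%writefile mapper3.py\n",
"#!/usr/bin/python\n",
"# illustrative sketch only: mapper for candidate 3-itemsets with apriori pruning\n",
"import sys\n",
"import json\n",
"from itertools import combinations\n",
"\n",
"# assumed: frequent 2-itemsets shipped to the mapper as a JSON list of lexicographically ordered pairs\n",
"with open('items2', 'r') as f:\n",
"    pairs2 = set(tuple(pair) for pair in json.load(f))\n",
"\n",
"# frequent single items implied by the frequent pairs\n",
"items1 = set(item for pair in pairs2 for item in pair)\n",
"\n",
"for line in sys.stdin:\n",
"    basket = sorted(set(line.split()).intersection(items1))\n",
"    for triple in combinations(basket, 3):\n",
"        # apriori pruning: emit (A, B, C) only if all of its 2-item subsets are frequent\n",
"        if all(pair in pairs2 for pair in combinations(triple, 2)):\n",
"            print '%s\\t%s\\t%s\\t%s' % (triple[0], triple[1], triple[2], 1)"
]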
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.10"
}
},
"nbformat": 4,
"nbformat_minor": 0
}