Created
April 8, 2015 13:09
-
-
Save aziflaj/edd74859c87a5555305c to your computer and use it in GitHub Desktop.
Guide to Data Mining
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import codecs
from math import sqrt
# Small sample ratings table: user name -> {artist name: rating}.
users2 = {
    "Amy": {"Taylor Swift": 4, "PSY": 3, "Whitney Houston": 4},
    "Ben": {"Taylor Swift": 5, "PSY": 2},
    "Clara": {"PSY": 3.5, "Whitney Houston": 4},
    "Daisy": {"Taylor Swift": 5, "Whitney Houston": 3},
}
# Sample ratings table: user name -> {band name: rating}.
# Users have not rated every band; missing keys mean "no rating".
users = {
    "Angelica": {
        "Blues Traveler": 3.5, "Broken Bells": 2.0,
        "Norah Jones": 4.5, "Phoenix": 5.0,
        "Slightly Stoopid": 1.5, "The Strokes": 2.5,
        "Vampire Weekend": 2.0,
    },
    "Bill": {
        "Blues Traveler": 2.0, "Broken Bells": 3.5,
        "Deadmau5": 4.0, "Phoenix": 2.0,
        "Slightly Stoopid": 3.5, "Vampire Weekend": 3.0,
    },
    "Chan": {
        "Blues Traveler": 5.0, "Broken Bells": 1.0,
        "Deadmau5": 1.0, "Norah Jones": 3.0,
        "Phoenix": 5, "Slightly Stoopid": 1.0,
    },
    "Dan": {
        "Blues Traveler": 3.0, "Broken Bells": 4.0,
        "Deadmau5": 4.5, "Phoenix": 3.0,
        "Slightly Stoopid": 4.5, "The Strokes": 4.0,
        "Vampire Weekend": 2.0,
    },
    "Hailey": {
        "Broken Bells": 4.0, "Deadmau5": 1.0,
        "Norah Jones": 4.0, "The Strokes": 4.0,
        "Vampire Weekend": 1.0,
    },
    "Jordyn": {
        "Broken Bells": 4.5, "Deadmau5": 4.0,
        "Norah Jones": 5.0, "Phoenix": 5.0,
        "Slightly Stoopid": 4.5, "The Strokes": 4.0,
        "Vampire Weekend": 4.0,
    },
    "Sam": {
        "Blues Traveler": 5.0, "Broken Bells": 2.0,
        "Norah Jones": 3.0, "Phoenix": 5.0,
        "Slightly Stoopid": 4.0, "The Strokes": 5.0,
    },
    "Veronica": {
        "Blues Traveler": 3.0, "Norah Jones": 5.0,
        "Phoenix": 4.0, "Slightly Stoopid": 2.5,
        "The Strokes": 3.0,
    },
}
class recommender:
    """Recommender over a ``{user: {item: rating}}`` ratings table.

    Offers two strategies:

    * k-nearest-neighbour recommendations weighted by a similarity
      function (``recommend``), and
    * Weighted Slope One (``computeDeviations`` followed by
      ``slopeOneRecommendations``).
    """

    def __init__(self, data, k=1, metric='pearson', n=5):
        """Initialize the recommender.

        data   -- ratings table ``{user: {item: rating}}``.  Anything that
                  is not a dict is ignored and the table starts out empty
                  (use one of the ``load*`` methods to fill it).
        k      -- number of nearest neighbours used by ``recommend``.
        metric -- name of the similarity function; only 'pearson' is
                  wired up.  For any other name ``self.fn`` is left unset.
        n      -- maximum number of recommendations ``recommend`` returns.
        """
        self.k = k
        self.n = n
        self.username2id = {}
        self.userid2name = {}
        self.productid2name = {}
        #
        # The following two variables are used for Slope One
        #
        self.frequencies = {}
        self.deviations = {}
        # keep the metric name alongside the function it selects
        self.metric = metric
        if self.metric == 'pearson':
            self.fn = self.pearson
        #
        # Accept any dict (including subclasses); otherwise start empty so
        # later attribute access does not fail with AttributeError.
        #
        self.data = data if isinstance(data, dict) else {}

    def convertProductID2name(self, id):
        """Given product id number return product name.

        Ids without an entry in ``self.productid2name`` are returned
        unchanged.
        """
        return self.productid2name.get(id, id)

    def userRatings(self, id, n):
        """Print the n highest ratings of the user with the given id.

        Prints a header line, the total number of ratings the user has,
        and then one ``name<TAB>rating`` line per item, best first.
        Returns None.
        """
        # Fall back to the raw id when no id -> name table was loaded
        # (e.g. the recommender was built straight from a dict).
        print("Ratings for " + str(self.userid2name.get(id, id)))
        ratings = self.data[id]
        print(len(ratings))
        named = [(self.convertProductID2name(item), rating)
                 for (item, rating) in ratings.items()]
        # Sort first and truncate afterwards so the output really is the
        # top n rather than an arbitrary n items that happen to come first.
        named.sort(key=lambda pair: pair[1], reverse=True)
        for (name, rating) in named[:n]:
            # NOTE: %i truncates fractional ratings (3.5 prints as 3).
            print("%s\t%i" % (name, rating))

    def showUserTopItems(self, user, n):
        """Print the top n items for user, best first.

        If the user has fewer than n ratings, all of them are printed.
        """
        items = list(self.data[user].items())
        items.sort(key=lambda itemTuple: itemTuple[1], reverse=True)
        # Slicing (instead of indexing range(n)) tolerates n > len(items).
        for (item, rating) in items[:n]:
            print("%s\t%i" % (self.convertProductID2name(item), rating))

    def loadMovieLens(self, path=''):
        """Load MovieLens-style files found under the prefix ``path``.

        Reads three files and prints the total number of lines read:

        * ``u.data`` -- tab separated; fields 0, 1, 2 are user id, movie
          id and an integer rating.  Replaces ``self.data``.
        * ``u.item`` -- '|' separated; fields 0 and 1 are movie id and
          title.  Stored in ``self.productid2name``.
        * ``u.user`` -- '|' separated; field 0 is the user id.  The whole
          raw line is used as the user's "name" in ``self.userid2name``
          and ``self.username2id``.
        """
        self.data = {}
        i = 0
        #
        # first load movie ratings into self.data
        #
        # ``with`` closes the file even if a malformed line raises.
        with codecs.open(path + "u.data", 'r', 'ascii') as f:
            for line in f:
                i += 1
                # separate line into fields
                fields = line.split('\t')
                user = fields[0]
                movie = fields[1]
                rating = int(fields[2].strip().strip('"'))
                self.data.setdefault(user, {})[movie] = rating
        #
        # Now load movie into self.productid2name
        # the file u.item contains movie id, title, release date among
        # other fields
        #
        with codecs.open(path + "u.item", 'r', 'iso8859-1', 'ignore') as f:
            for line in f:
                i += 1
                # separate line into fields
                fields = line.split('|')
                mid = fields[0].strip()
                title = fields[1].strip()
                self.productid2name[mid] = title
        #
        # Now load user info into both self.userid2name
        # and self.username2id
        #
        with open(path + "u.user") as f:
            for line in f:
                i += 1
                fields = line.split('|')
                userid = fields[0].strip('"')
                self.userid2name[userid] = line
                self.username2id[line] = userid
        print(i)

    def loadBookDB(self, path=''):
        """loads the BX book dataset. Path is where the BX files are
        located

        Reads three ';'-separated files with double-quoted fields and
        prints the total number of lines read:

        * ``BX-Book-Ratings.csv`` -- user id, ISBN, integer rating.
          Replaces ``self.data``.
        * ``BX-Books.csv`` -- ISBN, title, author; stored in
          ``self.productid2name`` as ``"<title> by <author>"``.
        * ``BX-Users.csv`` -- user id, location, optional age; stored in
          ``self.userid2name`` / ``self.username2id``.
        """
        self.data = {}
        i = 0
        #
        # First load book ratings into self.data
        #
        # The ratings live in BX-Book-Ratings.csv; "u.data" is the
        # tab-separated MovieLens file and cannot be parsed with the
        # ';' / quote handling below.
        with codecs.open(path + "BX-Book-Ratings.csv", 'r', 'utf8') as f:
            for line in f:
                i += 1
                # separate line into fields
                fields = line.split(';')
                user = fields[0].strip('"')
                book = fields[1].strip('"')
                rating = int(fields[2].strip().strip('"'))
                if rating > 5:
                    print("EXCEEDING ", rating)
                self.data.setdefault(user, {})[book] = rating
        #
        # Now load books into self.productid2name
        # Books contains isbn, title, and author among other fields
        #
        with codecs.open(path + "BX-Books.csv", 'r', 'utf8') as f:
            for line in f:
                i += 1
                # separate line into fields
                fields = line.split(';')
                isbn = fields[0].strip('"')
                title = fields[1].strip('"')
                author = fields[2].strip().strip('"')
                self.productid2name[isbn] = title + ' by ' + author
        #
        # Now load user info into both self.userid2name and
        # self.username2id
        #
        with codecs.open(path + "BX-Users.csv", 'r', 'utf8') as f:
            for line in f:
                i += 1
                # separate line into fields
                fields = line.split(';')
                userid = fields[0].strip('"')
                location = fields[1].strip('"')
                # The age is the third field, so it exists as soon as
                # there are three fields (the old "> 3" test never saw it).
                if len(fields) > 2:
                    age = fields[2].strip().strip('"')
                else:
                    age = 'NULL'
                if age != 'NULL':
                    value = location + ' (age: ' + age + ')'
                else:
                    value = location
                self.userid2name[userid] = value
                self.username2id[location] = userid
        print(i)

    def computeDeviations(self):
        """Compute the Slope One deviation and frequency tables.

        After the call, ``self.deviations[a][b]`` is the average of
        ``rating(a) - rating(b)`` over all users who rated both items and
        ``self.frequencies[a][b]`` is the number of such users.

        The tables are rebuilt from scratch on every call, so calling the
        method again (for instance after ``self.data`` changed) does not
        mix in previously averaged values.
        """
        frequencies = {}
        deviations = {}
        # for each person in the data: get their ratings
        for ratings in self.data.values():
            # for each item & rating in that set of ratings:
            for (item, rating) in ratings.items():
                itemFrequencies = frequencies.setdefault(item, {})
                itemDeviations = deviations.setdefault(item, {})
                # for each item2 & rating2 in that set of ratings:
                for (item2, rating2) in ratings.items():
                    if item != item2:
                        # add the difference between the ratings to our
                        # computation
                        itemFrequencies[item2] = \
                            itemFrequencies.get(item2, 0) + 1
                        itemDeviations[item2] = \
                            itemDeviations.get(item2, 0.0) + (rating - rating2)
        # turn the summed differences into averages
        for (item, itemDeviations) in deviations.items():
            for item2 in itemDeviations:
                itemDeviations[item2] /= frequencies[item][item2]
        self.frequencies = frequencies
        self.deviations = deviations

    def slopeOneRecommendations(self, userRatings):
        """Return Weighted Slope One predictions for unrated items.

        userRatings -- ``{item: rating}`` for the target user.

        Requires ``computeDeviations`` to have been called.  Returns a
        list of ``(item name, predicted rating)`` tuples sorted by
        predicted rating, best first, limited to 50 entries.
        """
        recommendations = {}
        frequencies = {}
        # for every item and rating in the user's recommendations
        for (userItem, userRating) in userRatings.items():
            # for every item in our dataset that the user didn't rate
            for (diffItem, diffRatings) in self.deviations.items():
                if diffItem in userRatings or userItem not in diffRatings:
                    continue
                freq = self.frequencies[diffItem][userItem]
                # add to the running sum representing the numerator
                # of the formula
                recommendations[diffItem] = \
                    recommendations.get(diffItem, 0.0) + \
                    (diffRatings[userItem] + userRating) * freq
                # keep a running sum of the frequency of diffitem
                frequencies[diffItem] = frequencies.get(diffItem, 0) + freq
        ranked = [(self.convertProductID2name(k), v / frequencies[k])
                  for (k, v) in recommendations.items()]
        # finally sort and return
        ranked.sort(key=lambda artistTuple: artistTuple[1], reverse=True)
        # I am only going to return the first 50 recommendations
        return ranked[:50]

    def pearson(self, rating1, rating2):
        """Return the Pearson correlation of two ``{item: rating}`` dicts.

        Only items present in both dicts are used.  Returns 0 when there
        are no common items or when either side has zero variance over
        the common items.
        """
        sum_xy = 0
        sum_x = 0
        sum_y = 0
        sum_x2 = 0
        sum_y2 = 0
        n = 0
        for key in rating1:
            if key in rating2:
                n += 1
                x = rating1[key]
                y = rating2[key]
                sum_xy += x * y
                sum_x += x
                sum_y += y
                sum_x2 += pow(x, 2)
                sum_y2 += pow(y, 2)
        if n == 0:
            return 0
        # now compute denominator
        # Rounding can push a mathematically non-negative term slightly
        # below zero, which would make sqrt() raise; clamp at 0.
        spread_x = max(sum_x2 - pow(sum_x, 2) / n, 0.0)
        spread_y = max(sum_y2 - pow(sum_y, 2) / n, 0.0)
        denominator = sqrt(spread_x) * sqrt(spread_y)
        if denominator == 0:
            return 0
        return (sum_xy - (sum_x * sum_y) / n) / denominator

    def computeNearestNeighbor(self, username):
        """creates a sorted list of users based on their distance
        to username

        Returns ``(user, similarity)`` tuples for every other user in
        ``self.data``, highest similarity first.
        """
        ownRatings = self.data[username]
        distances = [(instance, self.fn(ownRatings, self.data[instance]))
                     for instance in self.data
                     if instance != username]
        # sort based on distance -- closest first
        distances.sort(key=lambda artistTuple: artistTuple[1],
                       reverse=True)
        return distances

    def recommend(self, user):
        """Give list of recommendations

        Uses the ``self.k`` most similar users; each neighbour's ratings
        for items ``user`` has not rated are weighted by that neighbour's
        share of the summed similarity.  Returns at most ``self.n``
        ``(item name, score)`` tuples, best first.  Returns an empty list
        when the neighbours' similarities sum to zero, since no weights
        can be derived in that case.
        """
        recommendations = {}
        # first get list of users ordered by nearness
        nearest = self.computeNearestNeighbor(user)
        #
        # now get the ratings for the user
        #
        userRatings = self.data[user]
        # never ask for more neighbours than there are other users
        neighbors = nearest[:max(0, min(self.k, len(nearest)))]
        #
        # determine the total distance
        totalDistance = 0.0
        for (_, distance) in neighbors:
            totalDistance += distance
        if totalDistance == 0:
            return []
        # now iterate through the k nearest neighbors
        # accumulating their ratings
        for (name, distance) in neighbors:
            # compute slice of pie
            weight = distance / totalDistance
            # now find bands neighbor rated that user didn't
            for (artist, rating) in self.data[name].items():
                if artist not in userRatings:
                    recommendations[artist] = \
                        recommendations.get(artist, 0.0) + rating * weight
        ranked = [(self.convertProductID2name(k), v)
                  for (k, v) in recommendations.items()]
        # Sort before truncating so the n best items are kept, not just
        # the first n that were inserted.
        ranked.sort(key=lambda artistTuple: artistTuple[1], reverse=True)
        return ranked[:self.n]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment