Skip to content

Instantly share code, notes, and snippets.

@aziflaj
Created April 8, 2015 13:09
Show Gist options
  • Save aziflaj/edd74859c87a5555305c to your computer and use it in GitHub Desktop.
Save aziflaj/edd74859c87a5555305c to your computer and use it in GitHub Desktop.
Guide to Data Mining
import codecs
from math import sqrt
users2 = {"Amy": {"Taylor Swift": 4, "PSY": 3, "Whitney Houston": 4},
"Ben": {"Taylor Swift": 5, "PSY": 2},
"Clara": {"PSY": 3.5, "Whitney Houston": 4},
"Daisy": {"Taylor Swift": 5, "Whitney Houston": 3}}
users = {"Angelica": {"Blues Traveler": 3.5, "Broken Bells": 2.0,
"Norah Jones": 4.5, "Phoenix": 5.0,
"Slightly Stoopid": 1.5, "The Strokes": 2.5,
"Vampire Weekend": 2.0},
"Bill":{"Blues Traveler": 2.0, "Broken Bells": 3.5,
"Deadmau5": 4.0, "Phoenix": 2.0,
"Slightly Stoopid": 3.5, "Vampire Weekend": 3.0},
"Chan": {"Blues Traveler": 5.0, "Broken Bells": 1.0,
"Deadmau5": 1.0, "Norah Jones": 3.0,
"Phoenix": 5, "Slightly Stoopid": 1.0},
"Dan": {"Blues Traveler": 3.0, "Broken Bells": 4.0,
"Deadmau5": 4.5, "Phoenix": 3.0,
"Slightly Stoopid": 4.5, "The Strokes": 4.0,
"Vampire Weekend": 2.0},
"Hailey": {"Broken Bells": 4.0, "Deadmau5": 1.0,
"Norah Jones": 4.0, "The Strokes": 4.0,
"Vampire Weekend": 1.0},
"Jordyn": {"Broken Bells": 4.5, "Deadmau5": 4.0,
"Norah Jones": 5.0, "Phoenix": 5.0,
"Slightly Stoopid": 4.5, "The Strokes": 4.0,
"Vampire Weekend": 4.0},
"Sam": {"Blues Traveler": 5.0, "Broken Bells": 2.0,
"Norah Jones": 3.0, "Phoenix": 5.0,
"Slightly Stoopid": 4.0, "The Strokes": 5.0},
"Veronica": {"Blues Traveler": 3.0, "Norah Jones": 5.0,
"Phoenix": 4.0, "Slightly Stoopid": 2.5,
"The Strokes": 3.0}
}
class recommender:
def __init__(self, data, k=1, metric='pearson', n=5):
""" initialize recommender
currently, if data is dictionary the recommender is initialized
to it.
For all other data types of data, no initialization occurs
k is the k value for k nearest neighbor
metric is which distance formula to use
n is the maximum number of recommendations to make"""
self.k = k
self.n = n
self.username2id = {}
self.userid2name = {}
self.productid2name = {}
#
# The following two variables are used for Slope One
#
self.frequencies = {}
self.deviations = {}
# for some reason I want to save the name of the metric
self.metric = metric
if self.metric == 'pearson':
self.fn = self.pearson
#
# if data is dictionary set recommender data to it
#
if type(data).__name__ == 'dict':
self.data = data
def convertProductID2name(self, id):
"""Given product id number return product name"""
if id in self.productid2name:
return self.productid2name[id]
else:
return id
def userRatings(self, id, n):
"""Return n top ratings for user with id"""
print ("Ratings for " + self.userid2name[id])
ratings = self.data[id]
print(len(ratings))
ratings = list(ratings.items())[:n]
ratings = [(self.convertProductID2name(k), v)
for (k, v) in ratings]
# finally sort and return
ratings.sort(key=lambda artistTuple: artistTuple[1],
reverse = True)
for rating in ratings:
print("%s\t%i" % (rating[0], rating[1]))
def showUserTopItems(self, user, n):
""" show top n items for user"""
items = list(self.data[user].items())
items.sort(key=lambda itemTuple: itemTuple[1], reverse=True)
for i in range(n):
print("%s\t%i" % (self.convertProductID2name(items[i][0]),
items[i][1]))
def loadMovieLens(self, path=''):
self.data = {}
#
# first load movie ratings
#
i = 0
#
# First load book ratings into self.data
#
#f = codecs.open(path + "u.data", 'r', 'utf8')
f = codecs.open(path + "u.data", 'r', 'ascii')
# f = open(path + "u.data")
for line in f:
i += 1
#separate line into fields
fields = line.split('\t')
user = fields[0]
movie = fields[1]
rating = int(fields[2].strip().strip('"'))
if user in self.data:
currentRatings = self.data[user]
else:
currentRatings = {}
currentRatings[movie] = rating
self.data[user] = currentRatings
f.close()
#
# Now load movie into self.productid2name
# the file u.item contains movie id, title, release date among
# other fields
#
#f = codecs.open(path + "u.item", 'r', 'utf8')
f = codecs.open(path + "u.item", 'r', 'iso8859-1', 'ignore')
#f = open(path + "u.item")
for line in f:
i += 1
#separate line into fields
fields = line.split('|')
mid = fields[0].strip()
title = fields[1].strip()
self.productid2name[mid] = title
f.close()
#
# Now load user info into both self.userid2name
# and self.username2id
#
#f = codecs.open(path + "u.user", 'r', 'utf8')
f = open(path + "u.user")
for line in f:
i += 1
fields = line.split('|')
userid = fields[0].strip('"')
self.userid2name[userid] = line
self.username2id[line] = userid
f.close()
print(i)
def loadBookDB(self, path=''):
"""loads the BX book dataset. Path is where the BX files are
located"""
self.data = {}
i = 0
#
# First load book ratings into self.data
#
f = codecs.open(path + "u.data", 'r', 'utf8')
for line in f:
i += 1
# separate line into fields
fields = line.split(';')
user = fields[0].strip('"')
book = fields[1].strip('"')
rating = int(fields[2].strip().strip('"'))
if rating > 5:
print("EXCEEDING ", rating)
if user in self.data:
currentRatings = self.data[user]
else:
currentRatings = {}
currentRatings[book] = rating
self.data[user] = currentRatings
f.close()
#
# Now load books into self.productid2name
# Books contains isbn, title, and author among other fields
#
f = codecs.open(path + "BX-Books.csv", 'r', 'utf8')
for line in f:
i += 1
# separate line into fields
fields = line.split(';')
isbn = fields[0].strip('"')
title = fields[1].strip('"')
author = fields[2].strip().strip('"')
title = title + ' by ' + author
self.productid2name[isbn] = title
f.close()
#
# Now load user info into both self.userid2name and
# self.username2id
#
f = codecs.open(path + "BX-Users.csv", 'r', 'utf8')
for line in f:
i += 1
# separate line into fields
fields = line.split(';')
userid = fields[0].strip('"')
location = fields[1].strip('"')
if len(fields) > 3:
age = fields[2].strip().strip('"')
else:
age = 'NULL'
if age != 'NULL':
value = location + ' (age: ' + age + ')'
else:
value = location
self.userid2name[userid] = value
self.username2id[location] = userid
f.close()
print(i)
def computeDeviations(self):
# for each person in the data:
# get their ratings
for ratings in self.data.values():
# for each item & rating in that set of ratings:
for (item, rating) in ratings.items():
self.frequencies.setdefault(item, {})
self.deviations.setdefault(item, {})
# for each item2 & rating2 in that set of ratings:
for (item2, rating2) in ratings.items():
if item != item2:
# add the difference between the ratings to our
# computation
self.frequencies[item].setdefault(item2, 0)
self.deviations[item].setdefault(item2, 0.0)
self.frequencies[item][item2] += 1
self.deviations[item][item2] += rating - rating2
for (item, ratings) in self.deviations.items():
for item2 in ratings:
ratings[item2] /= self.frequencies[item][item2]
def slopeOneRecommendations(self, userRatings):
recommendations = {}
frequencies = {}
# for every item and rating in the user's recommendations
for (userItem, userRating) in userRatings.items():
# for every item in our dataset that the user didn't rate
for (diffItem, diffRatings) in self.deviations.items():
if diffItem not in userRatings and \
userItem in self.deviations[diffItem]:
freq = self.frequencies[diffItem][userItem]
recommendations.setdefault(diffItem, 0.0)
frequencies.setdefault(diffItem, 0)
# add to the running sum representing the numerator
# of the formula
recommendations[diffItem] += (diffRatings[userItem] +
userRating) * freq
# keep a running sum of the frequency of diffitem
frequencies[diffItem] += freq
recommendations = [(self.convertProductID2name(k),
v / frequencies[k])
for (k, v) in recommendations.items()]
# finally sort and return
recommendations.sort(key=lambda artistTuple: artistTuple[1],
reverse = True)
# I am only going to return the first 50 recommendations
return recommendations[:50]
def pearson(self, rating1, rating2):
sum_xy = 0
sum_x = 0
sum_y = 0
sum_x2 = 0
sum_y2 = 0
n = 0
for key in rating1:
if key in rating2:
n += 1
x = rating1[key]
y = rating2[key]
sum_xy += x * y
sum_x += x
sum_y += y
sum_x2 += pow(x, 2)
sum_y2 += pow(y, 2)
if n == 0:
return 0
# now compute denominator
denominator = sqrt(sum_x2 - pow(sum_x, 2) / n) * \
sqrt(sum_y2 - pow(sum_y, 2) / n)
if denominator == 0:
return 0
else:
return (sum_xy - (sum_x * sum_y) / n) / denominator
def computeNearestNeighbor(self, username):
"""creates a sorted list of users based on their distance
to username"""
distances = []
for instance in self.data:
if instance != username:
distance = self.fn(self.data[username],
self.data[instance])
distances.append((instance, distance))
# sort based on distance -- closest first
distances.sort(key=lambda artistTuple: artistTuple[1],
reverse=True)
return distances
def recommend(self, user):
"""Give list of recommendations"""
recommendations = {}
# first get list of users ordered by nearness
nearest = self.computeNearestNeighbor(user)
#
# now get the ratings for the user
#
userRatings = self.data[user]
#
# determine the total distance
totalDistance = 0.0
for i in range(self.k):
totalDistance += nearest[i][1]
# now iterate through the k nearest neighbors
# accumulating their ratings
for i in range(self.k):
# compute slice of pie
weight = nearest[i][1] / totalDistance
# get the name of the person
name = nearest[i][0]
# get the ratings for this person
neighborRatings = self.data[name]
# get the name of the person
# now find bands neighbor rated that user didn't
for artist in neighborRatings:
if not artist in userRatings:
if artist not in recommendations:
recommendations[artist] = neighborRatings[artist] * \
weight
else:
recommendations[artist] = recommendations[artist] + \
neighborRatings[artist] * \
weight
# now make list from dictionary and only get the first n items
recommendations = list(recommendations.items())[:self.n]
recommendations = [(self.convertProductID2name(k), v)
for (k, v) in recommendations]
# finally sort and return
recommendations.sort(key=lambda artistTuple: artistTuple[1],
reverse = True)
return recommendations
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment