Skip to content

Instantly share code, notes, and snippets.

@dimart
Last active November 11, 2015 17:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dimart/8ff9c13f93b62f26bfdb to your computer and use it in GitHub Desktop.
Save dimart/8ff9c13f93b62f26bfdb to your computer and use it in GitHub Desktop.
import time
from collections import defaultdict
from functools import wraps
from math import log
# Toggle to print per-call timings from the @timeit decorator.
DEBUG = False
# number of good users to read from file
USER_COUNT = 1000
# a user must have listened to at least this many songs to count as "good"
NUM_SONG_PER_USER = 100
# whitespace-separated triplets "user_id song_id play_count", one per line
# (presumably the Million Song Dataset taste-profile file -- verify)
TRAIN_FILE = 'train_triplets.txt'
# user_id -> { song_id: rating } -- NOTE(review): values are 1..5 ratings
# produced by gen_rate(), not raw play counts as the old comment claimed.
# Also, defaultdict() with no factory behaves exactly like a plain dict:
# missing keys still raise KeyError.
train_data = defaultdict()
# user_id -> { song_id: rating } held out from training for evaluation
test_data = defaultdict()
# number of (user, song) pairs actually predicted; normalizes RMSE/MAE
num_of_tested_songs = 0
# list of unique user ids
users = []
# user_id -> sum of squared ratings (squared norm of the rating vector)
ru_sqr = defaultdict()
# (u, v) -> cosine similarity, stored once per unordered user pair
similarity = defaultdict()
# not normalized metrics
RMSE = 0.0
MAE = 0.0
NDCG = 0.0
def timeit(func):
    """Decorator: print the wall-clock duration of each call when DEBUG is on.

    The timing line is "<seconds>\t<function name>".  Returns the wrapped
    function's result unchanged.
    """
    @wraps(func)  # preserve func.__name__ etc.; the original clobbered them
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        elapsed = time.time() - start
        if DEBUG:
            # single-argument print() is valid under both Python 2 and 3
            print(str(elapsed) + '\t' + func.__name__)
        return result
    return wrapper
def gen_rate(play_cnt):
    """Map a raw play count onto a 1-5 rating scale.

    Bands: 1-2 -> 1, 3-4 -> 2, 5-6 -> 3, 7-9 -> 4.  Anything outside those
    bands (10 and above, but also 0 or negative counts) falls through to 5,
    exactly mirroring the original if/elif chain's else branch.
    """
    bands = ((1, 2, 1), (3, 4, 2), (5, 6, 3), (7, 9, 4))
    for low, high, rating in bands:
        if low <= play_cnt <= high:
            return rating
    return 5
@timeit
def load_data():
    """Read (user, song, play_count) triplets from TRAIN_FILE and split them.

    Keeps the first USER_COUNT users that listened to at least
    NUM_SONG_PER_USER songs.  For each kept user, every 5th song (counted
    globally across users) is moved into test_data; the rest stay in
    train_data.  Play counts are converted to 1-5 ratings via gen_rate().
    Populates the module-level users, train_data and test_data, then prints
    a short summary.
    """
    with open(TRAIN_FILE, "r") as f:
        total = 0
        user_count = 0
        song_count = 0
        test_song_count = 0
        cur_user = ""
        songs_per_user = {}
        while user_count < USER_COUNT:
            line = f.readline()
            if not line:
                # ran out of data before finding USER_COUNT good users;
                # the original crashed on .split() here
                break
            uId, sId, count = line.split()
            if cur_user != uId:
                total += 1
                if len(songs_per_user) >= NUM_SONG_PER_USER:
                    user_count += 1
                    users.append(cur_user)
                    test_songs = {}
                    # list(...) lets us pop() while iterating (Py2/Py3 safe).
                    # A distinct loop variable fixes the original bug where
                    # this loop shadowed the outer sId, so the assignment at
                    # the bottom stored the wrong song after each kept user.
                    for song in list(songs_per_user.keys()):
                        song_count += 1
                        if song_count % 5 == 0:
                            test_song_count += 1
                            # move the song from the training to the test split
                            test_songs[song] = songs_per_user.pop(song)
                    train_data[cur_user] = songs_per_user
                    test_data[cur_user] = test_songs
                # reset for every new user -- the original only reset it for
                # qualifying users, leaking a short user's songs into the
                # next user's tally
                songs_per_user = {}
                cur_user = uId
            songs_per_user[sId] = gen_rate(int(count))
        print("{usr_count} good users (>= {num_songs} songs listened) was selected out of {total} users."
              .format(usr_count=user_count, num_songs=NUM_SONG_PER_USER, total=total))
        if song_count:  # avoid ZeroDivisionError when nothing qualified
            print("{percent} percent of data selected for tests: {sc} out of {total}"
                  .format(percent=round(test_song_count / float(song_count) * 100),
                          sc=test_song_count, total=song_count))
def fetch_songs(user_id):
    """Return the ids of the songs *user_id* has in the training split."""
    return list(train_data[user_id])
def R(user_id, song_id):
    """Return the rating user_id gave song_id in training, or 0 if unrated.

    Raises KeyError for an unknown user_id (same as the original).
    """
    # dict.get replaces the original's membership-test-then-index double lookup
    return train_data[user_id].get(song_id, 0)
def test_data_for(user_id):
    """Return [(song_id, true_rating), ...] for user_id's held-out songs.

    Raises KeyError for an unknown user_id (same as the original).
    """
    # items() replaces the original manual key-loop re-building the pairs
    return list(test_data[user_id].items())
def relevance(rate):
    """Binary relevance for nDCG: ratings of 4 or 5 count as relevant."""
    return 1 if rate >= 4 else 0


def gain(rate):
    """Exponential nDCG gain, 2**rel - 1: 0 for irrelevant, 1 for relevant."""
    return (1 << relevance(rate)) - 1
@timeit
def predict(u):
    """Predict ratings for user u's held-out test songs and accumulate the
    module-level RMSE, MAE and NDCG error sums (normalized later in main).
    """
    global RMSE, MAE, NDCG, num_of_tested_songs
    # cosine similarity of u to every other user; `similarity` stores each
    # pair only once, so probe both key orders
    cosine = []
    for v in users:
        if v == u:
            continue
        if (u, v) in similarity:
            cosine.append((similarity[(u, v)], v))
        elif (v, u) in similarity:
            cosine.append((similarity[(v, u)], v))
    if len(cosine) == 0:
        return  # no neighbours at all -- nothing can be predicted
    cosine.sort(reverse=True)  # most similar users first
    dcg = []   # (song, predicted rate) -- ranking produced by the model
    dcgi = []  # (song, true rate)      -- used to build the ideal ranking
    true_rates = {}
    for (test_song, true_rate) in test_data_for(u):
        sum1 = 0.0
        sum2 = 0.0
        # neighbours that actually rated this song (and are not orthogonal)
        neighbors = [(cos, v) for (cos, v) in cosine if cos != 0.0 and R(v, test_song) != 0]
        if len(neighbors) > 0:
            neighbors = neighbors[:5]  # top-5 most similar raters
            for (cos, v) in neighbors:
                sum1 += cos * R(v, test_song)
                sum2 += abs(cos)
            # similarity-weighted mean of the neighbours' ratings;
            # sum2 > 0 is guaranteed since all cos here are non-zero
            new_rate = sum1 / sum2
            num_of_tested_songs += 1
            RMSE += ((new_rate - true_rate) ** 2)
            MAE += abs(new_rate - true_rate)
            dcg.append((test_song, new_rate))
            dcgi.append((test_song, true_rate))
            true_rates[test_song] = true_rate
    # compute user's DCGs
    # NOTE: Python 2-only lambda tuple unpacking below (invalid in Python 3)
    dcg.sort(key=lambda (_, r): r, reverse=True)          # predicted ranking
    dcgi.sort(key=lambda (_, r): gain(r), reverse=True)   # ideal ranking
    dcg_val, dcgi_val = 0.0, 0.0
    for i in xrange(len(dcg)):
        # positional discount log2(i+1); max(1.0, ...) guards log2(1) == 0
        tmpl = log(i + 1, 2)
        dcg_val += relevance(true_rates[dcg[i][0]]) / max(1.0, tmpl)
        dcgi_val += relevance(true_rates[dcgi[i][0]]) / max(1.0, tmpl)
    if dcgi_val != 0:
        # nDCG contribution for this user (DCG normalized by ideal DCG)
        NDCG += (dcg_val/dcgi_val)
@timeit
def compute_similarity(pair):
    """Compute cosine similarity between one user and all users after them.

    ``pair`` is an element of ``enumerate(users)`` -- ``(i, u)`` where ``i``
    is u's index.  Results go into the module-level ``similarity`` dict under
    the key ``(u, v)``; iterating only over ``users[i+1:]`` stores each
    unordered pair exactly once.  Pairs with no commonly rated song get no
    entry at all, which predict() treats as "unknown".
    """
    # Single-tuple parameter replaces the Python 2-only ``((i, u))`` tuple
    # parameter unpacking (removed in Python 3 by PEP 3113); the caller
    # already passes one tuple, so the interface is unchanged.
    i, u = pair
    for v in users[(i + 1):]:
        dot = 0.0
        for j in fetch_songs(v):
            Ruj, Rvj = R(u, j), R(v, j)
            if Ruj != 0 and Rvj != 0:
                dot += Ruj * Rvj
        if dot == 0:
            continue  # orthogonal rating vectors: skip, do not store 0
        # cosine = dot / (|Ru| * |Rv|); ru_sqr holds the squared norms
        similarity[(u, v)] = dot / ((ru_sqr[u] * ru_sqr[v]) ** 0.5)
@timeit
def main():
    """Run the full experiment: load data, compute user-user cosine
    similarities, predict the held-out ratings and print MAE/RMSE/nDCG.
    """
    # fetch data (user ids, song ids, play counts) in memory
    load_data()
    # squared rating norm of every user, used as the cosine denominator
    # (sum(...) replaces the original reduce(), which Python 3 moved to
    # functools; behavior is identical)
    for u in users:
        ru_sqr[u] = sum(R(u, sid) ** 2 for sid in fetch_songs(u))
    # compute cosines for every unordered user pair
    for e in enumerate(users):
        compute_similarity(e)
    # predict rates and accumulate the raw error sums
    for u in users:
        predict(u)
    # NOTE(review): num_of_tested_songs == 0 (no prediction made at all)
    # would raise ZeroDivisionError here, as in the original.
    rmse_norm = (RMSE / num_of_tested_songs) ** 0.5
    # BUG FIX: MAE is a plain mean of absolute errors -- the original also
    # applied ** 0.5 to it, which is only correct for RMSE
    mae_norm = MAE / num_of_tested_songs
    ndcg_norm = NDCG / len(users)
    # output label fixed from "nDSG" to "nDCG"
    print("\nMAE:\t{mae}\nRMSE:\t{rmse}\nnDCG:\t{ndcg}\n"
          .format(mae=mae_norm, rmse=rmse_norm, ndcg=ndcg_norm))


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment