Skip to content

Instantly share code, notes, and snippets.

@nerdroychan
Created November 27, 2016 15:34
Show Gist options
  • Save nerdroychan/fcde3e7b3496c6d6cda6b6b080e455ac to your computer and use it in GitHub Desktop.
Save nerdroychan/fcde3e7b3496c6d6cda6b6b080e455ac to your computer and use it in GitHub Desktop.
import numpy as np
import matplotlib as plt
import random
import copy
import csv
import difflib
def _read_transposed_csv(path):
    """Read a numeric CSV file and return its contents transposed.

    Entry ``[c][r]`` of the returned list of lists is the cell found in
    column ``c`` of row ``r`` of the file, converted with ``float``.

    Raises:
        ValueError: if a cell cannot be converted to ``float``.
    """
    columns = []
    # newline='' is what the csv module documentation asks for on open().
    with open(path, newline='') as csv_file:
        for row in csv.reader(csv_file):
            for c, cell in enumerate(row):
                # Grow the column list only when a new column index shows up.
                # (A blanket ``except`` here would also swallow the
                # ValueError of a malformed cell and leave a spurious empty
                # column behind before failing again.)
                if c == len(columns):
                    columns.append([])
                columns[c].append(float(cell))
    return columns


# Read the training set and transpose it: the first index of R is the CSV
# column (one per person), the second index is the CSV row (one per movie).
R = _read_transposed_csv('training.csv')
people_num = len(R)
movie_num = len(R[0])

# Entries equal to -1.0 mark held-out test points: they are blanked to 0.0 in
# R and their (person, movie) positions are remembered.  Every remaining
# non-zero entry counts as one training sample; 0.0 entries are not counted.
test_points = []
sample_num, test_num = 0, 0
for i, ratings in enumerate(R):
    for j in range(movie_num):
        if ratings[j] == -1.0:
            ratings[j] = 0.0
            test_num += 1
            test_points.append((i, j))
        elif ratings[j] != 0.0:
            sample_num += 1

# Reference values for the held-out points, stored with the same
# [person][movie] orientation as R.
R_test = _read_transposed_csv('testing.csv')
# Now, R is the training set and the held-out test points are recorded.
# Mean of the observed training ratings: missing entries are stored as 0.0,
# so they add nothing to the numerator.
sample_avg = sum([sum(x) for x in R]) / sample_num

# Least-squares system A @ [b_u, b_i] ~= c with one row per observed rating.
# Column i (< people_num) selects the bias of person i, column people_num + j
# selects the bias of movie j; c holds the rating minus the global mean.
A = np.zeros((sample_num, people_num + movie_num), dtype=int)
c = np.empty(sample_num)
count = 0
for j in range(movie_num):
    for i in range(people_num):
        if R[i][j] != 0.0:
            c[count] = R[i][j] - sample_avg
            A[count, i] = 1
            A[count, people_num + j] = 1
            count += 1

# First, using baseline method
# Minimum-norm least-squares solution.  Multiplying c by pinv(A.T) @ A.T
# first (as an earlier revision did) only projects c onto the column space
# of A, which pinv(A) already does, so a single pseudo-inverse suffices.
sol = np.dot(np.linalg.pinv(A), c)
b_u = sol[:people_num]
b_i = sol[people_num:]
# Baseline prediction: global mean + person bias + movie bias, clipped to the
# range [1, 5].  Only observed training cells and held-out test cells are
# filled in; every other cell of R_hat keeps the 0.0 copied from R.
R_hat = copy.deepcopy(R)
_baseline_test_cells = set(test_points)  # O(1) membership inside the loop
for j in range(movie_num):
    for i in range(people_num):
        if R_hat[i][j] != 0.0 or (i, j) in _baseline_test_cells:
            tmp = sample_avg + b_u[i] + b_i[j]
            R_hat[i][j] = float(max(min(5, tmp), 1))

# Training RMSE over the observed training cells.
RMSE_training = 0
for j in range(movie_num):
    for i in range(people_num):
        if R[i][j] != 0.0:
            RMSE_training += (R_hat[i][j] - R[i][j]) ** 2
RMSE_training = np.sqrt(RMSE_training/sample_num)
print('Baseline method training RMSE: ', RMSE_training)

# Testing RMSE over the held-out cells, compared against R_test.
RMSE_testing = 0
for i in test_points:
    RMSE_testing += (R_hat[i[0]][i[1]] - R_test[i[0]][i[1]]) ** 2
RMSE_testing = np.sqrt(RMSE_testing/len(test_points))
print('Baseline method testing RMSE: ', RMSE_testing)
# Second, using neighbourhood method
# Blank the baseline predictions at the held-out cells so that the residual
# matrix below is zero there (R is 0.0 at those cells as well).
for i in test_points:
    R_hat[i[0]][i[1]] = 0.0
# Residual of the training ratings with respect to the baseline prediction.
R_wave = np.array(R) - np.array(R_hat)

# Movie to movie interaction
# D[i][j] is the normalised dot product of the residual columns of movies i
# and j, taken over the people who rated both, and shrunk by
# common / (common + compress_ratio).  Pairs whose norm product is zero get
# the small constant 1E-5; the diagonal stays 0.
compress_ratio = 4
D = [[0 for _ in range(movie_num)] for _ in range(movie_num)]
for i in range(movie_num):
    # The measure is symmetric, so each unordered pair is computed once and
    # mirrored instead of being evaluated for both (i, j) and (j, i).
    for j in range(i + 1, movie_num):
        common = 0
        s = 0
        mi = 0
        mj = 0
        for k in range(people_num):
            if R[k][i] != 0.0 and R[k][j] != 0.0:
                common += 1
                s += R_wave[k][i] * R_wave[k][j]
                # The 1E-3 offset is part of the original norm definition.
                mi += (R_wave[k][i]+1E-3)**2
                mj += (R_wave[k][j]+1E-3)**2
        if mi*mj == 0:
            deg = 1E-5
        else:
            deg = s / np.sqrt(mi*mj) * (common/(common+compress_ratio))
        D[i][j] = deg
        D[j][i] = deg
# User to User method
# U[i][j] is the normalised dot product of the residual rows of persons i and
# j, taken over the movies both have rated.  compress_ratio is 0 here, so the
# shrink factor common / (common + compress_ratio) is 1 whenever it is
# evaluated (pairs without common movies take the 1E-5 branch instead).
# The diagonal stays 0.
compress_ratio = 0
U = [[0 for _ in range(people_num)] for _ in range(people_num)]
for i in range(people_num):
    # The measure is symmetric, so each unordered pair is computed once and
    # mirrored instead of being evaluated for both (i, j) and (j, i).
    for j in range(i + 1, people_num):
        common = 0
        s = 0
        mi = 0
        mj = 0
        for k in range(movie_num):
            if R[i][k] != 0.0 and R[j][k] != 0.0:
                common += 1
                s += R_wave[i][k] * R_wave[j][k]
                # The 1E-3 offset is part of the original norm definition.
                mi += (R_wave[i][k]+1E-3)**2
                mj += (R_wave[j][k]+1E-3)**2
        if mi*mj == 0:
            deg = 1E-5
        else:
            deg = s / np.sqrt(mi*mj) * (common/(common+compress_ratio))
        U[i][j] = deg
        U[j][i] = deg
def _top_neighbours(weights, own, limit):
    """Return the neighbour indices selected for one row of a weight matrix.

    The ``limit`` largest-magnitude entries of ``weights`` (ignoring position
    ``own``) are taken, and every index other than ``own`` whose weight
    *value* occurs among them is returned in ascending order.  Because the
    selection is by value, indices that tie with a selected weight are all
    included, so more than ``limit`` indices may come back.
    """
    others = [w for k, w in enumerate(weights) if k != own]
    strongest = set(sorted(others, key=abs, reverse=True)[:limit])
    return [k for k, w in enumerate(weights) if k != own and w in strongest]


def _abs_total(weights, indices):
    """Return the sum of ``abs(weights[k])`` for ``k`` in ``indices``."""
    total = 0
    for k in indices:
        total += abs(weights[k])
    return total


# Number of movie neighbours / user neighbours blended into each prediction.
L = movie_num//11
Lu = people_num

_neighbour_test_cells = set(test_points)  # O(1) membership inside the loop

# The neighbour selection of a person depends only on U, so it is computed
# once per person instead of once per (movie, person) cell.
_user_neighbours = [_top_neighbours(U[i], i, Lu) for i in range(people_num)]
_user_norms = [_abs_total(U[i], _user_neighbours[i]) for i in range(people_num)]

for j in range(movie_num):
    # Likewise the movie neighbours depend only on j.
    movie_neighbours = _top_neighbours(D[j], j, L)
    movie_norm = _abs_total(D[j], movie_neighbours)
    for i in range(people_num):
        # Only observed training cells and held-out test cells are stored,
        # so nothing needs to be computed for the others.
        if R[i][j] == 0 and (i, j) not in _neighbour_test_cells:
            continue
        tmp = sample_avg + b_u[i] + b_i[j]
        # Add D: weighted average of this person's residuals on the
        # neighbouring movies.  Skipped when no neighbour carries weight
        # (e.g. L == 0 for fewer than 11 movies), which would otherwise
        # divide by zero.
        if movie_norm != 0:
            s = 0
            for k in movie_neighbours:
                s += D[j][k] * R_wave[i][k]
            tmp += s / movie_norm
        # Add U: weighted average of the neighbouring persons' residuals on
        # this movie, with the same guard against an empty neighbourhood.
        if _user_norms[i] != 0:
            s = 0
            for k in _user_neighbours[i]:
                s += U[k][i] * R_wave[k][j]
            tmp += s / _user_norms[i]
        # Clip the prediction to the range [1, 5].
        R_hat[i][j] = max(min(5, tmp), 1)
# Training RMSE of the neighbourhood predictions, measured over every cell of
# R that holds an observed (non-zero) rating.
RMSE_training = 0
count = 0
for j in range(movie_num):
    for i in range(people_num):
        if R[i][j] == 0.0:
            continue
        squared_error = (R_hat[i][j] - R[i][j]) ** 2
        RMSE_training += squared_error
        count += 1
RMSE_training = np.sqrt(RMSE_training / count)
print('Neighbourhood method training RMSE: ', RMSE_training)

# Testing RMSE of the neighbourhood predictions, measured over the held-out
# cells against the values stored in R_test.
RMSE_testing = 0
for point in test_points:
    person, movie = point[0], point[1]
    RMSE_testing += (R_hat[person][movie] - R_test[person][movie]) ** 2
RMSE_testing = np.sqrt(RMSE_testing / len(test_points))
print('Neighbourhood method testing RMSE: ', RMSE_testing)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment