Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
local version of the lastfm recommendations in spark
#start a terminal at the folder where spark is installed
#in the command line run this to fire up a pyspark instance
./bin/pyspark
###########################
### LOADING IN THE DATA ###
###########################
#load in the file and examine
lines = sc.textFile('usersha1-artmbid-artname-plays.tsv')
type(lines)
lines.count()
#17,559,530
#split each line of the file by the tab delimiter and check first line
data = lines.map(lambda l: l.split('\t'))
data.first()
#keep the relevent columns user, artist and plays as rating and check first line
ratings = data.map(lambda d: (d[0], d[2], 1))
ratings.first()
#file is too big to process locally need to reduce.
sample = ratings.sample(withReplacement=False, fraction=0.001, seed=123456789)
sample.count()
#1,756,476
###########################################
### REFORMATTING DATA TO SUIT MLLIB ALS ###
###########################################
#mllib als requires users and items to be integers create lookups
users = sample.map(lambda s: s[0]).distinct()
users.count()
#355,756
#create unique id
users_lkp=users.zipWithUniqueId()
users_lkp.top(5)
items_lkp = sample.map(lambda s: s[1]).distinct().zipWithUniqueId()
items_lkp.count()
#104,677
#now we need to replace the user and artist with the id's in the lookup
#this took some thinking. you can only really join tuples. but we have (user, artist, rating) and (artist, id)
#so re-map sample data to have artist as index and user and rating as data i.e. (artist:(user,rating))
#join to the items_lkp it will match on the index of artist i.e. (artist:id)
#this will produce something of hte form artist:((user,rating),id)
#then re-map your output so that it is in the original form but now is (user, id, rating)
repArtist=sample.map(lambda (u,a,r):(a,(u,r))).join(items_lkp).map(lambda (a,((u,r),i)):(u,i,r))
#repeat to switch out the Users
repUser=repArtist.map(lambda (u,a,r):(u,(a,r))).join(users_lkp).map(lambda (u,((a,r),i)):(i,a,r))
##########################
### BUILDING THE MODEL ###
##########################
#import recs model from MLlib
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
#rank is the number of latent factors in the model.
rank = 20
#iterations is the number of iterations to run.
numIterations = 10
#generate the model
model = ALS.trainImplicit(repUser, rank, numIterations, 0.01)
#from the input data keep the user and the item to create a test set
testdata = repUser.map(lambda r: (r[0], r[1]))
#then use the model to generate rating predictions for the test set
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
predictions.top(5)
#join the original rating back to the prediction
ratesAndPreds = repUser.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
ratesAndPreds.top(5)
#then look at the accuracy of hte prediction by calulating an MSE
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).reduce(lambda x, y: x + y) / ratesAndPreds.count()
print("Mean Squared Error = " + str(MSE))
#Mean Squared Error = 3.87181792494e-05
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment