Local version of the Last.fm recommendations in Spark
#start a terminal in the folder where Spark is installed
#in the command line, run this to fire up a pyspark shell
./bin/pyspark
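#note: the pyspark shell creates the SparkContext for you and exposes it as sc.
#if you run this as a standalone script instead, you have to build the context yourself;
#a minimal sketch (the local[*] master and app name here are assumptions, not from the original run):
#from pyspark import SparkContext
#sc = SparkContext('local[*]', 'lastfm-als')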
###########################
### LOADING IN THE DATA ###
###########################
#load in the file and examine
lines = sc.textFile('usersha1-artmbid-artname-plays.tsv')
type(lines)
lines.count()
#17,559,530
#split each line of the file on the tab delimiter and check the first line
data = lines.map(lambda l: l.split('\t'))
data.first()
#the columns are user sha, artist musicbrainz id, artist name, plays.
#keep user (d[0]) and artist name (d[2]), with a constant implicit rating of 1, and check the first line
ratings = data.map(lambda d: (d[0], d[2], 1))
ratings.first()
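#an alternative worth noting (not what this walkthrough uses): trainImplicit treats the
#rating as a confidence weight, so the actual play counts in d[3] fit naturally.
#the length/isdigit guard is an assumption to skip malformed rows:
ratings_plays = data.filter(lambda d: len(d) == 4 and d[3].isdigit()) \
                    .map(lambda d: (d[0], d[2], int(d[3])))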
#the file is too big to process locally, so reduce it by sampling roughly 10% of the rows
sample = ratings.sample(withReplacement=False, fraction=0.1, seed=123456789)
sample.count()
#1,756,476
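#optional: the sample gets scanned several times below (distinct users, distinct artists,
#and two joins), so caching it avoids recomputing it from the file each time
sample.cache()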
###########################################
### REFORMATTING DATA TO SUIT MLLIB ALS ###
###########################################
#MLlib ALS requires users and items to be integers, so create lookup tables
users = sample.map(lambda s: s[0]).distinct()
users.count()
#355,756
#create a unique id per user
users_lkp = users.zipWithUniqueId()
users_lkp.top(5)
items_lkp = sample.map(lambda s: s[1]).distinct().zipWithUniqueId()
items_lkp.count()
#104,677
#now we need to replace the user and artist strings with the ids from the lookups.
#this took some thinking: you can only really join pair RDDs, but we have (user, artist, rating) and (artist, id).
#so re-map the sample data to key on artist, with user and rating as the value, i.e. (artist, (user, rating)).
#joining that to items_lkp matches on the artist key, i.e. (artist, id),
#which produces records of the form (artist, ((user, rating), id)).
#then re-map the output back to the original shape, now as (user, id, rating)
repArtist = sample.map(lambda t: (t[1], (t[0], t[2]))) \
                  .join(items_lkp) \
                  .map(lambda t: (t[1][0][0], t[1][1], t[1][0][1]))
#repeat to switch out the users
repUser = repArtist.map(lambda t: (t[0], (t[1], t[2]))) \
                   .join(users_lkp) \
                   .map(lambda t: (t[1][1], t[1][0][0], t[1][0][1]))
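#an alternative sketch to the two joins above: if the lookups fit in driver memory
#(~460k entries here, so they should), collect them as dicts, broadcast them,
#and swap the ids in with a single map and no shuffle
items_map = sc.broadcast(items_lkp.collectAsMap())
users_map = sc.broadcast(users_lkp.collectAsMap())
repUser_alt = sample.map(lambda t: (users_map.value[t[0]], items_map.value[t[1]], t[2]))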
##########################
### BUILDING THE MODEL ###
##########################
#import the recommendation model classes from MLlib
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
#rank is the number of latent factors in the model
rank = 20
#iterations is the number of ALS iterations to run
numIterations = 10
#train the implicit-feedback model; the final argument (0.01) is the regularization parameter lambda
model = ALS.trainImplicit(repUser, rank, numIterations, 0.01)
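#a quick sketch of the model doing its actual job: top-N artists for a single user.
#recommendProducts is part of the MLlib API; the user id 0 and N=10 are arbitrary picks
#(that id 0 exists in the sample is an assumption), and the reversed lookup maps item ids back to names
id_to_artist = items_lkp.map(lambda t: (t[1], t[0])).collectAsMap()
for rec in model.recommendProducts(0, 10):
    print(id_to_artist[rec.product], rec.rating)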
#keep just the (user, item) pairs from the input data to create a test set
testdata = repUser.map(lambda r: (r[0], r[1]))
#then use the model to generate rating predictions for the test set
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
predictions.top(5)
#join the original ratings back onto the predictions
ratesAndPreds = repUser.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
ratesAndPreds.top(5)
#then look at the accuracy of the predictions by calculating an MSE
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).reduce(lambda x, y: x + y) / ratesAndPreds.count()
print("Mean Squared Error = " + str(MSE))
#Mean Squared Error = 3.87181792494e-05
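#two optional follow-ups. first, the same MSE computed more directly with the built-in mean();
#note that with a constant rating of 1 the model mostly has to reconstruct 1s on data it was
#trained on, which is why the error is so small. second, since MatrixFactorizationModel was
#imported above, a sketch of persisting and reloading the model (the path is a placeholder):
MSE2 = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("Mean Squared Error = " + str(MSE2))
model.save(sc, 'lastfm_als_model')
sameModel = MatrixFactorizationModel.load(sc, 'lastfm_als_model')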