# Collaborative Filtering ALS Recommender System using Spark MLlib,
# adapted from the Spark Summit 2014 Recommender System training example.
#!/usr/bin/env python
import sys
import itertools
from math import sqrt
from operator import add
from os.path import join, isfile, dirname
from time import time

from pyspark import SparkConf, SparkContext
from pyspark.mllib.recommendation import ALS
def parseRating(line):
    """
    Parse one rating record in MovieLens format:
    userId[tab]movieId[tab]rating[tab]timestamp.

    Returns (timestamp % 10, (userId, movieId, rating)).  The last digit
    of the timestamp is used later as a pseudo-random key for the
    60/20/20 train/validation/test split.
    """
    fields = line.strip().split("\t")
    # int() auto-promotes to arbitrary precision, so Python 2's long() is
    # unnecessary here; using int() keeps behavior identical and also
    # works under Python 3, where long() no longer exists.
    return int(fields[3]) % 10, (int(fields[0]), int(fields[1]), float(fields[2]))
def loadRatings(ratingsFile):
    """
    Load personal ratings from ratingsFile (MovieLens tab-separated format).

    Returns a list of (userId, movieId, rating) tuples with rating > 0.
    Exits the program when the file does not exist or when no positive
    ratings remain after filtering.
    """
    if not isfile(ratingsFile):
        print("File %s does not exist." % ratingsFile)
        sys.exit(1)
    # 'with' guarantees the handle is closed even if a line fails to
    # parse (the original open()/close() pair leaked the handle on error).
    with open(ratingsFile, 'r') as f:
        parsed = [parseRating(line)[1] for line in f]
    # Keep only positive ratings.  A real list (not a lazy filter object)
    # so the emptiness check below is correct under both Python 2 and 3.
    ratings = [r for r in parsed if r[2] > 0]
    if not ratings:
        print("No ratings provided.")
        sys.exit(1)
    return ratings
def computeRmse(model, data, n):
    """
    Return the Root Mean Squared Error of 'model' on 'data'.

    data is an RDD of (userId, movieId, rating) triples and n is the
    number of triples it contains.
    """
    # Ask the model for a prediction on every (user, movie) pair present
    # in the evaluation set.
    userMoviePairs = data.map(lambda r: (r[0], r[1]))
    predicted = model.predictAll(userMoviePairs)
    # Key both predictions and actual ratings by (user, movie) so they
    # can be joined into (predictedRating, actualRating) value pairs.
    keyedPredictions = predicted.map(lambda r: ((r[0], r[1]), r[2]))
    keyedActuals = data.map(lambda r: ((r[0], r[1]), r[2]))
    pairedRatings = keyedPredictions.join(keyedActuals).values()
    squaredErrorSum = pairedRatings.map(lambda p: (p[0] - p[1]) ** 2).reduce(add)
    return sqrt(squaredErrorSum / float(n))
def main():
    """
    Driver: train ALS collaborative-filtering models on a MovieLens
    ratings file over a grid of hyper-parameters, pick the model with the
    lowest validation RMSE, and report its RMSE on a held-out test split.
    """
    if (len(sys.argv) != 2):
        print "Usage: /path/to/spark/bin/spark-submit --driver-memory 2g " + \
            "python-file-name.py /path/to/training-data-file"
        sys.exit(1)
    startTime = time()
    # Single local executor with 2g of executor memory for the ALS factors.
    conf = SparkConf().setMaster("local").setAppName("movieRecommender").set("spark.executor.memory", "2g")
    sc = SparkContext(conf=conf)
    # ratings is an RDD of (last digit of timestamp, (userId, movieId, rating))
    trainingDataFile = sys.argv[1]
    ratings = sc.textFile(trainingDataFile).map(parseRating)
    numRatings = ratings.count()
    numUsers = ratings.values().map(lambda r: r[0]).distinct().count()
    numMovies = ratings.values().map(lambda r: r[1]).distinct().count()
    print "Got %d ratings from %d users on %d movies." % (numRatings, numUsers, numMovies)
    numPartitions = 4
    # Split on the last digit of the timestamp, which acts as a
    # pseudo-random key: digits 0-5 -> training (~60%), 6-7 ->
    # validation (~20%), 8-9 -> test (~20%).
    training = ratings.filter(lambda x: x[0] < 6) \
        .values() \
        .repartition(numPartitions) \
        .cache()
    validation = ratings.filter(lambda x: x[0] >= 6 and x[0] < 8) \
        .values() \
        .repartition(numPartitions) \
        .cache()
    test = ratings.filter(lambda x: x[0] >= 8).values().cache()
    numTraining = training.count()
    numValidation = validation.count()
    numTest = test.count()
    print "Training: %d, validation: %d, test: %d" % (numTraining, numValidation, numTest)
    # Hyper-parameter grid for the exhaustive search below:
    # latent-factor rank, regularization lambda, and ALS iteration count.
    ranks = [6, 8, 12]
    lambdas = [0.1, 1.0, 10.0]
    numIters = [10, 20]
    bestModel = None
    bestValidationRmse = float("inf")
    bestRank = 0
    bestLambda = -1.0
    bestNumIter = -1
    # Train one model per grid point and keep the one with the lowest
    # RMSE on the validation split.
    for rank, lmbda, numIter in itertools.product(ranks, lambdas, numIters):
        model = ALS.train(training, rank, numIter, lmbda)
        validationRmse = computeRmse(model, validation, numValidation)
        print "RMSE (validation) = %f for the model trained with " % validationRmse + \
            "rank = %d, lambda = %.1f, and numIter = %d." % (rank, lmbda, numIter)
        if (validationRmse < bestValidationRmse):
            bestModel = model
            bestValidationRmse = validationRmse
            bestRank = rank
            bestLambda = lmbda
            bestNumIter = numIter
    # evaluate the best model on the test set
    testRmse = computeRmse(bestModel, test, numTest)
    sc.stop()
    endTime = time()
    programTime = endTime - startTime
    # Summary: repeat the dataset stats, then report the winning
    # hyper-parameters, test RMSE, and total wall-clock time.
    print "Got %d ratings from %d users on %d movies." % (numRatings, numUsers, numMovies)
    print "Training: %d, validation: %d, test: %d" % (numTraining, numValidation, numTest)
    print "The best model was trained with rank = %d and lambda = %.1f, " % (bestRank, bestLambda) \
        + "and numIter = %d, and its RMSE on the test set is %f." % (bestNumIter, testRmse)
    print "Time for program to complete: %.1f s" %programTime


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This comment has been minimized.
To run:
    $ /path/to/spark-directory/bin/spark-submit recommender_spark.py /path/to/ratings-training-data-file
For example:
    $ ~/Applications/spark-1.1.0-bin-hadoop2.4/bin/spark-submit recommender_spark.py u1.base
My post on Recommender Systems and explanation of this code:
http://rezf.blogspot.ca/2014/11/building-big-data-machine-learning_10.html