Collaborative Filtering ALS Recommender System using Spark MLlib adapted from the Spark Summit 2014 Recommender System training example
#!/usr/bin/env python

import sys
import itertools
from math import sqrt
from operator import add
from os.path import isfile
from time import time

from pyspark import SparkConf, SparkContext
from pyspark.mllib.recommendation import ALS

def parseRating(line):
    """
    Parse a rating record in MovieLens format: userId[tab]movieId[tab]rating[tab]timestamp.
    """
    fields = line.strip().split("\t")
    return long(fields[3]) % 10, (int(fields[0]), int(fields[1]), float(fields[2]))
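
# For example, on a (made-up) MovieLens-style record:
#   parseRating("196\t242\t3\t881250949")  =>  (9, (196, 242, 3.0))
# The last digit of the timestamp (the first element) is used below to split the data.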

def loadRatings(ratingsFile):
    """
    Load ratings from a local file as a list of (userId, movieId, rating) tuples.
    Kept from the original training example; not called in main() below.
    """
    if not isfile(ratingsFile):
        print "File %s does not exist." % ratingsFile
        sys.exit(1)
    f = open(ratingsFile, 'r')
    ratings = filter(lambda r: r[2] > 0, [parseRating(line)[1] for line in f])
    f.close()
    if not ratings:
        print "No ratings provided."
        sys.exit(1)
    else:
        return ratings

def computeRmse(model, data, n):
    """
    Compute RMSE (Root Mean Squared Error): sqrt(sum((prediction - rating) ** 2) / n),
    joining predictions to actual ratings on the (userId, movieId) key.
    """
    predictions = model.predictAll(data.map(lambda x: (x[0], x[1])))
    predictionsAndRatings = predictions.map(lambda x: ((x[0], x[1]), x[2])) \
        .join(data.map(lambda x: ((x[0], x[1]), x[2]))) \
        .values()
    return sqrt(predictionsAndRatings.map(lambda x: (x[0] - x[1]) ** 2).reduce(add) / float(n))

def main():
    if len(sys.argv) != 2:
        print "Usage: /path/to/spark/bin/spark-submit --driver-memory 2g " + \
            "python-file-name.py /path/to/training-data-file"
        sys.exit(1)

    startTime = time()
    conf = SparkConf().setMaster("local") \
        .setAppName("movieRecommender") \
        .set("spark.executor.memory", "2g")
    sc = SparkContext(conf=conf)

    # ratings is an RDD of (last digit of timestamp, (userId, movieId, rating))
    trainingDataFile = sys.argv[1]
    ratings = sc.textFile(trainingDataFile).map(parseRating)

    numRatings = ratings.count()
    numUsers = ratings.values().map(lambda r: r[0]).distinct().count()
    numMovies = ratings.values().map(lambda r: r[1]).distinct().count()
    print "Got %d ratings from %d users on %d movies." % (numRatings, numUsers, numMovies)

    # Split on the last digit of the timestamp (roughly uniform), giving an
    # approximately 60/20/20 train/validation/test split.
    numPartitions = 4
    training = ratings.filter(lambda x: x[0] < 6) \
        .values() \
        .repartition(numPartitions) \
        .cache()
    validation = ratings.filter(lambda x: x[0] >= 6 and x[0] < 8) \
        .values() \
        .repartition(numPartitions) \
        .cache()
    test = ratings.filter(lambda x: x[0] >= 8).values().cache()

    numTraining = training.count()
    numValidation = validation.count()
    numTest = test.count()
    print "Training: %d, validation: %d, test: %d" % (numTraining, numValidation, numTest)

    # Grid search: 3 ranks x 3 regularization values x 2 iteration counts = 18 models.
    ranks = [6, 8, 12]
    lambdas = [0.1, 1.0, 10.0]
    numIters = [10, 20]
    bestModel = None
    bestValidationRmse = float("inf")
    bestRank = 0
    bestLambda = -1.0
    bestNumIter = -1
    for rank, lmbda, numIter in itertools.product(ranks, lambdas, numIters):
        model = ALS.train(training, rank, numIter, lmbda)
        validationRmse = computeRmse(model, validation, numValidation)
        print "RMSE (validation) = %f for the model trained with " % validationRmse + \
            "rank = %d, lambda = %.1f, and numIter = %d." % (rank, lmbda, numIter)
        if validationRmse < bestValidationRmse:
            bestModel = model
            bestValidationRmse = validationRmse
            bestRank = rank
            bestLambda = lmbda
            bestNumIter = numIter

    # Evaluate the best model on the test set.
    testRmse = computeRmse(bestModel, test, numTest)

    sc.stop()
    endTime = time()
    programTime = endTime - startTime

    print "Got %d ratings from %d users on %d movies." % (numRatings, numUsers, numMovies)
    print "Training: %d, validation: %d, test: %d" % (numTraining, numValidation, numTest)
    print "The best model was trained with rank = %d and lambda = %.1f, " % (bestRank, bestLambda) \
        + "and numIter = %d, and its RMSE on the test set is %f." % (bestNumIter, testRmse)
    print "Time for program to complete: %.1f s" % programTime


if __name__ == "__main__":
    main()
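
Not part of the gist, but a natural next step once the grid search has picked bestModel: score the movies a user has not yet rated and take the top N. A minimal sketch using the same predictAll API as computeRmse; the helper name and the userId are illustrative, and it would have to run before sc.stop():

def topRecommendations(model, ratings, userId, n=10):
    """Predict a rating for every movie the user has not rated; return the n best."""
    seen = set(ratings.values()
                      .filter(lambda r: r[0] == userId)
                      .map(lambda r: r[1])
                      .collect())
    candidates = ratings.values() \
        .map(lambda r: r[1]) \
        .distinct() \
        .filter(lambda movieId: movieId not in seen) \
        .map(lambda movieId: (userId, movieId))
    # predictAll returns (user, movie, predictedRating) records; sort by score, descending.
    return model.predictAll(candidates).takeOrdered(n, key=lambda p: -p[2])

# e.g.: print topRecommendations(bestModel, ratings, userId=1)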
To run:
$ /path/to/spark/bin/spark-submit recommender_spark.py /path/to/ratings-training-data-file
e.g.:
$ ~/Applications/spark-1.1.0-bin-hadoop2.4/bin/spark-submit recommender_spark.py u1.base
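The training file is expected to be tab-separated MovieLens-style data, one rating per line, matching what parseRating parses (the values below are illustrative):
userId	movieId	rating	timestamp
196	242	3	881250949
186	302	3	891717742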
My blog post on recommender systems, with an explanation of this code:
http://rezf.blogspot.ca/2014/11/building-big-data-machine-learning_10.html