Skip to content

Instantly share code, notes, and snippets.

@rezsa
Created November 10, 2014 20:55
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save rezsa/359714b3c9e0f554f878 to your computer and use it in GitHub Desktop.
Save rezsa/359714b3c9e0f554f878 to your computer and use it in GitHub Desktop.
Collaborative-filtering recommender system using ALS (alternating least squares) in Spark MLlib, adapted from the Spark Summit 2014 recommender-system training example.
#!/usr/bin/env python
import sys
import itertools
from math import sqrt
from operator import add
from os.path import join, isfile, dirname
from time import time
from pyspark import SparkConf, SparkContext
from pyspark.mllib.recommendation import ALS
def parseRating(line):
    """
    Parse one rating record in MovieLens format:
    userId[tab]movieId[tab]rating[tab]timestamp .

    Returns a pair (timestamp % 10, (userId, movieId, rating)). The last digit
    of the timestamp is used by main() as the key that splits the data into
    training / validation / test sets.
    """
    fields = line.strip().split("\t")
    # int() instead of the Python 2-only long(): Python 2's int() promotes to
    # long automatically, and long() does not exist in Python 3.
    return int(fields[3]) % 10, (int(fields[0]), int(fields[1]), float(fields[2]))
def loadRatings(ratingsFile):
    """
    Load ratings from a local file in MovieLens format.

    Returns a list of (userId, movieId, rating) tuples, keeping only ratings
    greater than 0. Prints a message and exits with status 1 if the file does
    not exist or contains no positive ratings.
    """
    if not isfile(ratingsFile):
        print("File %s does not exist." % ratingsFile)
        sys.exit(1)
    # `with` guarantees the file is closed even if a malformed line makes
    # parseRating raise part-way through.
    with open(ratingsFile, 'r') as f:
        # A list (not filter()): on Python 3 filter() returns a lazy iterator
        # that is always truthy, which would defeat the emptiness check below.
        ratings = [r for r in (parseRating(line)[1] for line in f) if r[2] > 0]
    if not ratings:
        print("No ratings provided.")
        sys.exit(1)
    return ratings
def computeRmse(model, data, n):
    """
    Compute RMSE (Root Mean Squared Error) of `model` on `data`.

    `data` is an RDD of (userId, movieId, rating) tuples. The model predicts a
    rating for each (userId, movieId) pair. The predictions are joined back to
    the true ratings on that pair, and the RMSE is taken over the joined pairs.

    `n` is accepted for backward compatibility but is no longer the
    denominator. predictAll drops pairs whose user or movie has no learned
    factors (i.e. was not seen in training). Dividing the error sum over the
    joined pairs by the full data size `n` therefore biased the RMSE low.

    Returns float("nan") when no prediction could be joined to a rating
    (including an empty `data`), instead of raising as reduce() would.
    """
    predictions = model.predictAll(data.map(lambda x: (x[0], x[1])))
    predictionsAndRatings = predictions.map(lambda x: ((x[0], x[1]), x[2])) \
        .join(data.map(lambda x: ((x[0], x[1]), x[2]))) \
        .values()
    # Accumulate (sum of squared errors, number of pairs) in one pass. fold()
    # has a zero value, so it is safe on an empty RDD, unlike reduce().
    sumSquaredError, count = predictionsAndRatings \
        .map(lambda x: ((x[0] - x[1]) ** 2, 1)) \
        .fold((0.0, 0), lambda a, b: (a[0] + b[0], a[1] + b[1]))
    if count == 0:
        return float("nan")
    return sqrt(sumSquaredError / float(count))
def main():
    """
    Train ALS models over a small hyper-parameter grid and report the best one.

    Expects exactly one command-line argument: the path to a ratings file in
    MovieLens format. Ratings are split on the last digit of their timestamp:
    0-5 -> training, 6-7 -> validation, 8-9 -> test. One model is trained for
    every (rank, lambda, numIter) combination. The model with the lowest
    validation RMSE is then evaluated on the test set. Dataset sizes, the chosen
    hyper-parameters, the test RMSE and total run time are printed.

    Exits with status 1 on a wrong argument count, or if no model achieved a
    finite validation RMSE.
    """
    if len(sys.argv) != 2:
        print("Usage: /path/to/spark/bin/spark-submit --driver-memory 2g " +
              "python-file-name.py /path/to/training-data-file")
        sys.exit(1)

    startTime = time()
    # NOTE(review): with setMaster("local") the work runs inside the driver
    # process, so spark.executor.memory is probably ignored (the usage line
    # recommends --driver-memory instead) -- confirm for your Spark version.
    conf = SparkConf().setMaster("local").setAppName("movieRecommender") \
        .set("spark.executor.memory", "2g")
    sc = SparkContext(conf=conf)
    try:
        # ratings is an RDD of (last digit of timestamp, (userId, movieId, rating)).
        # Cached because it is scanned several times below (three counts and
        # three split filters); otherwise the file would be re-read each time.
        trainingDataFile = sys.argv[1]
        ratings = sc.textFile(trainingDataFile).map(parseRating).cache()
        numRatings = ratings.count()
        numUsers = ratings.values().map(lambda r: r[0]).distinct().count()
        numMovies = ratings.values().map(lambda r: r[1]).distinct().count()
        print("Got %d ratings from %d users on %d movies." % (numRatings, numUsers, numMovies))

        numPartitions = 4
        training = ratings.filter(lambda x: x[0] < 6) \
            .values() \
            .repartition(numPartitions) \
            .cache()
        validation = ratings.filter(lambda x: 6 <= x[0] < 8) \
            .values() \
            .repartition(numPartitions) \
            .cache()
        test = ratings.filter(lambda x: x[0] >= 8).values().cache()
        numTraining = training.count()
        numValidation = validation.count()
        numTest = test.count()
        print("Training: %d, validation: %d, test: %d" % (numTraining, numValidation, numTest))

        # Grid search over the hyper-parameters; keep the model with the
        # lowest validation RMSE.
        ranks = [6, 8, 12]
        lambdas = [0.1, 1.0, 10.0]
        numIters = [10, 20]
        bestModel = None
        bestValidationRmse = float("inf")
        bestRank = 0
        bestLambda = -1.0
        bestNumIter = -1
        for rank, lmbda, numIter in itertools.product(ranks, lambdas, numIters):
            model = ALS.train(training, rank, numIter, lmbda)
            validationRmse = computeRmse(model, validation, numValidation)
            print("RMSE (validation) = %f for the model trained with " % validationRmse +
                  "rank = %d, lambda = %.1f, and numIter = %d." % (rank, lmbda, numIter))
            if validationRmse < bestValidationRmse:
                bestModel = model
                bestValidationRmse = validationRmse
                bestRank = rank
                bestLambda = lmbda
                bestNumIter = numIter

        # A NaN or infinite validation RMSE never compares below inf, so it is
        # possible that no model was selected. Fail clearly instead of passing
        # None to computeRmse.
        if bestModel is None:
            print("No model achieved a finite validation RMSE.")
            sys.exit(1)

        # evaluate the best model on the test set
        testRmse = computeRmse(bestModel, test, numTest)
    finally:
        # Stop Spark even if loading, training or evaluation raises (or exits).
        sc.stop()

    programTime = time() - startTime
    print("Got %d ratings from %d users on %d movies." % (numRatings, numUsers, numMovies))
    print("Training: %d, validation: %d, test: %d" % (numTraining, numValidation, numTest))
    print("The best model was trained with rank = %d and lambda = %.1f, " % (bestRank, bestLambda) +
          "and numIter = %d, and its RMSE on the test set is %f." % (bestNumIter, testRmse))
    print("Time for program to complete: %.1f s" % programTime)
if __name__ == "__main__":
    # Run the training/evaluation pipeline only when executed as a script
    # (e.g. via spark-submit), not when this module is imported.
    main()
@rezsa
Copy link
Author

rezsa commented Nov 10, 2014

To run:
$ /path/to/spark-directory/bin/spark-submit recommender_spark.py /path/to/ratings-training-data-file
For example:
$ ~/Applications/spark-1.1.0-bin-hadoop2.4/bin/spark-submit recommender_spark.py u1.base

My blog post on recommender systems, with an explanation of this code:
http://rezf.blogspot.ca/2014/11/building-big-data-machine-learning_10.html

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment