Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
MovieRecommenderALS
// Download the MovieLens 10M dataset to run this example: http://grouplens.org/datasets/movielens/
//   wget http://files.grouplens.org/datasets/movielens/ml-10m.zip
//   unzip ml-10m.zip
import java.util.Random
import org.apache.log4j.Logger
import org.apache.log4j.Level
import scala.io.Source
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import org.apache.spark.mllib.recommendation.{ALS, Rating, MatrixFactorizationModel}
/**
 * Movie recommender based on MLlib's ALS (alternating least squares).
 *
 * The program loads `ratings.dat` and `movies.dat`, asks the user on the command line
 * to rate a random sample of the 50 most-rated movies, trains ALS models over a small
 * grid of (rank, lambda, numIter), keeps the model with the lowest validation RMSE,
 * reports its RMSE on the test split and prints the top 50 recommendations for the user.
 */
object MovieLensALS {

  /** User id under which the ratings typed on the command line are stored and predicted. */
  private val MyUserId = 0

  /**
   * Program entry point.
   *
   * @param args exactly one element: the MovieLens directory (appended to the HDFS URI
   *             of the cluster master)
   */
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    if (args.length != 1) {
      println("Usage: sbt/sbt package \"run movieLensHomeDir\"")
      // `Predef.exit` no longer exists in current Scala versions; `sys.exit` is the replacement.
      sys.exit(1)
    }

    // set up environment
    val jarFile = "target/scala-2.10/movielens-als_2.10-0.0.jar"
    val sparkHome = "/root/spark"
    val master = readTrimmed("/root/spark-ec2/cluster-url")
    val masterHostname = readTrimmed("/root/spark-ec2/masters")
    val conf = new SparkConf()
      .setMaster(master)
      .setSparkHome(sparkHome)
      .setAppName("MovieLensALS")
      .set("spark.executor.memory", "8g")
      .setJars(Seq(jarFile))
    val sc = new SparkContext(conf)

    try {
      // load ratings and movie titles
      // NOTE(review): the original declared `movieLensHomeDir` twice (HDFS path, then ""),
      // which does not compile. The HDFS variant is kept because it is the one that honours
      // the mandatory command-line argument; swap in a local path here if reading locally.
      val movieLensHomeDir = "hdfs://" + masterHostname + ":9000" + args(0)

      val ratings = sc.textFile(movieLensHomeDir + "/ratings.dat").map { line =>
        val fields = line.split("::")
        // format: (timestamp % 10, Rating(userId, movieId, rating))
        (fields(3).toLong % 10, Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble))
      }

      val movies = sc.textFile(movieLensHomeDir + "/movies.dat").map { line =>
        val fields = line.split("::")
        // format: (movieId, movieName)
        (fields(0).toInt, fields(1))
      }.collect.toMap

      val numRatings = ratings.count
      val numUsers = ratings.map(_._2.user).distinct.count
      val numMovies = ratings.map(_._2.product).distinct.count
      println(s"Got $numRatings ratings from $numUsers users on $numMovies movies.")

      val mostRatedMovieIds = ratings.map(_._2.product) // extract movie ids
        .countByValue // count ratings per movie
        .toSeq // convert map to Seq
        .sortBy(-_._2) // sort by rating count, descending
        .take(50) // take 50 most rated
        .map(_._1) // get their ids

      // Fixed seed so the same sample of movies is presented on every run.
      val random = new Random(0)
      val selectedMovies = mostRatedMovieIds
        .filter(_ => random.nextDouble() < 0.2)
        .map(id => (id, movies(id)))
        .toSeq

      val myRatings = elicitateRatings(selectedMovies)
      val myRatingsRDD = sc.parallelize(myRatings)

      // Split on the last digit of the timestamp: 0-5 training, 6-7 validation, 8-9 test.
      val numPartitions = 20
      val training = ratings.filter(_._1 < 6)
        .values
        .union(myRatingsRDD)
        .repartition(numPartitions)
        .persist
      val validation = ratings.filter(x => x._1 >= 6 && x._1 < 8)
        .values
        .repartition(numPartitions)
        .persist
      val test = ratings.filter(_._1 >= 8).values.persist

      val numTraining = training.count
      val numValidation = validation.count
      val numTest = test.count
      println(s"Training: $numTraining, validation: $numValidation, test: $numTest")

      // Grid search: keep the model with the smallest RMSE on the validation split.
      val ranks = List(8, 12)
      val lambdas = List(0.1, 10.0)
      val numIters = List(10, 20)
      var bestModel: Option[MatrixFactorizationModel] = None
      var bestValidationRmse = Double.MaxValue
      var bestRank = 0
      var bestLambda = -1.0
      var bestNumIter = -1
      for (rank <- ranks; lambda <- lambdas; numIter <- numIters) {
        val model = ALS.train(training, rank, numIter, lambda)
        val validationRmse = computeRmse(model, validation, numValidation)
        println(s"RMSE (validation) = $validationRmse for the model trained with rank = " +
          s"$rank, lambda = $lambda, and numIter = $numIter.")
        if (validationRmse < bestValidationRmse) {
          bestModel = Some(model)
          bestValidationRmse = validationRmse
          bestRank = rank
          bestLambda = lambda
          bestNumIter = numIter
        }
      }

      // Fail with a clear message instead of a bare NoSuchElementException from Option.get.
      val chosenModel = bestModel.getOrElse(
        sys.error("No model achieved a finite validation RMSE"))

      val testRmse = computeRmse(chosenModel, test, numTest)
      println(s"The best model was trained with rank = $bestRank and lambda = $bestLambda" +
        s", and numIter = $bestNumIter, and its RMSE on the test set is $testRmse.")

      // Recommend only movies the user has not rated yet.
      val myRatedMovieIds = myRatings.map(_.product).toSet
      val candidates = sc.parallelize(movies.keys.filter(!myRatedMovieIds.contains(_)).toSeq)
      val recommendations = chosenModel
        .predict(candidates.map((MyUserId, _)))
        .collect
        .sortBy(-_.rating)
        .take(50)

      println("Movies recommended for you:")
      recommendations.zipWithIndex.foreach { case (r, index) =>
        println("%2d".format(index + 1) + ": " + movies(r.product))
      }
    } finally {
      // clean up, even when one of the steps above throws
      sc.stop()
    }
  }

  /** Reads a whole text file, trims it and closes the underlying handle. */
  private def readTrimmed(path: String): String = {
    val source = Source.fromFile(path)
    try source.mkString.trim finally source.close()
  }

  /**
   * Compute RMSE (Root Mean Squared Error).
   *
   * @param model trained model used to predict a rating for every (user, product) in `data`
   * @param data  ratings to compare the predictions against
   * @param n     number of ratings in `data`; must be positive
   * @return square root of the summed squared prediction errors divided by `n`
   */
  def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating], n: Long): Double = {
    require(n > 0, "cannot compute RMSE over an empty data set")
    val predictions: RDD[Rating] = model.predict(data.map(r => (r.user, r.product)))
    // Pair each prediction with the observed rating for the same (user, product) key.
    val predictedAndActual = predictions
      .map(p => ((p.user, p.product), p.rating))
      .join(data.map(r => ((r.user, r.product), r.rating)))
      .values
    val squaredErrorSum = predictedAndActual
      .map { case (predicted, actual) => (predicted - actual) * (predicted - actual) }
      .fold(0.0)(_ + _) // fold, unlike reduce, tolerates an empty join result
    math.sqrt(squaredErrorSum / n)
  }

  /**
   * Elicitate ratings from command-line.
   *
   * Each movie is prompted for until an integer between 0 and 5 is entered; 0 means
   * "not seen" and produces no rating.
   *
   * @param movies (movieId, title) pairs to ask about
   * @return the ratings entered, attributed to user id 0
   * @throws RuntimeException if the input ends early or no movie received a rating
   */
  def elicitateRatings(movies: Seq[(Int, String)]): Seq[Rating] = {
    val prompt = "Please rate the following movie (1-5 (best), or 0 if not seen):"
    println(prompt)

    // Re-asks for the same title until the answer parses as an integer in [0, 5].
    @scala.annotation.tailrec
    def ask(title: String): Int = {
      print(title + ": ")
      Console.flush()
      Option(Console.in.readLine()) match {
        case None =>
          sys.error("Unexpected end of input while reading ratings")
        case Some(line) =>
          val parsed =
            try Some(line.trim.toInt)
            catch { case _: NumberFormatException => None }
          parsed match {
            case Some(r) if r >= 0 && r <= 5 => r
            case _ =>
              println(prompt)
              ask(title)
          }
      }
    }

    val ratings = movies.flatMap { case (movieId, title) =>
      val r = ask(title)
      if (r > 0) Some(Rating(MyUserId, movieId, r.toDouble)) else None
    }
    if (ratings.isEmpty) sys.error("No rating provided!")
    ratings
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.