Skip to content

Instantly share code, notes, and snippets.

> db.personal_ratings.find()
{ "_id" : ObjectId("57226a50a45eff77e4dc3fce"), "user_id" : "0", "movie_id" : "1", "rating" : "4" }
{ "_id" : ObjectId("57226a50a45eff77e4dc3fcf"), "user_id" : "0", "movie_id" : "2", "rating" : "4" }
{ "_id" : ObjectId("57226a50a45eff77e4dc3fd0"), "user_id" : "0", "movie_id" : "16", "rating" : "5" }
{ "_id" : ObjectId("57226a50a45eff77e4dc3fd1"), "user_id" : "0", "movie_id" : "19", "rating" : "3" }
{ "_id" : ObjectId("57226a50a45eff77e4dc3fd2"), "user_id" : "0", "movie_id" : "47", "rating" : "4" }
{ "_id" : ObjectId("57226a50a45eff77e4dc3fd3"), "user_id" : "0", "movie_id" : "70", "rating" : "4" }
{ "_id" : ObjectId("57226a50a45eff77e4dc3fd4"), "user_id" : "0", "movie_id" : "163", "rating" : "5" }
{ "_id" : ObjectId("57226a50a45eff77e4dc3fd5"), "user_id" : "0", "movie_id" : "173", "rating" : "1" }
{ "_id" : ObjectId("57226a50a45eff77e4dc3fd6"), "user_id" : "0", "movie_id" : "356", "rating" : "5" }
$ ./submit-scala.sh
sc.stop()
// Save to MongoDB
MongoSpark.save(userRecommendations.write.mode("overwrite"), writeConfig)
// Get user recommendations
import sqlContext.implicits._
val unratedMovies = movieRatings.filter(s"user_id != $userId").select("movie_id").distinct().map(r =>
(userId, r.getAs[Int]("movie_id"))).toDF("user_id", "movie_id")
val recommendations = combinedModel.transform(unratedMovies)
// Convert the recommendations into UserMovieRatings
val userRecommendations = recommendations.map(r =>
UserMovieRating(0, r.getAs[Int]("movie_id"), r.getAs[Float]("prediction").toInt)).toDF()
// Combine the datasets
val userRatings = MongoSpark.load(sc, readConfig.copy(collectionName = "personal_ratings")).toDF[UserMovieRating]
val combinedRatings = movieRatings.unionAll(userRatings)
// Retrain using the combinedRatings
val combinedModel = als.fit(combinedRatings, bestModel.extractParamMap())
// Calculating the best model
val bestModel = trainedAndValidatedModel.fit(movieRatings)
val trainedAndValidatedModel = new TrainValidationSplit()
.setEstimator(als)
.setEvaluator(new RegressionEvaluator().setMetricName("rmse").setLabelCol("rating").setPredictionCol("prediction"))
.setEstimatorParamMaps(paramGrid)
.setTrainRatio(0.8)
// We use a ParamGridBuilder to construct a grid of parameters to search over.
// TrainValidationSplit will try all combinations of values and determine best model using the ALS evaluator.
val paramGrid = new ParamGridBuilder()
.addGrid(als.regParam, Array(0.1, 10.0))
.addGrid(als.rank, Array(8, 10))
.addGrid(als.maxIter, Array(10, 20))
.build()
// Create the ALS instance and map the movie data
val als = new ALS()
.setCheckpointInterval(2)
.setUserCol("user_id")
.setItemCol("movie_id")
.setRatingCol("rating")