Skip to content

Instantly share code, notes, and snippets.

@invkrh
Last active November 8, 2016 14:50
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save invkrh/05a83be081c1f713e15b to your computer and use it in GitHub Desktop.
Save invkrh/05a83be081c1f713e15b to your computer and use it in GitHub Desktop.
MLlib ALS evaluation using Mean Average Precision (MAP) and Expected Percentile Rank (EPR)
/**
 * Mean Average Precision at `k` (MAP@k) of the model's top-`k` recommendations, measured
 * against the products each user has in `ratings`.
 *
 * For every user known to the model, all products are scored with the dot product of the user
 * and product factor vectors, and the `k` best-scoring product ids form the prediction. The
 * per-user average precision is summed over users that appear in `ratings` and divided by the
 * number of users in the model, so a model user without any rating contributes a score of 0.
 *
 * @param k       number of recommendations evaluated per user
 * @param model   trained factorization model providing user and product factors
 * @param ratings observed (user, product) interactions used as ground truth
 * @return MAP@k; 0 when the model has no users or no products
 */
def meanAveragedPrecision(k: Int, model: MatrixFactorizationModel, ratings: RDD[Rating]): Double = {

  /**
   * Average precision at `k` of one ranked prediction list.
   *
   * A predicted id counts as a hit only the first time it appears in the list. The sum of
   * precisions at the hit positions is normalised by `min(actual.size, k)`.
   */
  def avgPrecisionK(actual: Seq[Int], predicted: Seq[Int], k: Int): Double = {
    if (actual.isEmpty) {
      // if no positive behavior is observed, we suppose none of the predictions is good
      0d
    } else {
      val relevant = actual.toSet
      val seen = scala.collection.mutable.HashSet.empty[Int]
      var score = 0.0
      var numHits = 0.0
      for ((p, i) <- predicted.take(k).zipWithIndex) {
        // `seen.add` is false for an id that already occurred earlier in the list
        if (seen.add(p) && relevant.contains(p)) {
          numHits += 1.0
          score += numHits / (i.toDouble + 1.0)
        }
      }
      score / scala.math.min(actual.size, k).toDouble
    }
  }

  val itemFactors = model.productFeatures.collect()
  // The recommendation step maps users one-to-one, so counting the factors directly gives the
  // same number without re-scoring every user a second time.
  val numUsers = model.userFeatures.count()

  if (itemFactors.isEmpty || numUsers == 0L) {
    // Nothing can be recommended, or there is nobody to recommend to.
    0d
  } else {
    val sparkContext = ratings.sparkContext
    // Row r of the matrix holds the factors of product itemIds(r). The position in the
    // collected array is NOT the product id, so the ids are shipped alongside the matrix.
    val itemIdsBroadcast = sparkContext.broadcast(itemFactors.map(_._1))
    val itemMatrixBroadcast = sparkContext.broadcast(new DoubleMatrix(itemFactors.map(_._2)))

    val allRecs = model.userFeatures.map {
      case (userId, array) =>
        val itemIds = itemIdsBroadcast.value
        val scores = itemMatrixBroadcast.value.mmul(new DoubleMatrix(array))
        val recommendedId = scores.data.zipWithIndex
          .sortBy { case (score, _) => -score }
          .take(k)
          .map { case (_, row) => itemIds(row) } // translate matrix row -> product id
          .toSeq
        (userId, recommendedId)
    }

    val userMovies = ratings.groupBy(_.user).map {
      case (user, ratingList) => (user, ratingList.map(_.product).toSeq)
    }

    // `fold` (unlike `reduce`) tolerates an empty join, e.g. no overlap between model and ratings.
    val totalScore = allRecs.join(userMovies).map {
      case (_, (predicted, actual)) => avgPrecisionK(actual, predicted, k)
    }.fold(0.0)(_ + _)

    totalScore / numUsers
  }
}
/**
 * Rating-weighted expected percentile rank of the products in `ratings` within the model's
 * per-user preference ordering.
 *
 * For each user, all products are ordered by descending dot product of user and product
 * factors. A product's percentile rank is its position in that ordering divided by the last
 * position, so 0 is the most preferred product and 1 the least preferred. The result is
 * `sum(rating * percentileRank) / sum(rating)`, taken over the (user, product) pairs of
 * `ratings` for which the model has both a user and a product factor vector. Pairs the model
 * cannot rank are left out of both sums.
 *
 * @param model   trained factorization model providing user and product factors
 * @param ratings observed interactions; `rating` is used as the weight
 * @return the weighted mean percentile rank as a fraction, or NaN when no pair can be ranked
 */
def expectedPercentileRanking(model: MatrixFactorizationModel, ratings: RDD[Rating]): Double = {
  val itemFactors = model.productFeatures.collect()

  if (itemFactors.isEmpty) {
    // No product can be ranked, so the metric is undefined.
    Double.NaN
  } else {
    val sparkContext = ratings.sparkContext
    // Row r of the matrix holds the factors of the r-th collected product. The lookup below
    // goes from product id to that row, because the row position is not the product id.
    val rowOfItemBroadcast = sparkContext.broadcast(itemFactors.map(_._1).zipWithIndex.toMap)
    val itemMatrixBroadcast = sparkContext.broadcast(new DoubleMatrix(itemFactors.map(_._2)))

    val itemListPerUser = ratings.groupBy(_.user).map {
      case (user, ratingList) => (user, ratingList.map(rt => (rt.product, rt.rating)).toArray)
    }

    // Per user: (sum of rating * percentileRank, sum of rating) over the rankable products.
    val perUserSums = model.userFeatures.join(itemListPerUser).map {
      case (_, (userFeatures, itemRatingList)) =>
        val rowOfItem = rowOfItemBroadcast.value
        val scores = itemMatrixBroadcast.value.mmul(new DoubleMatrix(userFeatures))

        // rankOfRow(r) = position of matrix row r once rows are sorted by descending score.
        // It is computed once per user instead of a linear `indexOf` per rated product.
        val rankOfRow = new Array[Int](scores.data.length)
        scores.data.zipWithIndex
          .sortBy { case (score, _) => -score }
          .map(_._2)
          .zipWithIndex
          .foreach { case (row, rank) => rankOfRow(row) = rank }

        // With a single product the only rank is 0; dividing by 1 avoids 0 / 0.
        val lastRank = scala.math.max(rankOfRow.length - 1, 1).toDouble

        itemRatingList.foldLeft((0.0, 0.0)) {
          case ((rankAcc, weightAcc), (itemId, rating)) =>
            rowOfItem.get(itemId) match {
              case Some(row) =>
                (rankAcc + rating * rankOfRow(row).toDouble / lastRank, weightAcc + rating)
              case None =>
                // The model has no factors for this product; skip it rather than treating a
                // "not found" index of -1 as a rank.
                (rankAcc, weightAcc)
            }
        }
    }

    val totals = perUserSums.fold((0.0, 0.0))((a, b) => (a._1 + b._1, a._2 + b._2))
    totals._1 / totals._2
  }
}
/**
 * Loads the ratings stored at `path`, lowers every rating by 2.5 and keeps only the entries
 * whose lowered rating is strictly positive.
 */
def shiftedPositiveRatings(path: String): RDD[Rating] =
  loadImplicitCFDataSet(path)
    .map { case Rating(user, item, rt) => Rating(user, item, rt - 2.5) }
    .filter(shifted => shifted.rating > 0)

// Training and test sets go through the same shift-and-filter preparation.
val ratings_train: RDD[Rating] = shiftedPositiveRatings("data/movieLens/t_als_train_pos.csv")
val ratings_test: RDD[Rating] = shiftedPositiveRatings("data/movieLens/t_als_test_pos.csv")

// ALS with implicit preferences, trained on the prepared training set with a fixed seed.
val model = new ALS()
  .setSeed(42L)
  .setBlocks(8)
  .setRank(50)
  .setIterations(30)
  .setLambda(0.01)
  .setImplicitPrefs(implicitPrefs = true)
  .setAlpha(50.0)
  .run(ratings_train)

// MAP@10: "_in" is measured on the training ratings, "_out" on the test ratings.
val mapk_in = meanAveragedPrecision(10, model, ratings_train)
val mapk_out = meanAveragedPrecision(10, model, ratings_test)

// Expected percentile rank on the same two rating sets.
val rank_in = expectedPercentileRanking(model, ratings_train)
val rank_out = expectedPercentileRanking(model, ratings_test)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment