Last active March 31, 2022 22:43
XGBoost Spark - ranking problem
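import ml.dmlc.xgboost4j.scala.spark.XGBoost
import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD
// LabeledPoint and Vectors: org.apache.spark.ml.* or org.apache.spark.mllib.*, whichever the
// XGBoost4J-Spark version in use expects. ItemId and valuesToFeatureArrayDouble are not defined in this gist.

def encodeFeaturesToLabeledPoint(features: RDD[Feature], relevance: Option[RDD[Relevance]], workers: Int)
                                (implicit parallel: Int): (RDD[LabeledPoint], Seq[String], Seq[Seq[Int]]) = {
  val missingValue = Double.NaN
  // Distinct feature names; their order fixes the column order of every feature vector.
  val names: Seq[String] = features
    .map { _.name }
    .distinct()
    .collect()
    .toSeq
  println(s"load prop names ${names.size}")
  names.zipWithIndex.map { case (v, i) => (i + 1, v) }.foreach(println)
  // ((itemIdA, itemIdB), all (name, value) pairs of that item pair)
  val itemFeatures = features
    .map { p => ((p.itemIdA, p.itemIdB), (p.name, p.value)) }
    .groupByKey(parallel)
  val featuresPrepared: RDD[(ItemId, ItemId, LabeledPoint)] = relevance match {
    case Some(rel) => // training data: the relevance grade is the label
      itemFeatures
        .join(rel.map { r => ((r.itemIdA, r.itemIdB), r.rate) }, parallel)
        .map { case ((itemIdA, itemIdB), (ps, rate)) =>
          (itemIdA, itemIdB, LabeledPoint(rate, Vectors.dense(valuesToFeatureArrayDouble(names, ps.toMap, missingValue))))}
    case None => // no relevance available: the label is unknown
      itemFeatures
        .map { case ((itemIdA, itemIdB), ps) => (itemIdA, itemIdB, LabeledPoint(missingValue, Vectors.dense(valuesToFeatureArrayDouble(names, ps.toMap, missingValue)))) }
  }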
  // Very important when preparing data for ranking:
  // Providing a Seq[Seq[Int]] of group data to the XGBoost::trainWithDataFrame function makes it very easy
  // for users to shoot themselves in the foot. If the number of workers selected differs from the number
  // of partitions, XGBoost repartitions the DataFrame, and once it is repartitioned the provided group data
  // is no longer valid. A more resilient solution might be to accept a group column. If that is not
  // possible, documentation on how to generate this group data would at least help. The current process
  // is roughly:
  //   1) Convert the DataFrame into a PairRDD[(String, Row)] with the group identifier as the key.
  //   2) Repartition by key and sort within partitions, so each group's rows are contiguous.
  //   3) Convert the PairRDD back into an RDD[Row], then into a DataFrame.
  //   4) Map over partitions with the partition index, creating an RDD[(Int, Int)] of
  //      (partition id, group length) pairs.
  //   5) Collect that RDD and build up the Seq[Seq[Int]].
  // Key by itemIdA (the group id), then co-locate each group in one partition and sort within partitions.
  val featuresSorted = featuresPrepared
    .map { case (itemIdA, _, vector) => (itemIdA.toLong, vector) }
    .repartitionAndSortWithinPartitions(new HashPartitioner(workers))
  println("partitions = " + featuresSorted.getNumPartitions)
  println("partitions = " + featuresSorted.partitions.mkString("\n"))
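  // Merge two ordered runs of (key, count); if the boundary keys match, the two boundary runs are one group.
  def groupReducerWithPreservingOrdering(left: Seq[(Long, Int)], right: Seq[(Long, Int)]): Seq[(Long, Int)] =
    if (left.isEmpty || right.isEmpty) left ++ right
    else if (left.last._1 == right.head._1) (left.init :+ ((left.last._1, left.last._2 + right.head._2))) ++ right.tail
    else left ++ right

  // Same merge for (partitionId, group lengths) runs.
  def partitionReducerWithPreservingOrdering(left: Seq[(Int, Seq[Int])], right: Seq[(Int, Seq[Int])]): Seq[(Int, Seq[Int])] =
    if (left.isEmpty || right.isEmpty) left ++ right
    else if (left.last._1 == right.head._1) (left.init :+ ((left.last._1, left.last._2 ++ right.head._2))) ++ right.tail
    else left ++ right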
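  // (partitionId, group length) for every consecutive run of equal itemIdA in one partition, in order.
  // foldLeft with an empty seed keeps empty partitions from throwing.
  def countGroups(partitionId: Int, vectors: Iterator[(Long, LabeledPoint)]): Iterator[(Int, Int)] =
    vectors
      .map { case (itemIdA, _) => Seq((itemIdA, 1)) }
      .foldLeft(Seq.empty[(Long, Int)])(groupReducerWithPreservingOrdering)
      .map { case (itemIdA, cnt) => (partitionId, cnt) }
      .iterator

  // Debug helper: number of rows in each partition.
  def countGroups2(partitionId: Int, vectors: Iterator[(Long, LabeledPoint)]): Iterator[(Int, Int)] =
    Seq((partitionId, vectors.size)).iterator
  featuresSorted.mapPartitionsWithIndex(countGroups2, true).collect().foreach(println)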
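  // collect() returns partition results in partition order; RDD.reduce would merge them in task
  // completion order, which the order-preserving reducer cannot tolerate.
  val groups: Seq[Seq[Int]] = featuresSorted
    .mapPartitionsWithIndex(countGroups, true)
    .collect()
    .map { case (partitionId: Int, cnt: Int) => Seq((partitionId, Seq(cnt))) }
    .reduce(partitionReducerWithPreservingOrdering)
    .map { case (partition, lengths) => lengths }

  (featuresSorted.map { case (itemId, vector) => vector }, names, groups)
}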
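Steps 4 and 5 of the comment inside `encodeFeaturesToLabeledPoint` amount to run-length encoding each partition's sorted group keys, which is roughly what `countGroups` appears to do with `groupReducerWithPreservingOrdering`. The sketch below shows the same computation on plain Scala collections so it can be checked without Spark. Each inner `Seq` stands in for one partition's keys after `repartitionAndSortWithinPartitions`; the `GroupLengths` object and its methods are hypothetical names, not part of the gist or of XGBoost4J-Spark.

```scala
object GroupLengths {
  // Run-length encode one partition's keys: consecutive equal keys form one group.
  // Assumes rows of the same group are contiguous (e.g. after sorting within the partition).
  def runLengths(keys: Seq[Long]): Seq[Int] =
    keys.foldLeft(List.empty[(Long, Int)]) {
      case ((k, n) :: rest, key) if k == key => (k, n + 1) :: rest // extend the current group
      case (acc, key)                        => (key, 1) :: acc   // start a new group
    }.reverse.map(_._2)

  // One Seq[Int] per partition, in partition order. Empty partitions yield an empty Seq,
  // so index i still corresponds to partition i.
  def groupLengths(partitions: Seq[Seq[Long]]): Seq[Seq[Int]] =
    partitions.map(runLengths)
}
```

Mapping every partition, including empty ones, to an entry keeps index i aligned with partition i; dropping empty partitions would instead shift the group lists of every later partition.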
case class Feature(itemIdA: ItemId, itemIdB: ItemId, name: String, value: Double) {
  override def toString = "%s\t%s\t%s\t%.12f".format(itemIdA, itemIdB, name, value)
}

case class Relevance(itemIdA: ItemId, itemIdB: ItemId, rateRaw: Float) {
  // Relevance grade rounded to the nearest integer (math.rint sends exact halves to the even integer).
  def rate: Int = math.rint(rateRaw).toInt
}
// Needs an implicit `parallel: Int` (partition count for groupByKey/join) in scope.
val (featuresTrainLP, featureNames, partitionedGroups) = encodeFeaturesToLabeledPoint(featuresTrain, Option(relevance), args.workers())
val paramMap = List(
"eta" -> 0.023f,
"groupData" -> partitionedGroups,
"max_depth" -> 10,
"min_child_weight" -> 3.0,
"subsample" -> 1.0,
"colsample_bytree" -> 0.82,
"colsample_bylevel" -> 0.9,
"base_score" -> 0.005,
"eval_metric" -> "ndcg@5",
"seed" -> 49,
"silent" -> 1,
"objective" -> "rank:pairwise").toMap
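// nWorkers equals the HashPartitioner(workers) partition count used when encoding, so XGBoost does not
// repartition featuresTrainLP and partitionedGroups stays aligned with its partitions.
val xgBoostModel = XGBoost.trainWithRDD(featuresTrainLP, paramMap, round = args.iterations(), nWorkers = args.workers())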
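Because a repartition silently invalidates the group lengths (the pitfall described in the comment inside `encodeFeaturesToLabeledPoint`), a cheap sanity check before training is to compare them with per-partition row counts. The helper below is a hypothetical sketch on plain collections; `GroupDataCheck.mismatches` is not an XGBoost4J-Spark API.

```scala
object GroupDataCheck {
  // groups(i) must hold the group lengths of partition i, rowsPerPartition(i) its row count.
  // Returns one message per problem; an empty result means the group data is consistent.
  def mismatches(groups: Seq[Seq[Int]], rowsPerPartition: Seq[Int]): Seq[String] = {
    val sizeProblem =
      if (groups.size != rowsPerPartition.size)
        Seq(s"${groups.size} group lists for ${rowsPerPartition.size} partitions")
      else Seq.empty[String]
    val rowProblems = groups.zip(rowsPerPartition).zipWithIndex.collect {
      case ((lengths, rows), i) if lengths.sum != rows =>
        s"partition $i: groups cover ${lengths.sum} rows, partition has $rows"
    }
    sizeProblem ++ rowProblems
  }
}
```

For example, `featuresTrainLP.mapPartitions(it => Iterator(it.size)).collect().toSeq` gives the row counts in partition order, which could be passed together with `partitionedGroups` before calling `trainWithRDD`.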
Hi Roman, do you have a full working example that uses this gist? The gist is missing the implementation of the valuesToFeatureArrayDouble function, and I wonder what it looks like.
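The gist indeed does not define `valuesToFeatureArrayDouble`. Its only call site, `Vectors.dense(valuesToFeatureArrayDouble(names, ps.toMap, missingValue))`, suggests it turns the ordered feature names, one item pair's name-to-value map, and a fill value into an `Array[Double]`. A minimal sketch under that assumption, not the author's code (the wrapper object `FeatureArray` is hypothetical):

```scala
object FeatureArray {
  // Hypothetical reconstruction: one slot per feature name, in the order of `names`;
  // names without a value for this item pair get `missingValue` (Double.NaN in the gist).
  def valuesToFeatureArrayDouble(names: Seq[String],
                                 values: Map[String, Double],
                                 missingValue: Double): Array[Double] =
    names.map(name => values.getOrElse(name, missingValue)).toArray
}
```

Keeping one slot per name in a fixed order is what makes column i mean the same feature in every vector.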

