Last active
March 31, 2022 22:43
-
-
Save rzykov/a8edc950934a6ecb1d7e6c1457efd304 to your computer and use it in GitHub Desktop.
XGBoost Spark - ranking problem
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import _root_.ml.dmlc.xgboost4j.scala.spark.XGBoost | |
import org.apache.spark.ml.feature.LabeledPoint | |
/**
  * Turns per-item-pair features into labeled points laid out for XGBoost ranking.
  *
  * Steps:
  *  1. Collect the distinct feature names, sort them and print them with a 1-based index.
  *  2. Group all `(name, value)` entries by `(itemIdA, itemIdB)` and build one dense vector per
  *     pair via `valuesToFeatureArrayDouble` (defined elsewhere), passing `Double.NaN` as the
  *     missing value.
  *  3. When `relevance` is given, join on the item pair and use `rate` as the label (pairs
  *     without a relevance entry are dropped by the join); otherwise the label is `Double.NaN`.
  *  4. Hash-partition by `itemIdA.toLong` into `workers` partitions, sorted by that key inside
  *     each partition, so that all rows of one query (itemIdA) are contiguous.
  *  5. Count, per partition, the lengths of the consecutive equal-key runs.
  *
  * Background on step 4/5 (https://github.com/dmlc/xgboost/issues/2223): the group data handed to
  * XGBoost is only valid for the exact partitioning it was computed on. If the number of workers
  * differs from the number of partitions XGBoost repartitions the data and the group data no
  * longer matches, hence the explicit partitioning by `workers` here.
  *
  * @param features  feature entries, one per (item pair, feature name)
  * @param relevance optional relevance judgements used as labels
  * @param workers   number of partitions to produce; should equal the XGBoost worker count
  * @param parallel  partition count used for the intermediate `distinct`, `groupByKey`, `join`
  * @return the labeled points (partitioned and sorted as described), the sorted feature names,
  *         and the group sizes where element `i` holds the run lengths of partition `i`
  *         (an empty sequence for a partition without rows)
  */
def encodeFeaturesToLabeledPoint(features: RDD[Feature], relevance: Option[RDD[Relevance]], workers: Int)
                                (implicit parallel: Int): (RDD[LabeledPoint], Seq[String], Seq[Seq[Int]]) = {
  val missingValue = Double.NaN

  val names = features
    .map(_.name)
    .distinct(parallel)
    .collect()
    .sorted
    .toSeq
  println(s"load prop names ${names.size}")
  names.zipWithIndex.foreach { case (name, i) => println((i + 1, name)) }

  val itemFeatures = features
    .map { p => ((p.itemIdA, p.itemIdB), (p.name, p.value)) }
    .groupByKey(parallel)

  // Shared by both branches below; a function value so it ships to executors like any closure.
  val toVector = (props: Iterable[(String, Double)]) =>
    Vectors.dense(valuesToFeatureArrayDouble(names, props.toMap, missingValue))

  val featuresPrepared: RDD[(ItemId, ItemId, LabeledPoint)] = relevance match {
    case Some(rates) =>
      itemFeatures
        .join(rates.map { r => ((r.itemIdA, r.itemIdB), r.rate) }, parallel)
        .map { case ((itemIdA, itemIdB), (props, rate)) =>
          (itemIdA, itemIdB, LabeledPoint(rate, toVector(props)))
        }
    case None =>
      itemFeatures.map { case ((itemIdA, itemIdB), props) =>
        (itemIdA, itemIdB, LabeledPoint(missingValue, toVector(props)))
      }
  }

  // Key by itemIdA only: it is the ranking group; sorting makes each group contiguous.
  val featuresSorted = featuresPrepared
    .map { case (itemIdA, _, point) => (itemIdA.toLong, point) }
    .repartitionAndSortWithinPartitions(new HashPartitioner(workers))
  println("partitions =" + featuresSorted.getNumPartitions)
  println("partitions =" + featuresSorted.partitions.mkString("\n"))

  // Diagnostic output: number of rows in every partition.
  featuresSorted
    .mapPartitionsWithIndex(
      (partitionId: Int, rows: Iterator[(Long, LabeledPoint)]) => Iterator.single((partitionId, rows.size)),
      preservesPartitioning = true)
    .collect()
    .foreach(println)

  // Single-pass run-length encoding of an already sorted key stream.
  // Returns an empty vector for an empty stream instead of failing.
  val runLengths: Iterator[Long] => Vector[Int] = keys => {
    val (closed, open) = keys.foldLeft((Vector.empty[Int], Option.empty[(Long, Int)])) {
      case ((done, Some((current, n))), key) if key == current => (done, Some((current, n + 1)))
      case ((done, Some((_, n))), key)                         => (done :+ n, Some((key, 1)))
      case ((done, None), key)                                 => (done, Some((key, 1)))
    }
    open.fold(closed) { case (_, n) => closed :+ n }
  }

  // Every partition emits exactly one entry (possibly empty), so after ordering by partition id
  // position i of `groups` always describes partition i.
  val groups: Seq[Seq[Int]] = featuresSorted
    .mapPartitionsWithIndex(
      (partitionId: Int, rows: Iterator[(Long, LabeledPoint)]) =>
        Iterator.single((partitionId, runLengths(rows.map(_._1)))),
      preservesPartitioning = true)
    .collect()
    .sortBy(_._1)
    .map(_._2)
    .toSeq

  (featuresSorted.map { case (_, point) => point }, names, groups)
}
/**
  * A single named feature value for an ordered pair of items.
  *
  * @param itemIdA first item of the pair
  * @param itemIdB second item of the pair
  * @param name    feature name
  * @param value   feature value
  */
case class Feature(itemIdA: ItemId, itemIdB: ItemId, name: String, value: Double) {
  /**
    * Tab-separated rendering: itemIdA, itemIdB, name and the value with 12 decimals.
    * Formatted with `Locale.ROOT` so the decimal separator is always '.', independent of the
    * JVM default locale.
    */
  override def toString: String =
    "%s\t%s\t%s\t%.12f".formatLocal(java.util.Locale.ROOT, itemIdA, itemIdB, name, value)
}
/**
  * Relevance judgement for an ordered pair of items.
  *
  * @param itemIdA first item of the pair
  * @param itemIdB second item of the pair
  * @param rateRaw unrounded relevance score
  */
case class Relevance(itemIdA: ItemId, itemIdB: ItemId, rateRaw: Float) {
  /** `rateRaw` rounded to the closest whole number (ties go to the even neighbour), as an Int. */
  def rate: Int = {
    val nearest = java.lang.Math.rint(rateRaw.toDouble)
    nearest.toInt
  }
}
// Encode the training features; the group sizes are computed for exactly `args.workers()`
// partitions, the same value passed as nWorkers to the trainer below.
val (featuresTrainLP, featureNames, partitionedGroups) =
  encodeFeaturesToLabeledPoint(featuresTrain, Option(relevance), args.workers())

// XGBoost parameters for pairwise ranking; "groupData" carries the per-partition group sizes.
val paramMap: Map[String, Any] = Map(
  "eta" -> 0.023f,
  "groupData" -> partitionedGroups,
  "max_depth" -> 10,
  "min_child_weight" -> 3.0,
  "subsample" -> 1.0,
  "colsample_bytree" -> 0.82,
  "colsample_bylevel" -> 0.9,
  "base_score" -> 0.005,
  "eval_metric" -> "ndcg@5",
  "seed" -> 49,
  "silent" -> 1,
  "objective" -> "rank:pairwise"
)

val xgBoostModel = XGBoost.trainWithRDD(
  featuresTrainLP,
  paramMap,
  round = args.iterations(),
  nWorkers = args.workers()
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi Roman, do you have a full working example that uses this gist? The gist is missing the implementation of the `valuesToFeatureArrayDouble` function, and I wonder what it looks like.