@rzykov
Last active March 31, 2022 22:43
XGBoost Spark - ranking problem
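import _root_.ml.dmlc.xgboost4j.scala.spark.XGBoost
import org.apache.spark.HashPartitioner
import org.apache.spark.ml.feature.LabeledPoint
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.rdd.RDD
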
def encodeFeaturesToLabeledPoint(features: RDD[Feature], relevance: Option[RDD[Relevance]], workers: Int)
                                (implicit parallel: Int): (RDD[LabeledPoint], Seq[String], Seq[Seq[Int]]) = {
  val missingValue = Double.NaN

  // Sorted, de-duplicated feature names define the column order of the dense vectors.
  val names = features
    .map { _.name }
    .distinct(parallel)
    .collect()
    .sorted
    .toSeq
  println(s"load prop names ${names.size}")
  names.zipWithIndex.map { case (v, i) => (i + 1, v) }.foreach(println)

  // Collect all (name, value) pairs for each (itemIdA, itemIdB) pair.
  val itemFeatures = features
    .map { p => ((p.itemIdA, p.itemIdB), (p.name, p.value)) }
    .groupByKey(parallel)

  // With relevance data the label is the rounded rate; without it the label is NaN.
  val featuresPrepared: RDD[(ItemId, ItemId, LabeledPoint)] =
    if (relevance.isDefined)
      itemFeatures
        .join(relevance.get.map { r => ((r.itemIdA, r.itemIdB), r.rate) }, parallel)
        .map { case ((itemIdA, itemIdB), (ps, rate)) =>
          (itemIdA, itemIdB, LabeledPoint(rate, Vectors.dense(valuesToFeatureArrayDouble(names, ps.toMap, missingValue))))
        }
    else
      itemFeatures
        .map { case ((itemIdA, itemIdB), ps) =>
          (itemIdA, itemIdB, LabeledPoint(missingValue, Vectors.dense(valuesToFeatureArrayDouble(names, ps.toMap, missingValue))))
        }

  // Important background on preparing data for ranking: https://github.com/dmlc/xgboost/issues/2223
  /*
  Providing a Seq[Seq[Int]] of group data in the XGBoost::trainWithDataFrame function makes it very easy for a user
  to shoot themselves in the foot. If the number of workers selected isn't the same as the number of partitions,
  then it will repartition the dataframe. Once the dataframe is repartitioned, the provided group data is no longer
  valid. A more resilient solution might be to accept a group column.
  If not possible, at least some documentation on how to generate this group data would also be useful.
  Currently my process is roughly:
    1) Convert my dataframe into a PairRDD[(String, Row)] with the group identifier as the key
    2) Repartition by key, sort within partitions so groups are sequential
    3) Convert the PairRDD back into an RDD[Row], then into a DataFrame
    4) Map over partitions with the partition index, creating a new RDD[(Int, Int)] of (partition id, group length) pairs
    5) Collect that RDD and build up the Seq[Seq[Int]]
  */
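  // Use exactly `workers` partitions so that XGBoost does not repartition the data
  // and invalidate the group sizes; sorting by key keeps each group's rows contiguous.
  val featuresSorted = featuresPrepared
    .map { case (itemIdA, _, vector) => (itemIdA.toLong, vector) }
    .repartitionAndSortWithinPartitions(new HashPartitioner(workers))
  println("partitions = " + featuresSorted.getNumPartitions)
  println("partitions = " + featuresSorted.partitions.mkString("\n"))
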
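  // Merges two ordered lists of (groupId, count). If the last run on the left and the
  // first run on the right share a group id, their counts are added into a single run.
  def groupReducerWithPreservingOrdering(left: Seq[(Long, Int)], right: Seq[(Long, Int)]): Seq[(Long, Int)] = {
    if (left.last._1 == right.head._1)
      (left.init :+ ((left.last._1, left.last._2 + right.head._2))) ++ right.tail
    else left ++ right
  }

  // Same idea one level up: concatenates the group sizes that belong to the same partition id.
  def partitionReducerWithPreservingOrdering(left: Seq[(Int, Seq[Int])], right: Seq[(Int, Seq[Int])]): Seq[(Int, Seq[Int])] = {
    if (left.last._1 == right.head._1)
      (left.init :+ ((left.last._1, left.last._2 ++ right.head._2))) ++ right.tail
    else left ++ right
  }
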
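  // Emits one (partitionId, groupSize) pair per run of consecutive equal keys in the partition.
  // An empty partition yields nothing (reduceLeft would throw on it), so it contributes
  // no entry to the collected group list.
  def countGroups(partitionId: Int, vectors: Iterator[(Long, LabeledPoint)]): Iterator[(Int, Int)] =
    if (vectors.isEmpty) Iterator.empty
    else
      vectors
        .toTraversable
        .map { case (itemIdA, _) => Seq((itemIdA, 1)) }
        .reduceLeft(groupReducerWithPreservingOrdering)
        .map { case (_, cnt) => (partitionId, cnt) }
        .toIterator

  // Debug helper: total number of rows in each partition.
  def countGroups2(partitionId: Int, vectors: Iterator[(Long, LabeledPoint)]): Iterator[(Int, Int)] =
    Iterator((partitionId, vectors.size))
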
  featuresSorted
    .mapPartitionsWithIndex(countGroups2, true)
    .collect()
    .toSeq
    .foreach(println)

  val groups = featuresSorted
    .mapPartitionsWithIndex(countGroups, true)
    .collect()
    .toSeq
    .map { case (partitionId: Int, cnt: Int) => Seq((partitionId, Seq(cnt))) }
    .reduce(partitionReducerWithPreservingOrdering)
    .map { case (_, lengths) => lengths }

  (featuresSorted.map { case (_, vector) => vector }, names, groups)
}

case class Feature(itemIdA: ItemId, itemIdB: ItemId, name: String, value: Double) {
  override def toString = "%s\t%s\t%s\t%.12f".format(itemIdA, itemIdB, name, value)
}

case class Relevance(itemIdA: ItemId, itemIdB: ItemId, rateRaw: Float) {
  def rate: Int = math.rint(rateRaw).toInt
}

val (featuresTrainLP, featureNames, partitionedGroups) =
  encodeFeaturesToLabeledPoint(featuresTrain, Option(relevance), args.workers())

val paramMap = List(
  "eta" -> 0.023f,
  "groupData" -> partitionedGroups,
  "max_depth" -> 10,
  "min_child_weight" -> 3.0,
  "subsample" -> 1.0,
  "colsample_bytree" -> 0.82,
  "colsample_bylevel" -> 0.9,
  "base_score" -> 0.005,
  "eval_metric" -> "ndcg@5",
  "seed" -> 49,
  "silent" -> 1,
  "objective" -> "rank:pairwise").toMap

val xgBoostModel = XGBoost.trainWithRDD(featuresTrainLP, paramMap, round = args.iterations(), nWorkers = args.workers())
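
Steps 4 and 5 of the process quoted in the code comment (count group lengths per partition, then build the Seq[Seq[Int]]) can be illustrated without Spark. The sketch below is not part of the gist: `groupSizes` is a hypothetical helper, and each inner `Seq` stands in for the already sorted keys of one partition. It assumes, as the gist does, that rows of the same group are contiguous within a partition.

```scala
// Hypothetical helper (not from the gist): run-length encode consecutive equal ids
// and keep only the run lengths, in their original order.
def groupSizes(sortedIds: Seq[Long]): Seq[Int] =
  sortedIds.foldLeft(List.empty[(Long, Int)]) {
    // Same id as the current run: extend that run by one row.
    case ((id, n) :: rest, next) if id == next => (id, n + 1) :: rest
    // New id (or first row): start a new run.
    case (acc, next) => (next, 1) :: acc
  }.reverse.map(_._2)

// Stand-in for three partitions of sorted keys; the last one is empty.
val partitions: Seq[Seq[Long]] =
  Seq(Seq(1L, 1L, 1L, 4L, 4L), Seq(2L, 3L, 3L), Seq.empty)

// One Seq of group sizes per partition, in partition order.
val groupData: Seq[Seq[Int]] = partitions.map(groupSizes)
```

Unlike the collect-and-reduce approach in the gist, mapping over every partition keeps an (empty) entry for an empty partition, so the outer sequence always has as many elements as there are partitions.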
@Ishitori

Hi Roman, do you have a full working example that uses this gist? The gist is missing the implementation of the valuesToFeatureArrayDouble function, and I wonder what it looks like.
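
The gist does not show `valuesToFeatureArrayDouble`, so the following is only a guess at its shape, inferred from the call site: it receives the sorted feature names, a name-to-value map for one item pair, and the missing-value marker, and its result is passed to `Vectors.dense`. The signature and body are assumptions, not the author's code.

```scala
// Hypothetical implementation; the original helper is not included in the gist.
// Returns one value per feature name, in the order of `names`.
// Features absent from `values` are filled with `missingValue`.
def valuesToFeatureArrayDouble(names: Seq[String],
                               values: Map[String, Double],
                               missingValue: Double): Array[Double] =
  names.map(name => values.getOrElse(name, missingValue)).toArray

// Example: feature "b" is absent, so its slot holds the missing-value marker (NaN here).
val example: Array[Double] =
  valuesToFeatureArrayDouble(Seq("a", "b", "c"), Map("a" -> 1.0, "c" -> 3.0), Double.NaN)
```

Keeping the output aligned with the globally sorted `names` list means every row has the same column layout, which is what a dense vector requires.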
