@tuan3w
Last active June 16, 2020 20:23
Implementation of Biased Matrix Factorization on Spark
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.mllib.recommendation
import org.apache.spark.Logging
import org.apache.spark.annotation.{DeveloperApi, Since}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.ml.recommendation.{ALS2 => NewALS2}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
/**
* Alternating Least Squares matrix factorization.
*
* ALS attempts to estimate the ratings matrix `R` as the product of two lower-rank matrices,
* `X` and `Y`, i.e. `X * Yt = R`. Typically these approximations are called 'factor' matrices.
* The general approach is iterative. During each iteration, one of the factor matrices is held
* constant, while the other is solved for using least squares. The newly-solved factor matrix is
* then held constant while solving for the other factor matrix.
*
* This is a blocked implementation of the ALS factorization algorithm that groups the two sets
* of factors (referred to as "users" and "products") into blocks and reduces communication by only
* sending one copy of each user vector to each product block on each iteration, and only for the
* product blocks that need that user's feature vector. This is achieved by precomputing some
* information about the ratings matrix to determine the "out-links" of each user (which blocks of
* products it will contribute to) and "in-link" information for each product (which of the feature
* vectors it receives from each user block it will depend on). This allows us to send only an
* array of feature vectors between each user block and product block, and have the product block
* find the users' ratings and update the products based on these messages.
*
* For implicit preference data, the algorithm used is based on
* "Collaborative Filtering for Implicit Feedback Datasets", available at
* [[http://dx.doi.org/10.1109/ICDM.2008.22]], adapted for the blocked approach used here.
*
* Essentially instead of finding the low-rank approximations to the rating matrix `R`,
* this finds the approximations for a preference matrix `P` where the elements of `P` are 1 if
* r > 0 and 0 if r <= 0. The ratings then act as 'confidence' values related to strength of
* indicated user preferences rather than explicit ratings given to items.
*/
@Since("0.8.0")
class ALS2 private (
private var numUserBlocks: Int,
private var numProductBlocks: Int,
private var rank: Int,
private var iterations: Int,
private var lambda: Double,
private var implicitPrefs: Boolean,
private var alpha: Double,
private var seed: Long = System.nanoTime()
) extends Serializable with Logging {
/**
* Constructs an ALS instance with default parameters: {numBlocks: -1, rank: 10, iterations: 10,
* lambda: 0.01, implicitPrefs: false, alpha: 1.0}.
*/
@Since("0.8.0")
def this() = this(-1, -1, 10, 10, 0.01, false, 1.0)
/** If true, do alternating nonnegative least squares. */
private var nonnegative = false
/** storage level for user/product in/out links */
private var intermediateRDDStorageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK
private var finalRDDStorageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK
/** checkpoint interval */
private var checkpointInterval: Int = 10
/**
* Set the number of blocks for both user blocks and product blocks to parallelize the computation
* into; pass -1 for an auto-configured number of blocks. Default: -1.
*/
@Since("0.8.0")
def setBlocks(numBlocks: Int): this.type = {
this.numUserBlocks = numBlocks
this.numProductBlocks = numBlocks
this
}
/**
* Set the number of user blocks to parallelize the computation.
*/
@Since("1.1.0")
def setUserBlocks(numUserBlocks: Int): this.type = {
this.numUserBlocks = numUserBlocks
this
}
/**
* Set the number of product blocks to parallelize the computation.
*/
@Since("1.1.0")
def setProductBlocks(numProductBlocks: Int): this.type = {
this.numProductBlocks = numProductBlocks
this
}
/** Set the rank of the feature matrices computed (number of features). Default: 10. */
@Since("0.8.0")
def setRank(rank: Int): this.type = {
this.rank = rank
this
}
/** Set the number of iterations to run. Default: 10. */
@Since("0.8.0")
def setIterations(iterations: Int): this.type = {
this.iterations = iterations
this
}
/** Set the regularization parameter, lambda. Default: 0.01. */
@Since("0.8.0")
def setLambda(lambda: Double): this.type = {
this.lambda = lambda
this
}
/** Sets whether to use implicit preference. Default: false. */
@Since("0.8.1")
def setImplicitPrefs(implicitPrefs: Boolean): this.type = {
this.implicitPrefs = implicitPrefs
this
}
/**
* Sets the constant used in computing confidence in implicit ALS. Default: 1.0.
*/
@Since("0.8.1")
def setAlpha(alpha: Double): this.type = {
this.alpha = alpha
this
}
/** Sets a random seed to have deterministic results. */
@Since("1.0.0")
def setSeed(seed: Long): this.type = {
this.seed = seed
this
}
/**
* Set whether the least-squares problems solved at each iteration should have
* nonnegativity constraints.
*/
@Since("1.1.0")
def setNonnegative(b: Boolean): this.type = {
this.nonnegative = b
this
}
/**
* :: DeveloperApi ::
* Sets storage level for intermediate RDDs (user/product in/out links). The default value is
* `MEMORY_AND_DISK`. Users can change it to a serialized storage, e.g., `MEMORY_AND_DISK_SER` and
* set `spark.rdd.compress` to `true` to reduce the space requirement, at the cost of speed.
*/
@DeveloperApi
@Since("1.1.0")
def setIntermediateRDDStorageLevel(storageLevel: StorageLevel): this.type = {
require(storageLevel != StorageLevel.NONE,
"ALS is not designed to run without persisting intermediate RDDs.")
this.intermediateRDDStorageLevel = storageLevel
this
}
/**
* :: DeveloperApi ::
* Sets storage level for final RDDs (user/product used in MatrixFactorizationModel). The default
* value is `MEMORY_AND_DISK`. Users can change it to a serialized storage, e.g.
* `MEMORY_AND_DISK_SER` and set `spark.rdd.compress` to `true` to reduce the space requirement,
* at the cost of speed.
*/
@DeveloperApi
@Since("1.3.0")
def setFinalRDDStorageLevel(storageLevel: StorageLevel): this.type = {
this.finalRDDStorageLevel = storageLevel
this
}
/**
* Set period (in iterations) between checkpoints (default = 10). Checkpointing helps with
* recovery (when nodes fail) and StackOverflow exceptions caused by long lineage. It also helps
* with eliminating temporary shuffle files on disk, which can be important when there are many
* ALS iterations. If the checkpoint directory is not set in [[org.apache.spark.SparkContext]],
* this setting is ignored.
*/
@DeveloperApi
@Since("1.4.0")
def setCheckpointInterval(checkpointInterval: Int): this.type = {
this.checkpointInterval = checkpointInterval
this
}
/**
* Run ALS with the configured parameters on an input RDD of (user, product, rating) triples.
* Returns a MatrixFactorizationModel with feature vectors for each user and product.
*/
@Since("0.8.0")
def run(ratings: RDD[Rating]): MatrixFactorizationModel = {
val sc = ratings.context
val numUserBlocks = if (this.numUserBlocks == -1) {
math.max(sc.defaultParallelism, ratings.partitions.size / 2)
} else {
this.numUserBlocks
}
val numProductBlocks = if (this.numProductBlocks == -1) {
math.max(sc.defaultParallelism, ratings.partitions.size / 2)
} else {
this.numProductBlocks
}
val (floatUserFactors, floatProdFactors) = NewALS2.train[Int](
ratings = ratings.map(r => NewALS2.Rating(r.user, r.product, r.rating.toFloat)),
rank = rank,
numUserBlocks = numUserBlocks,
numItemBlocks = numProductBlocks,
maxIter = iterations,
regParam = lambda,
implicitPrefs = implicitPrefs,
alpha = alpha,
nonnegative = nonnegative,
intermediateRDDStorageLevel = intermediateRDDStorageLevel,
finalRDDStorageLevel = StorageLevel.NONE,
checkpointInterval = checkpointInterval,
seed = seed)
val userFactors = floatUserFactors
.mapValues(_.map(_.toDouble))
.setName("users")
.persist(finalRDDStorageLevel)
val prodFactors = floatProdFactors
.mapValues(_.map(_.toDouble))
.setName("products")
.persist(finalRDDStorageLevel)
if (finalRDDStorageLevel != StorageLevel.NONE) {
userFactors.count()
prodFactors.count()
}
new MatrixFactorizationModel(rank, userFactors, prodFactors)
}
/**
* Java-friendly version of [[ALS.run]].
*/
@Since("1.3.0")
def run(ratings: JavaRDD[Rating]): MatrixFactorizationModel = run(ratings.rdd)
}
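// Hedged usage sketch, not part of the original gist: it shows how the builder-style
// wrapper above might be driven end to end. The object name, the SparkContext `sc`,
// the input path, and the "user,item,rating" CSV layout are illustrative assumptions.
private[recommendation] object ALS2UsageSketch {
  def run(sc: org.apache.spark.SparkContext, ratingsPath: String): MatrixFactorizationModel = {
    // Parse each "user,item,rating" line into the mllib Rating case class.
    val ratings = sc.textFile(ratingsPath).map { line =>
      val Array(u, i, r) = line.split(',')
      Rating(u.toInt, i.toInt, r.toDouble)
    }
    // Explicit-feedback ALS with the defaults documented above (rank 10, 10 iterations).
    new ALS2()
      .setRank(10)
      .setIterations(10)
      .setLambda(0.01)
      .setSeed(42L)
      .run(ratings)
  }
}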
/**
* Top-level methods for calling Alternating Least Squares (ALS) matrix factorization.
*/
@Since("0.8.0")
object ALS2 {
/**
* Train a matrix factorization model given an RDD of ratings given by users to some products,
* in the form of (userID, productID, rating) pairs. We approximate the ratings matrix as the
* product of two lower-rank matrices of a given rank (number of features). To solve for these
* features, we run a given number of iterations of ALS. This is done using a level of
* parallelism given by `blocks`.
*
* @param ratings RDD of (userID, productID, rating) pairs
* @param rank number of features to use
* @param iterations number of iterations of ALS (recommended: 10-20)
* @param lambda regularization factor (recommended: 0.01)
* @param blocks level of parallelism to split computation into
* @param seed random seed
*/
@Since("0.9.1")
def train(
ratings: RDD[Rating],
rank: Int,
iterations: Int,
lambda: Double,
blocks: Int,
seed: Long
): MatrixFactorizationModel = {
new ALS2(blocks, blocks, rank, iterations, lambda, false, 1.0, seed).run(ratings)
}
/**
* Train a matrix factorization model given an RDD of ratings given by users to some products,
* in the form of (userID, productID, rating) pairs. We approximate the ratings matrix as the
* product of two lower-rank matrices of a given rank (number of features). To solve for these
* features, we run a given number of iterations of ALS. This is done using a level of
* parallelism given by `blocks`.
*
* @param ratings RDD of (userID, productID, rating) pairs
* @param rank number of features to use
* @param iterations number of iterations of ALS (recommended: 10-20)
* @param lambda regularization factor (recommended: 0.01)
* @param blocks level of parallelism to split computation into
*/
@Since("0.8.0")
def train(
ratings: RDD[Rating],
rank: Int,
iterations: Int,
lambda: Double,
blocks: Int
): MatrixFactorizationModel = {
new ALS2(blocks, blocks, rank, iterations, lambda, false, 1.0).run(ratings)
}
/**
* Train a matrix factorization model given an RDD of ratings given by users to some products,
* in the form of (userID, productID, rating) pairs. We approximate the ratings matrix as the
* product of two lower-rank matrices of a given rank (number of features). To solve for these
* features, we run a given number of iterations of ALS. The level of parallelism is determined
* automatically based on the number of partitions in `ratings`.
*
* @param ratings RDD of (userID, productID, rating) pairs
* @param rank number of features to use
* @param iterations number of iterations of ALS (recommended: 10-20)
* @param lambda regularization factor (recommended: 0.01)
*/
@Since("0.8.0")
def train(ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double)
: MatrixFactorizationModel = {
train(ratings, rank, iterations, lambda, -1)
}
/**
* Train a matrix factorization model given an RDD of ratings given by users to some products,
* in the form of (userID, productID, rating) pairs. We approximate the ratings matrix as the
* product of two lower-rank matrices of a given rank (number of features). To solve for these
* features, we run a given number of iterations of ALS. The level of parallelism is determined
* automatically based on the number of partitions in `ratings`.
*
* @param ratings RDD of (userID, productID, rating) pairs
* @param rank number of features to use
* @param iterations number of iterations of ALS (recommended: 10-20)
*/
@Since("0.8.0")
def train(ratings: RDD[Rating], rank: Int, iterations: Int)
: MatrixFactorizationModel = {
train(ratings, rank, iterations, 0.01, -1)
}
/**
* Train a matrix factorization model given an RDD of 'implicit preferences' given by users
* to some products, in the form of (userID, productID, preference) pairs. We approximate the
* ratings matrix as the product of two lower-rank matrices of a given rank (number of features).
* To solve for these features, we run a given number of iterations of ALS. This is done using
* a level of parallelism given by `blocks`.
*
* @param ratings RDD of (userID, productID, rating) pairs
* @param rank number of features to use
* @param iterations number of iterations of ALS (recommended: 10-20)
* @param lambda regularization factor (recommended: 0.01)
* @param blocks level of parallelism to split computation into
* @param alpha confidence parameter
* @param seed random seed
*/
@Since("0.8.1")
def trainImplicit(
ratings: RDD[Rating],
rank: Int,
iterations: Int,
lambda: Double,
blocks: Int,
alpha: Double,
seed: Long
): MatrixFactorizationModel = {
new ALS2(blocks, blocks, rank, iterations, lambda, true, alpha, seed).run(ratings)
}
/**
* Train a matrix factorization model given an RDD of 'implicit preferences' given by users
* to some products, in the form of (userID, productID, preference) pairs. We approximate the
* ratings matrix as the product of two lower-rank matrices of a given rank (number of features).
* To solve for these features, we run a given number of iterations of ALS. This is done using
* a level of parallelism given by `blocks`.
*
* @param ratings RDD of (userID, productID, rating) pairs
* @param rank number of features to use
* @param iterations number of iterations of ALS (recommended: 10-20)
* @param lambda regularization factor (recommended: 0.01)
* @param blocks level of parallelism to split computation into
* @param alpha confidence parameter
*/
@Since("0.8.1")
def trainImplicit(
ratings: RDD[Rating],
rank: Int,
iterations: Int,
lambda: Double,
blocks: Int,
alpha: Double
): MatrixFactorizationModel = {
new ALS2(blocks, blocks, rank, iterations, lambda, true, alpha).run(ratings)
}
/**
* Train a matrix factorization model given an RDD of 'implicit preferences' given by users to
* some products, in the form of (userID, productID, preference) pairs. We approximate the
* ratings matrix as the product of two lower-rank matrices of a given rank (number of features).
* To solve for these features, we run a given number of iterations of ALS. The level of
* parallelism is determined automatically based on the number of partitions in `ratings`.
*
* @param ratings RDD of (userID, productID, rating) pairs
* @param rank number of features to use
* @param iterations number of iterations of ALS (recommended: 10-20)
* @param lambda regularization factor (recommended: 0.01)
* @param alpha confidence parameter
*/
@Since("0.8.1")
def trainImplicit(ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double, alpha: Double)
: MatrixFactorizationModel = {
trainImplicit(ratings, rank, iterations, lambda, -1, alpha)
}
/**
* Train a matrix factorization model given an RDD of 'implicit preferences' ratings given by
* users to some products, in the form of (userID, productID, rating) pairs. We approximate the
* ratings matrix as the product of two lower-rank matrices of a given rank (number of features).
* To solve for these features, we run a given number of iterations of ALS. The level of
* parallelism is determined automatically based on the number of partitions in `ratings`.
* Model parameters `alpha` and `lambda` are set to reasonable default values (1.0 and 0.01,
* respectively).
*
* @param ratings RDD of (userID, productID, rating) pairs
* @param rank number of features to use
* @param iterations number of iterations of ALS (recommended: 10-20)
*/
@Since("0.8.1")
def trainImplicit(ratings: RDD[Rating], rank: Int, iterations: Int)
: MatrixFactorizationModel = {
trainImplicit(ratings, rank, iterations, 0.01, -1, 1.0)
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.ml.recommendation
import java.{ util => ju }
import java.io.IOException
import scala.collection.mutable
import scala.reflect.ClassTag
import scala.util.Sorting
import scala.util.hashing.byteswap64
import com.github.fommil.netlib.BLAS.{ getInstance => blas }
import org.apache.hadoop.fs.{ FileSystem, Path }
import org.json4s.DefaultFormats
import org.json4s.JsonDSL._
import org.apache.spark.{ Logging, Partitioner }
import org.apache.spark.annotation.{ Since, DeveloperApi, Experimental }
import org.apache.spark.ml.{ Estimator, Model }
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.util._
import org.apache.spark.mllib.linalg.CholeskyDecomposition
import org.apache.spark.mllib.optimization.NNLS
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{ DoubleType, FloatType, IntegerType, StructType }
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils
import org.apache.spark.util.collection.{ OpenHashMap, OpenHashSet, SortDataFormat, Sorter }
import org.apache.spark.util.random.XORShiftRandom
/**
* Common params for ALS and ALSModel2.
*/
private[recommendation] trait ALSModel2Params extends Params with HasPredictionCol {
/**
* Param for the column name for user ids.
* Default: "user"
* @group param
*/
val userCol = new Param[String](this, "userCol", "column name for user ids")
/** @group getParam */
def getUserCol: String = $(userCol)
/**
* Param for the column name for item ids.
* Default: "item"
* @group param
*/
val itemCol = new Param[String](this, "itemCol", "column name for item ids")
/** @group getParam */
def getItemCol: String = $(itemCol)
}
/**
* Common params for ALS.
*/
private[recommendation] trait ALSParams extends ALSModel2Params with HasMaxIter with HasRegParam
with HasPredictionCol with HasCheckpointInterval with HasSeed {
/**
* Param for rank of the matrix factorization (>= 1).
* Default: 10
* @group param
*/
val rank = new IntParam(this, "rank", "rank of the factorization", ParamValidators.gtEq(1))
/** @group getParam */
def getRank: Int = $(rank)
/**
* Param for number of user blocks (>= 1).
* Default: 10
* @group param
*/
val numUserBlocks = new IntParam(this, "numUserBlocks", "number of user blocks",
ParamValidators.gtEq(1))
/** @group getParam */
def getNumUserBlocks: Int = $(numUserBlocks)
/**
* Param for number of item blocks (>= 1).
* Default: 10
* @group param
*/
val numItemBlocks = new IntParam(this, "numItemBlocks", "number of item blocks",
ParamValidators.gtEq(1))
/** @group getParam */
def getNumItemBlocks: Int = $(numItemBlocks)
/**
* Param to decide whether to use implicit preference.
* Default: false
* @group param
*/
val implicitPrefs = new BooleanParam(this, "implicitPrefs", "whether to use implicit preference")
/** @group getParam */
def getImplicitPrefs: Boolean = $(implicitPrefs)
/**
* Param for the alpha parameter in the implicit preference formulation (>= 0).
* Default: 1.0
* @group param
*/
val alpha = new DoubleParam(this, "alpha", "alpha for implicit preference",
ParamValidators.gtEq(0))
/** @group getParam */
def getAlpha: Double = $(alpha)
/**
* Param for the column name for ratings.
* Default: "rating"
* @group param
*/
val ratingCol = new Param[String](this, "ratingCol", "column name for ratings")
/** @group getParam */
def getRatingCol: String = $(ratingCol)
/**
* Param for whether to apply nonnegativity constraints.
* Default: false
* @group param
*/
val nonnegative = new BooleanParam(
this, "nonnegative", "whether to use nonnegative constraint for least squares")
/** @group getParam */
def getNonnegative: Boolean = $(nonnegative)
setDefault(rank -> 10, maxIter -> 10, regParam -> 0.1, numUserBlocks -> 10, numItemBlocks -> 10,
implicitPrefs -> false, alpha -> 1.0, userCol -> "user", itemCol -> "item",
ratingCol -> "rating", nonnegative -> false, checkpointInterval -> 10)
/**
* Validates and transforms the input schema.
* @param schema input schema
* @return output schema
*/
protected def validateAndTransformSchema(schema: StructType): StructType = {
SchemaUtils.checkColumnType(schema, $(userCol), IntegerType)
SchemaUtils.checkColumnType(schema, $(itemCol), IntegerType)
val ratingType = schema($(ratingCol)).dataType
require(ratingType == FloatType || ratingType == DoubleType)
SchemaUtils.appendColumn(schema, $(predictionCol), FloatType)
}
}
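// Minimal sketch, not part of the original gist: the input schema that
// validateAndTransformSchema above accepts with the default column names --
// integer user/item ids plus a Float or Double rating column; the prediction
// column is appended as Float by the model. The object name is illustrative.
private[recommendation] object ALS2InputSchemaSketch {
  import org.apache.spark.sql.types.StructField
  val inputSchema: StructType = StructType(Seq(
    StructField("user", IntegerType),
    StructField("item", IntegerType),
    StructField("rating", FloatType)))
}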
/**
* :: Experimental ::
* Model fitted by ALS.
*
* @param rank rank of the matrix factorization model
* @param userFactors a DataFrame that stores user factors in two columns: `id` and `features`
* @param itemFactors a DataFrame that stores item factors in two columns: `id` and `features`
*/
@Experimental
class ALSModel2 private[ml] (
override val uid: String,
val rank: Int,
@transient val userFactors: DataFrame,
@transient val itemFactors: DataFrame)
extends Model[ALSModel2] with ALSModel2Params with MLWritable {
/** @group setParam */
def setUserCol(value: String): this.type = set(userCol, value)
/** @group setParam */
def setItemCol(value: String): this.type = set(itemCol, value)
/** @group setParam */
def setPredictionCol(value: String): this.type = set(predictionCol, value)
override def transform(dataset: DataFrame): DataFrame = {
// Register a UDF for DataFrame, and then
// create a new column named map(predictionCol) by running the predict UDF.
val predict = udf { (userFeatures: Seq[Float], itemFeatures: Seq[Float]) =>
if (userFeatures != null && itemFeatures != null) {
blas.sdot(rank, userFeatures.toArray, 1, itemFeatures.toArray, 1)
} else {
Float.NaN
}
}
dataset
.join(userFactors, dataset($(userCol)) === userFactors("id"), "left")
.join(itemFactors, dataset($(itemCol)) === itemFactors("id"), "left")
.select(dataset("*"),
predict(userFactors("features"), itemFactors("features")).as($(predictionCol)))
}
override def transformSchema(schema: StructType): StructType = {
SchemaUtils.checkColumnType(schema, $(userCol), IntegerType)
SchemaUtils.checkColumnType(schema, $(itemCol), IntegerType)
SchemaUtils.appendColumn(schema, $(predictionCol), FloatType)
}
override def copy(extra: ParamMap): ALSModel2 = {
val copied = new ALSModel2(uid, rank, userFactors, itemFactors)
copyValues(copied, extra).setParent(parent)
}
@Since("1.6.0")
override def write: MLWriter = new ALSModel2.ALSModel2Writer(this)
}
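// Hedged usage sketch, not part of the original gist: scoring a DataFrame of
// (user, item) pairs with a fitted ALSModel2. The prediction is the Float dot
// product of the user and item factors, or NaN when either side is missing
// (because of the left joins in transform above). The object name and column
// names are illustrative assumptions.
private[recommendation] object ALSModel2TransformSketch {
  def score(model: ALSModel2, pairs: DataFrame): DataFrame = {
    model
      .setUserCol("user")
      .setItemCol("item")
      .setPredictionCol("prediction")
      .transform(pairs)
  }
}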
@Since("1.6.0")
object ALSModel2 extends MLReadable[ALSModel2] {
@Since("1.6.0")
override def read: MLReader[ALSModel2] = new ALSModel2Reader
@Since("1.6.0")
override def load(path: String): ALSModel2 = super.load(path)
private[ALSModel2] class ALSModel2Writer(instance: ALSModel2) extends MLWriter {
override protected def saveImpl(path: String): Unit = {
val extraMetadata = "rank" -> instance.rank
DefaultParamsWriter.saveMetadata(instance, path, sc, Some(extraMetadata))
val userPath = new Path(path, "userFactors").toString
instance.userFactors.write.format("parquet").save(userPath)
val itemPath = new Path(path, "itemFactors").toString
instance.itemFactors.write.format("parquet").save(itemPath)
}
}
private class ALSModel2Reader extends MLReader[ALSModel2] {
/** Checked against metadata when loading model */
private val className = classOf[ALSModel2].getName
override def load(path: String): ALSModel2 = {
val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
implicit val format = DefaultFormats
val rank = (metadata.metadata \ "rank").extract[Int]
val userPath = new Path(path, "userFactors").toString
val userFactors = sqlContext.read.format("parquet").load(userPath)
val itemPath = new Path(path, "itemFactors").toString
val itemFactors = sqlContext.read.format("parquet").load(itemPath)
val model = new ALSModel2(metadata.uid, rank, userFactors, itemFactors)
DefaultParamsReader.getAndSetParams(model, metadata)
model
}
}
}
/**
* :: Experimental ::
* Alternating Least Squares (ALS) matrix factorization.
*
* ALS attempts to estimate the ratings matrix `R` as the product of two lower-rank matrices,
* `X` and `Y`, i.e. `X * Yt = R`. Typically these approximations are called 'factor' matrices.
* The general approach is iterative. During each iteration, one of the factor matrices is held
* constant, while the other is solved for using least squares. The newly-solved factor matrix is
* then held constant while solving for the other factor matrix.
*
* This is a blocked implementation of the ALS factorization algorithm that groups the two sets
* of factors (referred to as "users" and "products") into blocks and reduces communication by only
* sending one copy of each user vector to each product block on each iteration, and only for the
* product blocks that need that user's feature vector. This is achieved by pre-computing some
* information about the ratings matrix to determine the "out-links" of each user (which blocks of
* products it will contribute to) and "in-link" information for each product (which of the feature
* vectors it receives from each user block it will depend on). This allows us to send only an
* array of feature vectors between each user block and product block, and have the product block
* find the users' ratings and update the products based on these messages.
*
* For implicit preference data, the algorithm used is based on
* "Collaborative Filtering for Implicit Feedback Datasets", available at
* [[http://dx.doi.org/10.1109/ICDM.2008.22]], adapted for the blocked approach used here.
*
* Essentially instead of finding the low-rank approximations to the rating matrix `R`,
* this finds the approximations for a preference matrix `P` where the elements of `P` are 1 if
* r > 0 and 0 if r <= 0. The ratings then act as 'confidence' values related to strength of
* indicated user preferences rather than explicit ratings given to items.
*/
@Experimental
class ALS2(override val uid: String) extends Estimator[ALSModel2] with ALSParams
with DefaultParamsWritable {
import org.apache.spark.ml.recommendation.ALS2.Rating
def this() = this(Identifiable.randomUID("als"))
/** @group setParam */
def setRank(value: Int): this.type = set(rank, value)
/** @group setParam */
def setNumUserBlocks(value: Int): this.type = set(numUserBlocks, value)
/** @group setParam */
def setNumItemBlocks(value: Int): this.type = set(numItemBlocks, value)
/** @group setParam */
def setImplicitPrefs(value: Boolean): this.type = set(implicitPrefs, value)
/** @group setParam */
def setAlpha(value: Double): this.type = set(alpha, value)
/** @group setParam */
def setUserCol(value: String): this.type = set(userCol, value)
/** @group setParam */
def setItemCol(value: String): this.type = set(itemCol, value)
/** @group setParam */
def setRatingCol(value: String): this.type = set(ratingCol, value)
/** @group setParam */
def setPredictionCol(value: String): this.type = set(predictionCol, value)
/** @group setParam */
def setMaxIter(value: Int): this.type = set(maxIter, value)
/** @group setParam */
def setRegParam(value: Double): this.type = set(regParam, value)
/** @group setParam */
def setNonnegative(value: Boolean): this.type = set(nonnegative, value)
/** @group setParam */
def setCheckpointInterval(value: Int): this.type = set(checkpointInterval, value)
/** @group setParam */
def setSeed(value: Long): this.type = set(seed, value)
/**
* Sets both numUserBlocks and numItemBlocks to the specific value.
* @group setParam
*/
def setNumBlocks(value: Int): this.type = {
setNumUserBlocks(value)
setNumItemBlocks(value)
this
}
override def fit(dataset: DataFrame): ALSModel2 = {
import dataset.sqlContext.implicits._
val r = if ($(ratingCol) != "") col($(ratingCol)).cast(FloatType) else lit(1.0f)
val ratings = dataset
.select(col($(userCol)).cast(IntegerType), col($(itemCol)).cast(IntegerType), r)
.map { row =>
Rating(row.getInt(0), row.getInt(1), row.getFloat(2))
}
val (userFactors, itemFactors) = ALS2.train(ratings, rank = $(rank),
numUserBlocks = $(numUserBlocks), numItemBlocks = $(numItemBlocks),
maxIter = $(maxIter), regParam = $(regParam), implicitPrefs = $(implicitPrefs),
alpha = $(alpha), nonnegative = $(nonnegative),
checkpointInterval = $(checkpointInterval), seed = $(seed))
val userDF = userFactors.toDF("id", "features")
val itemDF = itemFactors.toDF("id", "features")
val model = new ALSModel2(uid, $(rank), userDF, itemDF).setParent(this)
copyValues(model)
}
override def transformSchema(schema: StructType): StructType = {
validateAndTransformSchema(schema)
}
override def copy(extra: ParamMap): ALS2 = defaultCopy(extra)
}
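// Hedged usage sketch, not part of the original gist: fitting the biased-ALS
// Estimator above on a ratings DataFrame with the default column names
// ("user", "item", "rating"). The object name and the chosen hyper-parameters
// are illustrative assumptions, not recommendations.
private[recommendation] object ALS2FitSketch {
  def fit(ratings: DataFrame): ALSModel2 = {
    new ALS2()
      .setRank(10)
      .setMaxIter(10)
      .setRegParam(0.1)
      .setImplicitPrefs(true)
      .setAlpha(40.0)
      .fit(ratings)
  }
}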
/**
* :: DeveloperApi ::
* An implementation of ALS that supports generic ID types, specialized for Int and Long. This is
* exposed as a developer API for users who do need other ID types. But it is not recommended
* because it increases the shuffle size and memory requirement during training. For simplicity,
* users and items must have the same type. The number of distinct users/items should be smaller
* than 2 billion.
*/
@DeveloperApi
object ALS2 extends DefaultParamsReadable[ALS2] with Logging {
/**
* :: DeveloperApi ::
* Rating class for better code readability.
*/
@DeveloperApi
case class Rating[@specialized(Int, Long) ID](user: ID, item: ID, rating: Float)
// @Since("1.6.0")
// override def load(path: String): ALS2 = super.load(path)
/** Trait for least squares solvers applied to the normal equation. */
private[recommendation] trait LeastSquaresNESolver extends Serializable {
/** Solves a least squares problem with regularization (possibly with other constraints). */
def solve(ne: NormalEquation, lambda: Double): Array[Float]
}
/** Cholesky solver for least square problems. */
private[recommendation] class CholeskySolver extends LeastSquaresNESolver {
/**
* Solves a least squares problem with L2 regularization:
*
* min norm(A x - b)^2^ + lambda * norm(x)^2^
*
* @param ne a [[NormalEquation]] instance that contains AtA, Atb, and n (number of instances)
* @param lambda regularization constant
* @return the solution x
*/
override def solve(ne: NormalEquation, lambda: Double): Array[Float] = {
val k = ne.k
// Add scaled lambda to the diagonals of AtA.
var i = 0
var j = 2
while (i < ne.triK) {
ne.ata(i) += lambda
i += j
j += 1
}
CholeskyDecomposition.solve(ne.ata, ne.atb)
val x = new Array[Float](k)
i = 0
while (i < k) {
x(i) = ne.atb(i).toFloat
i += 1
}
ne.reset()
x
}
}
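  /**
   * Illustrative sketch, not part of the original code: the packed upper-triangular
   * indices touched by the lambda loop in CholeskySolver.solve above, i.e. the positions
   * of the diagonal entries of AtA inside [[NormalEquation]]'s `ata` array. For k = 4 this
   * returns Array(0, 2, 5, 9), matching the `i += j; j += 1` walk.
   */
  private def packedDiagonalIndices(k: Int): Array[Int] = {
    val indices = new Array[Int](k)
    var i = 0 // packed index of the d-th diagonal entry
    var j = 2 // offset to the next diagonal entry
    var d = 0
    while (d < k) {
      indices(d) = i
      i += j
      j += 1
      d += 1
    }
    indices
  }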
/** NNLS solver. */
private[recommendation] class NNLSSolver extends LeastSquaresNESolver {
private var rank: Int = -1
private var workspace: NNLS.Workspace = _
private var ata: Array[Double] = _
private var initialized: Boolean = false
private def initialize(rank: Int): Unit = {
if (!initialized) {
this.rank = rank
workspace = NNLS.createWorkspace(rank)
ata = new Array[Double](rank * rank)
initialized = true
} else {
require(this.rank == rank)
}
}
/**
* Solves a nonnegative least squares problem with L2 regularization:
*
* min_x_ norm(A x - b)^2^ + lambda * n * norm(x)^2^
* subject to x >= 0
*/
override def solve(ne: NormalEquation, lambda: Double): Array[Float] = {
val rank = ne.k
initialize(rank)
fillAtA(ne.ata, lambda)
val x = NNLS.solve(ata, ne.atb, workspace)
ne.reset()
x.map(x => x.toFloat)
}
/**
* Given the packed upper-triangular matrix `triAtA`, expands it into the full symmetric
* square matrix stored in `ata`, and adds `lambda` to the diagonal.
*/
private def fillAtA(triAtA: Array[Double], lambda: Double) {
var i = 0
var pos = 0
var a = 0.0
while (i < rank) {
var j = 0
while (j <= i) {
a = triAtA(pos)
ata(i * rank + j) = a
ata(j * rank + i) = a
pos += 1
j += 1
}
ata(i * rank + i) += lambda
i += 1
}
}
}
/**
* Representing a normal equation to solve the following weighted least squares problem:
*
* minimize \sum,,i,, c,,i,, (a,,i,,^T^ x - b,,i,,)^2^ + lambda * x^T^ x.
*
* Its normal equation is given by
*
* \sum,,i,, c,,i,, (a,,i,, a,,i,,^T^ x - b,,i,, a,,i,,) + lambda * x = 0.
*/
private[recommendation] class NormalEquation(val k: Int) extends Serializable {
/** Number of entries in the upper triangular part of a k-by-k matrix. */
val triK = k * (k + 1) / 2
/** A^T^ * A */
val ata = new Array[Double](triK)
/** A^T^ * b */
val atb = new Array[Double](k)
private val da = new Array[Double](k)
private val upper = "U"
private def copyToDouble(a: Array[Float]): Unit = {
var i = 0
while (i < k) {
da(i) = a(i)
i += 1
}
}
/** Adds an observation. */
def add(a: Array[Float], b: Double, c: Double = 1.0): this.type = {
require(c >= 0.0)
require(a.length == k, "Check: a = " + a.length + ", k = " + k)
copyToDouble(a)
blas.dspr(upper, k, c, da, 1, ata)
if (b != 0.0) {
blas.daxpy(k, c * b, da, 1, atb, 1)
}
this
}
/** Merges another normal equation object. */
def merge(other: NormalEquation): this.type = {
require(other.k == k)
blas.daxpy(ata.length, 1.0, other.ata, 1, ata, 1)
blas.daxpy(atb.length, 1.0, other.atb, 1, atb, 1)
this
}
/** Resets everything to zero, which should be called after each solve. */
def reset(): Unit = {
ju.Arrays.fill(ata, 0.0)
ju.Arrays.fill(atb, 0.0)
}
}
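  /**
   * Illustrative sketch, not part of the original code: the add/solve/reset cycle used in
   * computeFactors, shown on a tiny 2x2 ridge-regularized least squares problem. Observations
   * (a_i, b_i) are accumulated into a [[NormalEquation]] and solved with [[CholeskySolver]];
   * solve() resets the equation so it can be reused.
   */
  private def normalEquationSketch(): Array[Float] = {
    val ne = new NormalEquation(2)
    ne.add(Array(1.0f, 0.0f), 1.0) // a = (1, 0), b = 1, confidence c = 1.0 (default)
    ne.add(Array(0.0f, 1.0f), 2.0) // a = (0, 1), b = 2
    // min ||A x - b||^2 + lambda ||x||^2 with lambda = 0.1, so x is roughly (0.91, 1.82).
    new CholeskySolver().solve(ne, 0.1)
  }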
/**
* :: DeveloperApi ::
* Implementation of the ALS algorithm.
*/
@DeveloperApi
def train[ID: ClassTag]( // scalastyle:ignore
ratings: RDD[Rating[ID]],
rank: Int = 10,
numUserBlocks: Int = 10,
numItemBlocks: Int = 10,
maxIter: Int = 10,
regParam: Double = 1.0,
implicitPrefs: Boolean = false,
alpha: Double = 1.0,
nonnegative: Boolean = false,
intermediateRDDStorageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK,
finalRDDStorageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK,
checkpointInterval: Int = 10,
seed: Long = 0L)(
implicit ord: Ordering[ID]): (RDD[(ID, Array[Float])], RDD[(ID, Array[Float])]) = {
require(intermediateRDDStorageLevel != StorageLevel.NONE,
"ALS is not designed to run without persisting intermediate RDDs.")
val sc = ratings.sparkContext
val userPart = new ALSPartitioner(numUserBlocks)
val itemPart = new ALSPartitioner(numItemBlocks)
val userLocalIndexEncoder = new LocalIndexEncoder(userPart.numPartitions)
val itemLocalIndexEncoder = new LocalIndexEncoder(itemPart.numPartitions)
val solver = if (nonnegative) new NNLSSolver else new CholeskySolver
val blockRatings = partitionRatings(ratings, userPart, itemPart)
.persist(intermediateRDDStorageLevel)
val (userInBlocks, userOutBlocks) =
makeBlocks("user", blockRatings, userPart, itemPart, intermediateRDDStorageLevel)
// materialize blockRatings and user blocks
userOutBlocks.count()
val swappedBlockRatings = blockRatings.map {
case ((userBlockId, itemBlockId), RatingBlock(userIds, itemIds, localRatings)) =>
((itemBlockId, userBlockId), RatingBlock(itemIds, userIds, localRatings))
}
val (itemInBlocks, itemOutBlocks) =
makeBlocks("item", swappedBlockRatings, itemPart, userPart, intermediateRDDStorageLevel)
// materialize item blocks
itemOutBlocks.count()
val seedGen = new XORShiftRandom(seed)
// Add bias: allocate rank + 1 dimensions per factor; position 0 stores the bias term.
var userFactors = initialize(userInBlocks, rank + 1, seedGen.nextLong())
var itemFactors = initialize(itemInBlocks, rank + 1, seedGen.nextLong())
var previousCheckpointFile: Option[String] = None
val shouldCheckpoint: Int => Boolean = (iter) =>
sc.checkpointDir.isDefined && checkpointInterval != -1 && (iter % checkpointInterval == 0)
val deletePreviousCheckpointFile: () => Unit = () =>
previousCheckpointFile.foreach { file =>
try {
FileSystem.get(sc.hadoopConfiguration).delete(new Path(file), true)
} catch {
case e: IOException =>
logWarning(s"Cannot delete checkpoint file $file:", e)
}
}
if (implicitPrefs) {
for (iter <- 1 to maxIter) {
userFactors.setName(s"userFactors-$iter").persist(intermediateRDDStorageLevel)
val previousItemFactors = itemFactors
// Current bias (stored in position 0) of each item factor, keyed by item block id.
var biases = itemFactors.map {
case (blockId, factors) =>
val b = factors.map { x => x(0) }
(blockId, b)
}
itemFactors = computeFactors(biases, userFactors, userOutBlocks, itemInBlocks, rank, regParam,
userLocalIndexEncoder, implicitPrefs, alpha, solver)
previousItemFactors.unpersist()
itemFactors.setName(s"itemFactors-$iter").persist(intermediateRDDStorageLevel)
// TODO: Generalize PeriodicGraphCheckpointer and use it here.
if (shouldCheckpoint(iter)) {
itemFactors.checkpoint() // itemFactors gets materialized in computeFactors.
}
val previousUserFactors = userFactors
biases = userFactors.map {
case (blockId, factors) =>
val b = factors.map(x => x(0))
(blockId, b)
}
userFactors = computeFactors(biases, itemFactors, itemOutBlocks, userInBlocks, rank, regParam,
itemLocalIndexEncoder, implicitPrefs, alpha, solver)
if (shouldCheckpoint(iter)) {
deletePreviousCheckpointFile()
previousCheckpointFile = itemFactors.getCheckpointFile
}
previousUserFactors.unpersist()
}
} else {
for (iter <- 0 until maxIter) {
var biases = itemFactors.map {
case (blockId, factors) =>
val b = factors.map { x => x(0) }
(blockId, b)
}
itemFactors = computeFactors(biases, userFactors, userOutBlocks, itemInBlocks, rank, regParam,
userLocalIndexEncoder, solver = solver)
if (shouldCheckpoint(iter)) {
itemFactors.checkpoint()
itemFactors.count() // checkpoint item factors and cut lineage
deletePreviousCheckpointFile()
previousCheckpointFile = itemFactors.getCheckpointFile
}
biases = userFactors.map {
case (blockId, factors) =>
val b = factors.map { x => x(0) }
(blockId, b)
}
userFactors = computeFactors(biases, itemFactors, itemOutBlocks, userInBlocks, rank, regParam,
itemLocalIndexEncoder, solver = solver)
}
}
val userIdAndFactors = userInBlocks
.mapValues(_.srcIds)
.join(userFactors)
.mapPartitions({ items =>
items.flatMap {
case (_, (ids, factors)) =>
ids.view.zip(factors)
}
// Preserve the partitioning because IDs are consistent with the partitioners in userInBlocks
// and userFactors.
}, preservesPartitioning = true)
// Drop the bias term (position 0) so only the rank-dimensional user factors are returned.
.map(v => (v._1, v._2.slice(1, v._2.length)))
.setName("userFactors")
.persist(finalRDDStorageLevel)
val itemIdAndFactorsWithBiases = itemInBlocks
.mapValues(_.srcIds)
.join(itemFactors)
.mapPartitions({ items =>
items.flatMap {
case (_, (ids, factors)) =>
ids.view.zip(factors)
}
}, preservesPartitioning = true)
// Keep the item bias (position 0) alongside the rank-dimensional item factors.
.map(v => (v._1, (v._2(0), v._2.slice(1, v._2.length))))
.setName("itemFactors")
.persist(finalRDDStorageLevel)
// Debug output: sample the learned item biases for items with positive and negative ids
// (note: the asInstanceOf[Int] cast below assumes Int item ids).
val biasSamples1 = itemIdAndFactorsWithBiases.map(x => (x._1, x._2._1))
.filter(_._1.asInstanceOf[Int] > 0).take(50)
val biasSamples2 = itemIdAndFactorsWithBiases.map(x => (x._1, x._2._1))
.filter(_._1.asInstanceOf[Int] < 0).take(50)
println(biasSamples1.mkString("\n------------------------------------\n"))
println("=============================================================")
println(biasSamples2.mkString("\n------------------------------------\n"))
val itemIdAndFactors = itemIdAndFactorsWithBiases.map(x => (x._1, x._2._2))
if (finalRDDStorageLevel != StorageLevel.NONE) {
userIdAndFactors.count()
itemFactors.unpersist()
itemIdAndFactors.count()
userInBlocks.unpersist()
userOutBlocks.unpersist()
itemInBlocks.unpersist()
itemOutBlocks.unpersist()
blockRatings.unpersist()
}
(userIdAndFactors, itemIdAndFactors)
}
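  /**
   * Hedged usage sketch, not part of the original code: calling the developer API above
   * directly. `ratings` is an assumed RDD[Rating[Int]] (the bias debug output in train casts
   * ids to Int, so Int ids are used here); the hyper-parameters are illustrative only.
   * The returned user and item factors have `rank` components, with the bias stripped off.
   */
  private def trainSketch(
      ratings: RDD[Rating[Int]]): (RDD[(Int, Array[Float])], RDD[(Int, Array[Float])]) = {
    train(ratings,
      rank = 10,
      maxIter = 10,
      regParam = 0.1,
      implicitPrefs = true,
      alpha = 40.0,
      seed = 42L)
  }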
/**
* Factor block that stores factors (Array[Float]) in an Array.
*/
private type FactorBlock = Array[Array[Float]]
private type FactorBlock2 = Array[(Array[Float], Float)]
/**
* Out-link block that stores, for each dst (item/user) block, which src (user/item) factors to
* send. For example, outLinkBlock(0) contains the local indices (not the original src IDs) of the
* src factors in this block to send to dst block 0.
*/
private type OutBlock = Array[Array[Int]]
/**
* In-link block for computing src (user/item) factors. This includes the original src IDs
* of the elements within this block as well as encoded dst (item/user) indices and corresponding
* ratings. The dst indices are in the form of (blockId, localIndex), which are not the original
* dst IDs. To compute src factors, we expect receiving dst factors that match the dst indices.
* For example, if we have an in-link record
*
* {srcId: 0, dstBlockId: 2, dstLocalIndex: 3, rating: 5.0},
*
* and assume that the dst factors are stored as dstFactors: Map[Int, Array[Array[Float]]], which
* is a blockId to dst factors map, the corresponding dst factor of the record is dstFactor(2)(3).
*
* We use a CSC-like (compressed sparse column) format to store the in-link information. So we can
* compute src factors one after another using only one normal equation instance.
*
* @param srcIds src ids (ordered)
* @param dstPtrs dst pointers. Elements in range [dstPtrs(i), dstPtrs(i+1)) of dst indices and
* ratings are associated with srcIds(i).
* @param dstEncodedIndices encoded dst indices
* @param ratings ratings
*
* @see [[LocalIndexEncoder]]
*/
private[recommendation] case class InBlock[@specialized(Int, Long) ID: ClassTag](
srcIds: Array[ID],
dstPtrs: Array[Int],
dstEncodedIndices: Array[Int],
ratings: Array[Float]) {
/** Size of the block. */
def size: Int = ratings.length
require(dstEncodedIndices.length == size)
require(dstPtrs.length == srcIds.length + 1)
}
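  /**
   * Illustrative sketch, not part of the original code: a tiny [[InBlock]] in the CSC-like
   * layout described above. srcIds(0) = 7 owns the ratings in positions
   * [dstPtrs(0), dstPtrs(1)) = [0, 2), and srcIds(1) = 9 owns position [2, 3); every dst
   * index is encoded as (dstBlockId, dstLocalIndex) via a [[LocalIndexEncoder]].
   */
  private def inBlockSketch(): InBlock[Int] = {
    val encoder = new LocalIndexEncoder(2)
    InBlock(
      srcIds = Array(7, 9),
      dstPtrs = Array(0, 2, 3),
      dstEncodedIndices = Array(
        encoder.encode(0, 3), // (dstBlockId = 0, dstLocalIndex = 3)
        encoder.encode(1, 0),
        encoder.encode(0, 1)),
      ratings = Array(5.0f, 3.0f, 4.0f))
  }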
/**
* Initializes factors randomly given the in-link blocks.
*
* @param inBlocks in-link blocks
* @param rank rank
* @return initialized factor blocks
*/
private def initialize[ID](
inBlocks: RDD[(Int, InBlock[ID])],
rank: Int,
seed: Long): RDD[(Int, FactorBlock)] = {
// Choose a unit vector uniformly at random from the unit sphere, but from the
// "first quadrant" where all elements are nonnegative. This can be done by choosing
// elements distributed as Normal(0,1) and taking the absolute value, and then normalizing.
// This appears to create factorizations that have a slightly better reconstruction
* (<1%) compared to picking elements uniformly at random in [0,1].
inBlocks.map {
case (srcBlockId, inBlock) =>
val random = new XORShiftRandom(byteswap64(seed ^ srcBlockId))
val factors = Array.fill(inBlock.srcIds.length) {
val factor = Array.fill(rank)(random.nextGaussian().toFloat)
val nrm = blas.snrm2(rank, factor, 1)
blas.sscal(rank, 1.0f / nrm, factor, 1)
factor(0) = 0.0f // initialize the bias term (position 0) to 0
factor
}
(srcBlockId, factors)
}
}
/**
* A rating block that contains src IDs, dst IDs, and ratings, stored in primitive arrays.
*/
private[recommendation] case class RatingBlock[@specialized(Int, Long) ID: ClassTag](
srcIds: Array[ID],
dstIds: Array[ID],
ratings: Array[Float]) {
/** Size of the block. */
def size: Int = srcIds.length
require(dstIds.length == srcIds.length)
require(ratings.length == srcIds.length)
}
/**
* Builder for [[RatingBlock]]. [[mutable.ArrayBuilder]] is used to avoid boxing/unboxing.
*/
private[recommendation] class RatingBlockBuilder[@specialized(Int, Long) ID: ClassTag]
extends Serializable {
private val srcIds = mutable.ArrayBuilder.make[ID]
private val dstIds = mutable.ArrayBuilder.make[ID]
private val ratings = mutable.ArrayBuilder.make[Float]
var size = 0
/** Adds a rating. */
def add(r: Rating[ID]): this.type = {
size += 1
srcIds += r.user
dstIds += r.item
ratings += r.rating
this
}
/** Merges another [[RatingBlockBuilder]]. */
def merge(other: RatingBlock[ID]): this.type = {
size += other.srcIds.length
srcIds ++= other.srcIds
dstIds ++= other.dstIds
ratings ++= other.ratings
this
}
/** Builds a [[RatingBlock]]. */
def build(): RatingBlock[ID] = {
RatingBlock[ID](srcIds.result(), dstIds.result(), ratings.result())
}
}
/**
* Partitions raw ratings into blocks.
*
* @param ratings raw ratings
* @param srcPart partitioner for src IDs
* @param dstPart partitioner for dst IDs
*
* @return an RDD of rating blocks in the form of ((srcBlockId, dstBlockId), ratingBlock)
*/
private def partitionRatings[ID: ClassTag](
ratings: RDD[Rating[ID]],
srcPart: Partitioner,
dstPart: Partitioner): RDD[((Int, Int), RatingBlock[ID])] = {
/* The implementation produces the same result as the following but generates less objects.
ratings.map { r =>
((srcPart.getPartition(r.user), dstPart.getPartition(r.item)), r)
}.aggregateByKey(new RatingBlockBuilder)(
seqOp = (b, r) => b.add(r),
combOp = (b0, b1) => b0.merge(b1.build()))
.mapValues(_.build())
*/
val numPartitions = srcPart.numPartitions * dstPart.numPartitions
ratings.mapPartitions { iter =>
val builders = Array.fill(numPartitions)(new RatingBlockBuilder[ID])
iter.flatMap { r =>
val srcBlockId = srcPart.getPartition(r.user)
val dstBlockId = dstPart.getPartition(r.item)
val idx = srcBlockId + srcPart.numPartitions * dstBlockId
val builder = builders(idx)
builder.add(r)
if (builder.size >= 2048) { // 2048 * (3 * 4) = 24k
builders(idx) = new RatingBlockBuilder
Iterator.single(((srcBlockId, dstBlockId), builder.build()))
} else {
Iterator.empty
}
} ++ {
builders.view.zipWithIndex.filter(_._1.size > 0).map {
case (block, idx) =>
val srcBlockId = idx % srcPart.numPartitions
val dstBlockId = idx / srcPart.numPartitions
((srcBlockId, dstBlockId), block.build())
}
}
}.groupByKey().mapValues { blocks =>
val builder = new RatingBlockBuilder[ID]
blocks.foreach(builder.merge)
builder.build()
}.setName("ratingBlocks")
}
/**
* Builder for uncompressed in-blocks of (srcId, dstEncodedIndex, rating) tuples.
* @param encoder encoder for dst indices
*/
private[recommendation] class UncompressedInBlockBuilder[@specialized(Int, Long) ID: ClassTag](
encoder: LocalIndexEncoder)(
implicit ord: Ordering[ID]) {
private val srcIds = mutable.ArrayBuilder.make[ID]
private val dstEncodedIndices = mutable.ArrayBuilder.make[Int]
private val ratings = mutable.ArrayBuilder.make[Float]
/**
* Adds a dst block of (srcId, dstLocalIndex, rating) tuples.
*
* @param dstBlockId dst block ID
* @param srcIds original src IDs
* @param dstLocalIndices dst local indices
* @param ratings ratings
*/
def add(
dstBlockId: Int,
srcIds: Array[ID],
dstLocalIndices: Array[Int],
ratings: Array[Float]): this.type = {
val sz = srcIds.length
require(dstLocalIndices.length == sz)
require(ratings.length == sz)
this.srcIds ++= srcIds
this.ratings ++= ratings
var j = 0
while (j < sz) {
this.dstEncodedIndices += encoder.encode(dstBlockId, dstLocalIndices(j))
j += 1
}
this
}
/** Builds a [[UncompressedInBlock]]. */
def build(): UncompressedInBlock[ID] = {
new UncompressedInBlock(srcIds.result(), dstEncodedIndices.result(), ratings.result())
}
}
/**
* A block of (srcId, dstEncodedIndex, rating) tuples stored in primitive arrays.
*/
private[recommendation] class UncompressedInBlock[@specialized(Int, Long) ID: ClassTag](
val srcIds: Array[ID],
val dstEncodedIndices: Array[Int],
val ratings: Array[Float])(
implicit ord: Ordering[ID]) {
/** Size of the block. */
def length: Int = srcIds.length
/**
* Compresses the block into an [[InBlock]]. The algorithm is the same as converting a
* sparse matrix from coordinate list (COO) format into compressed sparse column (CSC) format.
* Sorting is done using Spark's built-in Timsort to avoid generating too many objects.
*/
def compress(): InBlock[ID] = {
val sz = length
assert(sz > 0, "Empty in-link block should not exist.")
sort()
val uniqueSrcIdsBuilder = mutable.ArrayBuilder.make[ID]
val dstCountsBuilder = mutable.ArrayBuilder.make[Int]
var preSrcId = srcIds(0)
uniqueSrcIdsBuilder += preSrcId
var curCount = 1
var i = 1
var j = 0
while (i < sz) {
val srcId = srcIds(i)
if (srcId != preSrcId) {
uniqueSrcIdsBuilder += srcId
dstCountsBuilder += curCount
preSrcId = srcId
j += 1
curCount = 0
}
curCount += 1
i += 1
}
dstCountsBuilder += curCount
val uniqueSrcIds = uniqueSrcIdsBuilder.result()
val numUniqueSrcIds = uniqueSrcIds.length
val dstCounts = dstCountsBuilder.result()
val dstPtrs = new Array[Int](numUniqueSrcIds + 1)
var sum = 0
i = 0
while (i < numUniqueSrcIds) {
sum += dstCounts(i)
i += 1
dstPtrs(i) = sum
}
InBlock(uniqueSrcIds, dstPtrs, dstEncodedIndices, ratings)
}
private def sort(): Unit = {
val sz = length
// Since there might be interleaved log messages, we insert a unique id for easy pairing.
val sortId = Utils.random.nextInt()
logDebug(s"Start sorting an uncompressed in-block of size $sz. (sortId = $sortId)")
val start = System.nanoTime()
val sorter = new Sorter(new UncompressedInBlockSort[ID])
sorter.sort(this, 0, length, Ordering[KeyWrapper[ID]])
val duration = (System.nanoTime() - start) / 1e9
logDebug(s"Sorting took $duration seconds. (sortId = $sortId)")
}
}
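  /**
   * Illustrative sketch, not part of the original code: compressing three
   * (srcId, dstLocalIndex, rating) tuples with a repeated srcId. After compress(), the
   * resulting [[InBlock]] has srcIds = [1, 2] and dstPtrs = [0, 2, 3], i.e. srcId 1 owns
   * the first two ratings and srcId 2 the last one (COO to CSC, as described above).
   */
  private def compressSketch(): InBlock[Int] = {
    val builder = new UncompressedInBlockBuilder[Int](new LocalIndexEncoder(2))
    builder.add(
      dstBlockId = 0,
      srcIds = Array(1, 2, 1),
      dstLocalIndices = Array(0, 1, 2),
      ratings = Array(5.0f, 3.0f, 4.0f))
    builder.build().compress()
  }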
/**
* A wrapper that holds a primitive key.
*
* @see [[UncompressedInBlockSort]]
*/
private class KeyWrapper[@specialized(Int, Long) ID: ClassTag](
implicit ord: Ordering[ID]) extends Ordered[KeyWrapper[ID]] {
var key: ID = _
override def compare(that: KeyWrapper[ID]): Int = {
ord.compare(key, that.key)
}
def setKey(key: ID): this.type = {
this.key = key
this
}
}
/**
* [[SortDataFormat]] of [[UncompressedInBlock]] used by [[Sorter]].
*/
private class UncompressedInBlockSort[@specialized(Int, Long) ID: ClassTag](
implicit ord: Ordering[ID])
extends SortDataFormat[KeyWrapper[ID], UncompressedInBlock[ID]] {
override def newKey(): KeyWrapper[ID] = new KeyWrapper()
override def getKey(
data: UncompressedInBlock[ID],
pos: Int,
reuse: KeyWrapper[ID]): KeyWrapper[ID] = {
if (reuse == null) {
new KeyWrapper().setKey(data.srcIds(pos))
} else {
reuse.setKey(data.srcIds(pos))
}
}
override def getKey(
data: UncompressedInBlock[ID],
pos: Int): KeyWrapper[ID] = {
getKey(data, pos, null)
}
private def swapElements[@specialized(Int, Float) T](
data: Array[T],
pos0: Int,
pos1: Int): Unit = {
val tmp = data(pos0)
data(pos0) = data(pos1)
data(pos1) = tmp
}
override def swap(data: UncompressedInBlock[ID], pos0: Int, pos1: Int): Unit = {
swapElements(data.srcIds, pos0, pos1)
swapElements(data.dstEncodedIndices, pos0, pos1)
swapElements(data.ratings, pos0, pos1)
}
override def copyRange(
src: UncompressedInBlock[ID],
srcPos: Int,
dst: UncompressedInBlock[ID],
dstPos: Int,
length: Int): Unit = {
System.arraycopy(src.srcIds, srcPos, dst.srcIds, dstPos, length)
System.arraycopy(src.dstEncodedIndices, srcPos, dst.dstEncodedIndices, dstPos, length)
System.arraycopy(src.ratings, srcPos, dst.ratings, dstPos, length)
}
override def allocate(length: Int): UncompressedInBlock[ID] = {
new UncompressedInBlock(
new Array[ID](length), new Array[Int](length), new Array[Float](length))
}
override def copyElement(
src: UncompressedInBlock[ID],
srcPos: Int,
dst: UncompressedInBlock[ID],
dstPos: Int): Unit = {
dst.srcIds(dstPos) = src.srcIds(srcPos)
dst.dstEncodedIndices(dstPos) = src.dstEncodedIndices(srcPos)
dst.ratings(dstPos) = src.ratings(srcPos)
}
}
/**
* Creates in-blocks and out-blocks from rating blocks.
* @param prefix prefix for in/out-block names
* @param ratingBlocks rating blocks
* @param srcPart partitioner for src IDs
* @param dstPart partitioner for dst IDs
* @param storageLevel storage level used to persist the in-blocks and out-blocks
* @return (in-blocks, out-blocks)
*/
private def makeBlocks[ID: ClassTag](
prefix: String,
ratingBlocks: RDD[((Int, Int), RatingBlock[ID])],
srcPart: Partitioner,
dstPart: Partitioner,
storageLevel: StorageLevel)(
implicit srcOrd: Ordering[ID]): (RDD[(Int, InBlock[ID])], RDD[(Int, OutBlock)]) = {
val inBlocks = ratingBlocks.map {
case ((srcBlockId, dstBlockId), RatingBlock(srcIds, dstIds, ratings)) =>
// The implementation is a faster version of
// val dstIdToLocalIndex = dstIds.toSet.toSeq.sorted.zipWithIndex.toMap
val start = System.nanoTime()
val dstIdSet = new OpenHashSet[ID](1 << 20)
dstIds.foreach(dstIdSet.add)
val sortedDstIds = new Array[ID](dstIdSet.size)
var i = 0
var pos = dstIdSet.nextPos(0)
while (pos != -1) {
sortedDstIds(i) = dstIdSet.getValue(pos)
pos = dstIdSet.nextPos(pos + 1)
i += 1
}
assert(i == dstIdSet.size)
Sorting.quickSort(sortedDstIds)
val dstIdToLocalIndex = new OpenHashMap[ID, Int](sortedDstIds.length)
i = 0
while (i < sortedDstIds.length) {
dstIdToLocalIndex.update(sortedDstIds(i), i)
i += 1
}
logDebug(
"Converting to local indices took " + (System.nanoTime() - start) / 1e9 + " seconds.")
val dstLocalIndices = dstIds.map(dstIdToLocalIndex.apply)
(srcBlockId, (dstBlockId, srcIds, dstLocalIndices, ratings))
}.groupByKey(new ALSPartitioner(srcPart.numPartitions))
.mapValues { iter =>
val builder =
new UncompressedInBlockBuilder[ID](new LocalIndexEncoder(dstPart.numPartitions))
iter.foreach {
case (dstBlockId, srcIds, dstLocalIndices, ratings) =>
builder.add(dstBlockId, srcIds, dstLocalIndices, ratings)
}
builder.build().compress()
}.setName(prefix + "InBlocks")
.persist(storageLevel)
val outBlocks = inBlocks.mapValues {
case InBlock(srcIds, dstPtrs, dstEncodedIndices, _) =>
val encoder = new LocalIndexEncoder(dstPart.numPartitions)
val activeIds = Array.fill(dstPart.numPartitions)(mutable.ArrayBuilder.make[Int])
var i = 0
val seen = new Array[Boolean](dstPart.numPartitions)
while (i < srcIds.length) {
var j = dstPtrs(i)
ju.Arrays.fill(seen, false)
while (j < dstPtrs(i + 1)) {
val dstBlockId = encoder.blockId(dstEncodedIndices(j))
if (!seen(dstBlockId)) {
activeIds(dstBlockId) += i // add the local index in this out-block
seen(dstBlockId) = true
}
j += 1
}
i += 1
}
activeIds.map { x =>
x.result()
}
}.setName(prefix + "OutBlocks")
.persist(storageLevel)
(inBlocks, outBlocks)
}
/**
* Computes dst factors by constructing and solving least squares problems.
*
* @param biases current bias term (position 0) of each dst factor, keyed by dst block id;
*               the bias of a dst id is subtracted from each of its ratings before solving
* @param srcFactorBlocks src factors
* @param srcOutBlocks src out-blocks
* @param dstInBlocks dst in-blocks
* @param rank rank
* @param regParam regularization constant
* @param srcEncoder encoder for src local indices
* @param implicitPrefs whether to use implicit preference
* @param alpha the alpha constant in the implicit preference formulation
* @param solver solver for least squares problems
*
* @return dst factors
*/
private def computeFactors[ID](
biases: RDD[(Int, Array[Float])],
srcFactorBlocks: RDD[(Int, FactorBlock)],
srcOutBlocks: RDD[(Int, OutBlock)],
dstInBlocks: RDD[(Int, InBlock[ID])],
rank: Int,
regParam: Double,
srcEncoder: LocalIndexEncoder,
implicitPrefs: Boolean = false,
alpha: Double = 1.0,
solver: LeastSquaresNESolver): RDD[(Int, FactorBlock)] = {
val numSrcBlocks = srcFactorBlocks.partitions.length
// newSrcFactors.persist(StorageLevel.MEMORY_AND_DISK_SER)
// val YtY = if (implicitPrefs) Some(computeYtY(srcFactorBlocks, rank)) else None
// For implicit feedback, YtY is computed over copies of the src factors whose bias slot
// (position 0) is overwritten with the constant 1.0f, matching the augmented vectors used
// when the normal equations are assembled below.
val tmp = srcFactorBlocks.map(x => {
val f = x._2.map(v => {
val t = v.clone()
t(0) = 1.0f
t
})
(x._1, f)
})
tmp.cache()
val YtY = if (implicitPrefs) Some(computeYtY(tmp, rank + 1)) else None
tmp.unpersist()
val srcOut = srcOutBlocks.join(srcFactorBlocks)
.flatMap {
case (srcBlockId, (srcOutBlock, srcFactors)) =>
srcOutBlock.view.zipWithIndex.map {
case (activeIndices, dstBlockId) =>
(dstBlockId, (srcBlockId, activeIndices.map(idx => srcFactors(idx))))
}
// .map(x => ((x._1, x._2._1), x._2._2))
}
// .map(x => (x._1._1, (x._1._2, x._2._1, x._2._2)))
val merged = srcOut.groupByKey(new ALSPartitioner(dstInBlocks.partitions.length))
val out = dstInBlocks
.join(merged)
.join(biases)
.mapValues(x => (x._1._1, x._1._2, x._2))
.mapValues {
// Each value is (InBlock[ID], Iterable[(srcBlockId, FactorBlock)], biases: Array[Float]).
case (InBlock(dstIds, srcPtrs, srcEncodedIndices, ratings), srcFactors, biases) =>
val sortedSrcFactors = new Array[FactorBlock](numSrcBlocks)
srcFactors.foreach {
case (srcBlockId, factors) =>
sortedSrcFactors(srcBlockId) = factors
}
val dstFactors = new Array[Array[Float]](dstIds.length)
var j = 0
val ls = new NormalEquation(rank + 1)
while (j < dstIds.length) {
ls.reset()
if (implicitPrefs) {
ls.merge(YtY.get)
}
var i = srcPtrs(j)
var numExplicits = 0
while (i < srcPtrs(j + 1)) {
val encoded = srcEncodedIndices(i)
val blockId = srcEncoder.blockId(encoded)
val localIndex = srcEncoder.localIndex(encoded)
val srcFactor = sortedSrcFactors(blockId)(localIndex)
srcFactor(0) = 1.0f
val bias = biases(j)
val rating = ratings(i) - bias
if (implicitPrefs) {
// Extension to the original paper to handle b < 0. confidence is a function of |b|
// instead so that it is never negative. c1 is confidence - 1.0.
val c1 = alpha * math.abs(rating)
// For rating <= 0, the corresponding preference is 0. So the term below is only added
// for rating > 0. Because YtY is already added, we need to adjust the scaling here.
if (rating > 0) {
numExplicits += 1
ls.add(srcFactor, (c1 + 1.0) / c1, c1)
}
} else {
ls.add(srcFactor, rating)
numExplicits += 1
}
i += 1
}
// Weight lambda by the number of explicit ratings based on the ALS-WR paper.
dstFactors(j) = solver.solve(ls, numExplicits * regParam)
j += 1
}
dstFactors
}
// Return the newly computed dst factor blocks (bias stored in position 0).
out
}
/**
* Computes the Gramian matrix of user or item factors, which is only used in implicit preference.
* Caching of the input factor blocks is handled by the caller (see the cached `tmp` RDD in
* [[computeFactors]]).
*/
private def computeYtY(factorBlocks: RDD[(Int, FactorBlock)], rank: Int): NormalEquation = {
factorBlocks.values.aggregate(new NormalEquation(rank))(
seqOp = (ne, factors) => {
factors.foreach(ne.add(_, 0.0))
ne
},
combOp = (ne1, ne2) => ne1.merge(ne2))
}
/**
* Encoder for storing (blockId, localIndex) into a single integer.
*
* We use the leading bits (including the sign bit) to store the block id and the rest to store
* the local index. This is based on the assumption that users/items are approximately evenly
* partitioned. With this assumption, we should be able to encode two billion distinct values.
*
* @param numBlocks number of blocks
*/
private[recommendation] class LocalIndexEncoder(numBlocks: Int) extends Serializable {
require(numBlocks > 0, s"numBlocks must be positive but found $numBlocks.")
private[this] final val numLocalIndexBits =
math.min(java.lang.Integer.numberOfLeadingZeros(numBlocks - 1), 31)
private[this] final val localIndexMask = (1 << numLocalIndexBits) - 1
/** Encodes a (blockId, localIndex) into a single integer. */
def encode(blockId: Int, localIndex: Int): Int = {
require(blockId < numBlocks)
require((localIndex & ~localIndexMask) == 0)
(blockId << numLocalIndexBits) | localIndex
}
/** Gets the block id from an encoded index. */
@inline
def blockId(encoded: Int): Int = {
encoded >>> numLocalIndexBits
}
/** Gets the local index from an encoded index. */
@inline
def localIndex(encoded: Int): Int = {
encoded & localIndexMask
}
}
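  /**
   * Illustrative sketch, not part of the original code: with 4 blocks, the 2 leading bits
   * store the block id and the remaining 30 bits the local index, so encode(2, 5)
   * round-trips back to (2, 5).
   */
  private def localIndexEncoderSketch(): (Int, Int) = {
    val encoder = new LocalIndexEncoder(4)
    val encoded = encoder.encode(2, 5)
    (encoder.blockId(encoded), encoder.localIndex(encoded)) // (2, 5)
  }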
/**
* Partitioner used by ALS. We require that getPartition is a projection. That is, for any key k,
* we have getPartition(getPartition(k)) = getPartition(k). Since the default HashPartitioner
* satisfies this requirement, we simply use a type alias here.
*/
private[recommendation] type ALSPartitioner = org.apache.spark.HashPartitioner
}