Skip to content

Instantly share code, notes, and snippets.

@masterlittle
Created February 10, 2017 07:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save masterlittle/a2b993cd4cc4aa201cdc6347e1fc7651 to your computer and use it in GitHub Desktop.
Save masterlittle/a2b993cd4cc4aa201cdc6347e1fc7651 to your computer and use it in GitHub Desktop.
package recommendations.spark_apriori
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.mllib.fpm.AssociationRules
import org.apache.spark.mllib.fpm.FPGrowth.FreqItemset
import org.apache.spark.rdd.RDD
import recommendations.spark_apriori.CAssociationRules.CRule
import scala.collection.mutable
import scala.reflect.ClassTag
/**
 * Association-rule generator that, in addition to confidence, attaches support and lift to every
 * rule it emits.
 *
 * Created by Shitij on 07/02/17.
 *
 * NOTE(review): filtering uses `minConf` only; the inherited `setMinConfidence` presumably updates
 * the parent's own field and therefore has no effect here — confirm before relying on it.
 *
 * @param totalTransactions number of transactions in the mined data set; denominator for support
 *                          and a factor in lift
 * @param minConf           minimum confidence (inclusive) a rule must reach to be emitted
 */
class CAssociationRules(totalTransactions: Long, private var minConf: Double = 0.00001)
  extends AssociationRules {

  /**
   * Generates single-item-consequent rules `X => {y}` from frequent itemsets.
   *
   * For every frequent itemset and every item `y` in it, a candidate rule whose antecedent is the
   * itemset without `y` is produced (skipped when that antecedent would be empty). Candidates are
   * joined with the frequent itemsets to obtain `freq(X)`, and rules with confidence below
   * `minConf` are dropped.
   *
   * @param freqItemsets frequent itemsets with their counts; a candidate is only kept if its
   *                     antecedent also appears here (inner join on the item sequence)
   * @param itemMap      broadcast per-item occurrence counts; an item missing from the map is
   *                     treated as count 0, which makes the rule's lift 0
   * @return rules annotated with support, confidence and lift
   */
  def runModel[Item: ClassTag](freqItemsets: RDD[FreqItemset[Item]],
                               itemMap: Broadcast[mutable.Map[Item, Long]]): RDD[CRule[Item]] = {
    // Copy the fields into locals so the Spark closures below capture two primitives instead of
    // the whole instance, and so the per-call constant is not shuffled with every candidate.
    val numTransactions = totalTransactions
    val threshold = minConf

    // For candidate rule X => {y}: (X, ({y}, freq(X union {y}), count(y)))
    val candidates = freqItemsets.flatMap { itemset =>
      val items = itemset.items
      items.flatMap { item =>
        val (consequent, antecedent) = items.partition(_ == item)
        if (antecedent.nonEmpty) {
          val freqConsequent: Long = itemMap.value.getOrElse(item, 0L)
          Some((antecedent.toSeq, (consequent.toSeq, itemset.freq, freqConsequent)))
        } else {
          None
        }
      }
    }

    // Join to get (X, ((Y, freq(X union Y), count(Y)), freq(X))), build rules, filter by confidence.
    candidates
      .join(freqItemsets.map(x => (x.items.toSeq, x.freq)))
      .map { case (antecedent, ((consequent, freqUnion, freqConsequent), freqAntecedent)) =>
        new CRule(antecedent.toArray, consequent.toArray, freqUnion, freqAntecedent,
          freqConsequent, numTransactions)
      }
      .filter(_.confidence >= threshold)
  }
}
object CAssociationRules {

  /**
   * One association rule `antecedent => consequent` together with the counts needed to report
   * its quality metrics.
   *
   * @param antecedent        left-hand side of the rule
   * @param consequent        right-hand side of the rule; must share no item with `antecedent`
   * @param freqUnion         count of antecedent and consequent occurring together
   * @param freqAntecedent    count of the antecedent
   * @param freqConsequent    count of the consequent
   * @param totalTransactions number of transactions in the data set
   * @throws IllegalArgumentException if `antecedent` and `consequent` overlap
   */
  class CRule[Item](val antecedent: Array[Item],
                    val consequent: Array[Item],
                    freqUnion: Double,
                    freqAntecedent: Double,
                    freqConsequent: Double,
                    totalTransactions: Long) extends Serializable {

    /** Items present on both sides of the rule (empty for a valid rule). */
    private def sharedItems: Set[Item] = antecedent.toSet intersect consequent.toSet

    require(sharedItems.isEmpty,
      "A valid association rule must have disjoint antecedent and " +
        s"consequent but $sharedItems is present in both.")

    /** Fraction of all transactions in which antecedent and consequent occur together. */
    def support: Double = freqUnion / totalTransactions

    /** Returns the confidence of the rule: `freqUnion / freqAntecedent`. */
    def confidence: Double = freqUnion / freqAntecedent

    /**
     * Lift of the rule: `freqUnion * totalTransactions / (freqAntecedent * freqConsequent)`.
     * Reported as 0 when the consequent count is 0, instead of dividing by zero.
     */
    def lift: Double =
      if (freqConsequent == 0) 0.0
      else (freqUnion * totalTransactions) / (freqAntecedent * freqConsequent)

    /** Renders a side of the rule as `{a,b,c}`. */
    private def braced(side: Array[Item]): String = side.mkString("{", ",", "}")

    override def toString: String =
      s"${braced(antecedent)} => ${braced(consequent)}: $support : $confidence : $lift"
  }
}
package recommendations.spark_apriori
import java.util
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.mllib.fpm.FPGrowth
import org.apache.spark.{HashPartitioner, Partitioner, SparkContext, SparkException}
import org.apache.spark.mllib.fpm.FPGrowth.FreqItemset
import org.apache.spark.mllib.util.Saveable
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import scala.collection.mutable
import scala.reflect.ClassTag
/**
 * Model produced by `CFPGrowth.runModel`: the mined frequent itemsets, plus a helper that turns
 * them into association rules annotated with support, confidence and lift.
 *
 * Created by Shitij on 07/02/17.
 *
 * @param freqItemsets frequent itemsets together with their counts
 */
class CFPGrowthModel[Item: ClassTag](val freqItemsets: RDD[FreqItemset[Item]])
  extends Saveable with Serializable {

  /**
   * Generates association rules from `freqItemsets` via [[CAssociationRules]].
   *
   * @param totalTransactions number of transactions the itemsets were mined from
   * @param confidence        minimum confidence for a rule to be kept (default `0.001`)
   * @param itemMap           broadcast per-item occurrence counts, used to compute lift
   * @return the generated rules whose confidence is at least `confidence`
   */
  def generateAssociationRules(totalTransactions: Long,
                               confidence: Double = 0.001,
                               itemMap: Broadcast[mutable.Map[Item, Long]]
                              ): RDD[CAssociationRules.CRule[Item]] =
    new CAssociationRules(totalTransactions, confidence).runModel(freqItemsets, itemMap)

  /** Persistence is not implemented; calling this throws `scala.NotImplementedError`. */
  override def save(sc: SparkContext, path: String): Unit = ???

  /** Not implemented; calling this throws `scala.NotImplementedError`. */
  override protected def formatVersion: String = ???
}
/**
 * A parallel FP-growth algorithm to mine frequent itemsets. The algorithm is described in
 * [[http://dx.doi.org/10.1145/1454008.1454027 Li et al., PFP: Parallel FP-Growth for Query
 * Recommendation]]. PFP distributes computation in such a way that each worker executes an
 * independent group of mining tasks. The FP-Growth algorithm is described in
 * [[http://dx.doi.org/10.1145/335191.335372 Han et al., Mining frequent patterns without candidate
 * generation]].
 *
 * In addition to the frequent itemsets, `runModel` also returns the occurrence count of every item
 * in the input, so that generated rules can be annotated with lift.
 *
 * @param minSupport the minimal support level of the frequent pattern; any pattern that appears
 *                   at least ceil(minSupport * size-of-the-dataset) times will be output
 * @param numPartitions number of partitions used by parallel FP-growth
 * @see [[http://en.wikipedia.org/wiki/Association_rule_learning Association rule learning
 *      (Wikipedia)]]
 */
class CFPGrowth private(private var minSupport: Double,
                        private var numPartitions: Int) extends FPGrowth {

  /**
   * Constructs a default instance with default parameters {minSupport: `0.3`, numPartitions: same
   * as the input data}.
   */
  def this() = this(0.3, -1)

  /**
   * Sets the minimal support level (default: `0.3`).
   *
   * @throws IllegalArgumentException if `minSupport` is outside [0, 1]
   */
  override def setMinSupport(minSupport: Double): this.type = {
    require(minSupport >= 0.0 && minSupport <= 1.0,
      s"Minimal support level must be in range [0, 1] but got ${minSupport}")
    this.minSupport = minSupport
    this
  }

  /**
   * Sets the number of partitions used by parallel FP-growth (default: same as input data).
   *
   * @throws IllegalArgumentException if `numPartitions` is not positive
   */
  override def setNumPartitions(numPartitions: Int): this.type = {
    require(numPartitions > 0,
      s"Number of partitions must be positive but got ${numPartitions}")
    this.numPartitions = numPartitions
    this
  }

  /**
   * Computes the frequent itemsets of `data` and the occurrence count of every item.
   *
   * `data` is traversed several times (count, frequent items, FP-trees, item counts), so it should
   * be cached by the caller; a warning is logged when it is not.
   *
   * @param data input data set, each element contains a transaction; items inside a transaction
   *             must be unique, otherwise the Spark job fails
   * @return a [[CFPGrowthModel]] holding the frequent itemsets, and a map from every item that
   *         occurs in `data` (frequent or not) to its number of occurrences
   */
  def runModel[Item: ClassTag](data: RDD[Array[Item]]): (CFPGrowthModel[Item], mutable.Map[Item, Long]) = {
    if (data.getStorageLevel == StorageLevel.NONE) {
      logWarning("Input data is not cached.")
    }
    val count = data.count()
    val minCount = math.ceil(minSupport * count).toLong
    val numParts = if (numPartitions > 0) numPartitions else data.partitions.length
    val partitioner = new HashPartitioner(numParts)
    val freqItems = genFreqItems(data, minCount, partitioner)
    val freqItemsets = genFreqItemsets(data, minCount, freqItems, partitioner)
    val itemCounts: mutable.Map[Item, Long] = genItemMap(data)
    (new CFPGrowthModel(freqItemsets), itemCounts)
  }

  /**
   * Counts the occurrences of every item across all transactions (no support threshold applied).
   * The counts are collected to the driver.
   */
  private def genItemMap[Item: ClassTag](data: RDD[Array[Item]]): mutable.Map[Item, Long] = {
    val counts = data.flatMap(t => t).countByValue()
    mutable.Map(counts.toSeq: _*)
  }

  /**
   * Generates frequent items by filtering the input data using minimal support level.
   *
   * The index of an item in the returned array is its rank in the FP-trees. Items are ordered by
   * decreasing frequency, as in Han et al. and upstream Spark FPGrowth, so that frequent items form
   * shared prefixes and the trees stay compact.
   *
   * @param minCount minimum count for frequent itemsets
   * @param partitioner partitioner used to distribute items
   * @return array of frequent items ordered by decreasing frequency
   */
  private def genFreqItems[Item: ClassTag](data: RDD[Array[Item]],
                                           minCount: Long,
                                           partitioner: Partitioner): Array[Item] = {
    data.flatMap { t =>
      val uniq = t.toSet
      if (t.length != uniq.size) {
        throw new SparkException(s"Items in a transaction must be unique but got ${t.toSeq}.")
      }
      t
    }.map(v => (v, 1L))
      .reduceByKey(partitioner, _ + _)
      .filter(_._2 >= minCount)
      .collect()
      // Rank by decreasing frequency; this ordering was missing, leaving ranks arbitrary.
      .sortBy(-_._2)
      .map(_._1)
  }

  /**
   * Generate frequent itemsets by building FP-Trees, the extraction is done on each partition.
   *
   * @param data transactions
   * @param minCount minimum count for frequent itemsets
   * @param freqItems frequent items; an item's index is its rank
   * @param partitioner partitioner used to distribute transactions
   * @return an RDD of (frequent itemset, count)
   */
  private def genFreqItemsets[Item: ClassTag](data: RDD[Array[Item]],
                                              minCount: Long,
                                              freqItems: Array[Item],
                                              partitioner: Partitioner): RDD[FreqItemset[Item]] = {
    val itemToRank = freqItems.zipWithIndex.toMap
    data.flatMap { transaction =>
      genCondTransactions(transaction, itemToRank, partitioner)
    }.aggregateByKey(new CFPTree[Int], partitioner.numPartitions)(
      (tree, transaction) => tree.add(transaction, 1L),
      (tree1, tree2) => tree1.merge(tree2))
      .flatMap { case (part, tree) =>
        // Only emit itemsets this partition is responsible for, so each is produced once.
        tree.extract(minCount, x => partitioner.getPartition(x) == part)
      }.map { case (ranks, count) =>
        new FreqItemset(ranks.map(i => freqItems(i)).toArray, count)
      }
  }

  /**
   * Generates conditional transactions.
   *
   * The transaction is reduced to the ranks of its frequent items, sorted ascending. For each
   * target partition, the prefix ending at the highest-ranked item mapped to that partition is
   * emitted (scanning from the end, the first prefix seen for a partition is kept).
   *
   * @param transaction a transaction
   * @param itemToRank map from item to their rank
   * @param partitioner partitioner used to distribute transactions
   * @return a map of (target partition, conditional transaction)
   */
  private def genCondTransactions[Item: ClassTag](transaction: Array[Item],
                                                  itemToRank: Map[Item, Int],
                                                  partitioner: Partitioner): mutable.Map[Int, Array[Int]] = {
    val output = mutable.Map.empty[Int, Array[Int]]
    // Filter the basket by frequent items pattern and sort their ranks.
    val filtered = transaction.flatMap(itemToRank.get)
    util.Arrays.sort(filtered)
    val n = filtered.length
    var i = n - 1
    while (i >= 0) {
      val item = filtered(i)
      val part = partitioner.getPartition(item)
      if (!output.contains(part)) {
        output(part) = filtered.slice(0, i + 1)
      }
      i -= 1
    }
    output
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment