-
-
Save masterlittle/a2b993cd4cc4aa201cdc6347e1fc7651 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package recommendations.spark_apriori | |
import org.apache.spark.broadcast.Broadcast | |
import org.apache.spark.mllib.fpm.AssociationRules | |
import org.apache.spark.mllib.fpm.FPGrowth.FreqItemset | |
import org.apache.spark.rdd.RDD | |
import recommendations.spark_apriori.CAssociationRules.CRule | |
import scala.collection.mutable | |
import scala.reflect.ClassTag | |
/**
 * Created by Shitij on 07/02/17.
 *
 * Generates association rules `X => Y` with a single-item consequent `Y` from frequent itemsets,
 * annotating each rule with support, confidence and lift (see [[CAssociationRules.CRule]]).
 *
 * @param totalTransactions number of transactions the frequent itemsets were mined from
 * @param minConf           minimal confidence a rule must reach to be returned by [[runModel]]
 */
class CAssociationRules(totalTransactions: Long, private var minConf: Double = 0.00001)
  extends AssociationRules {

  /**
   * Sets the minimal confidence used by [[runModel]] (and by the inherited `run`).
   *
   * The parent class keeps its own private threshold, so without this override calling
   * `setMinConfidence` would validate and store the value but leave [[runModel]] unaffected.
   *
   * @throws IllegalArgumentException if `minConfidence` is outside `[0, 1]` (checked by the parent)
   */
  override def setMinConfidence(minConfidence: Double): this.type = {
    super.setMinConfidence(minConfidence)
    minConf = minConfidence
    this
  }

  /**
   * Generates rules `X => Y` with a single-item consequent `Y` from the given frequent itemsets.
   *
   * @param freqItemsets frequent itemsets with their counts
   * @param itemMap      broadcast map from item to its occurrence count; items missing from the
   *                     map get a consequent count of 0, for which [[CAssociationRules.CRule.lift]]
   *                     reports 0
   * @return rules whose confidence is at least the configured minimal confidence
   */
  def runModel[Item: ClassTag](freqItemsets: RDD[FreqItemset[Item]],
                               itemMap: Broadcast[mutable.Map[Item, Long]]): RDD[CRule[Item]] = {
    // Copy fields into locals so the Spark closures below do not capture (and ship) `this`.
    val total = totalTransactions
    val threshold = minConf

    // For candidate rule X => Y, emit (X, (Y, freq(X union Y), N, freq(Y))).
    val candidates = freqItemsets.flatMap { itemset =>
      val items: Array[Item] = itemset.items
      items.flatMap { item =>
        items.partition(_ == item) match {
          case (consequent, antecedent) if antecedent.nonEmpty =>
            val freqConsequent: Long = itemMap.value.getOrElse(consequent(0), 0L)
            Some((antecedent.toSeq, (consequent.toSeq, itemset.freq, total, freqConsequent)))
          case _ => None
        }
      }
    }

    // Join with freq(X) keyed by the antecedent, build the rules and filter by confidence.
    candidates.join(freqItemsets.map(x => (x.items.toSeq, x.freq)))
      .map { case (antecedent, ((consequent, freqUnion, totalTransact, freqConsequent), freqAntecedent)) =>
        new CRule(antecedent.toArray, consequent.toArray, freqUnion, freqAntecedent, freqConsequent,
          totalTransact)
      }
      .filter(_.confidence >= threshold)
  }
}
object CAssociationRules {

  /**
   * An association rule `antecedent => consequent` together with the raw counts from which its
   * support, confidence and lift are derived.
   *
   * @param antecedent        items on the left-hand side of the rule
   * @param consequent        items on the right-hand side; must share no item with `antecedent`
   * @param freqUnion         count of the itemset `antecedent ++ consequent`
   * @param freqAntecedent    count of the antecedent itemset
   * @param freqConsequent    count of the consequent (0 makes [[lift]] report 0)
   * @param totalTransactions total number of transactions
   * @tparam Item item type
   * @throws IllegalArgumentException if `antecedent` and `consequent` overlap
   */
  class CRule[Item](val antecedent: Array[Item],
                    val consequent: Array[Item],
                    freqUnion: Double,
                    freqAntecedent: Double,
                    freqConsequent: Double,
                    totalTransactions: Long) extends Serializable {

    // Reject overlapping sides; the message block is by-name, so it is only built on failure.
    require(antecedent.toSet.intersect(consequent.toSet).isEmpty, {
      val overlap = antecedent.toSet.intersect(consequent.toSet)
      s"A valid association rule must have disjoint antecedent and " +
        s"consequent but $overlap is present in both."
    })

    /** Fraction of all transactions in which the union of both sides occurs. */
    def support: Double = freqUnion / totalTransactions

    /** Returns the confidence of the rule: `freq(antecedent ++ consequent) / freq(antecedent)`. */
    def confidence: Double = freqUnion / freqAntecedent

    /** Returns the lift of the rule, or 0 when the consequent count is zero. */
    def lift: Double =
      if (freqConsequent == 0) 0.0
      else (freqUnion * totalTransactions) / (freqAntecedent * freqConsequent)

    /** Renders the rule as `{lhs} => {rhs}: support : confidence : lift`. */
    override def toString: String = {
      val lhs = antecedent.mkString("{", ",", "}")
      val rhs = consequent.mkString("{", ",", "}")
      s"$lhs => $rhs: $support : $confidence : $lift"
    }
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package recommendations.spark_apriori | |
import java.util | |
import org.apache.spark.broadcast.Broadcast | |
import org.apache.spark.mllib.fpm.FPGrowth | |
import org.apache.spark.{HashPartitioner, Partitioner, SparkContext, SparkException} | |
import org.apache.spark.mllib.fpm.FPGrowth.FreqItemset | |
import org.apache.spark.mllib.util.Saveable | |
import org.apache.spark.rdd.RDD | |
import org.apache.spark.storage.StorageLevel | |
import scala.collection.mutable | |
import scala.reflect.ClassTag | |
/**
 * Created by Shitij on 07/02/17.
 *
 * Model returned by `CFPGrowth.runModel`: wraps the mined frequent itemsets and derives
 * association rules from them.
 *
 * @param freqItemsets frequent itemsets with their occurrence counts
 * @tparam Item item type
 */
class CFPGrowthModel[Item: ClassTag](val freqItemsets: RDD[FreqItemset[Item]])
  extends Saveable with Serializable {

  /**
   * Generates association rules from [[freqItemsets]] via [[CAssociationRules]].
   *
   * Note: `confidence` has a default but precedes the required `itemMap`, so the default is only
   * usable when the arguments are passed by name.
   *
   * @param totalTransactions number of transactions in the data set (used for support and lift)
   * @param confidence        minimal confidence a rule must reach to be kept
   * @param itemMap           broadcast map from item to its occurrence count
   * @return the rules whose confidence is at least `confidence`
   */
  def generateAssociationRules(totalTransactions: Long,
                               confidence: Double = 0.001,
                               itemMap: Broadcast[mutable.Map[Item, Long]]
                              ): RDD[CAssociationRules.CRule[Item]] =
    new CAssociationRules(totalTransactions, confidence).runModel(freqItemsets, itemMap)

  /** Not implemented: always throws `scala.NotImplementedError`. */
  override def save(sc: SparkContext, path: String): Unit = ???

  /** Not implemented: always throws `scala.NotImplementedError`. */
  override protected def formatVersion: String = ???
}
/**
 * A parallel FP-growth algorithm to mine frequent itemsets. The algorithm is described in
 * [[http://dx.doi.org/10.1145/1454008.1454027 Li et al., PFP: Parallel FP-Growth for Query
 * Recommendation]]. PFP distributes computation in such a way that each worker executes an
 * independent group of mining tasks. The FP-Growth algorithm is described in
 * [[http://dx.doi.org/10.1145/335191.335372 Han et al., Mining frequent patterns without candidate
 * generation]].
 *
 * In addition to the frequent itemsets, [[runModel]] returns the occurrence count of every item,
 * which is the shape `CFPGrowthModel.generateAssociationRules` expects (broadcast) as `itemMap`.
 *
 * @param minSupport the minimal support level of the frequent pattern; any pattern that appears
 *                   at least `ceil(minSupport * size-of-the-dataset)` times will be output
 * @param numPartitions number of partitions used by parallel FP-growth
 * @see [[http://en.wikipedia.org/wiki/Association_rule_learning Association rule learning
 * (Wikipedia)]]
 */
class CFPGrowth private(private var minSupport: Double,
                        private var numPartitions: Int) extends FPGrowth {

  /**
   * Constructs a default instance with default parameters {minSupport: `0.3`, numPartitions: same
   * as the input data}.
   */
  def this() = this(0.3, -1)

  /**
   * Sets the minimal support level (default: `0.3`).
   *
   * @throws IllegalArgumentException if `minSupport` is outside `[0, 1]`
   */
  override def setMinSupport(minSupport: Double): this.type = {
    require(minSupport >= 0.0 && minSupport <= 1.0,
      s"Minimal support level must be in range [0, 1] but got ${minSupport}")
    this.minSupport = minSupport
    this
  }

  /**
   * Sets the number of partitions used by parallel FP-growth (default: same as input data).
   *
   * @throws IllegalArgumentException if `numPartitions` is not positive
   */
  override def setNumPartitions(numPartitions: Int): this.type = {
    require(numPartitions > 0,
      s"Number of partitions must be positive but got ${numPartitions}")
    this.numPartitions = numPartitions
    this
  }

  /**
   * Computes an FP-Growth model that contains frequent itemsets.
   *
   * `data` is traversed several times (count, frequent items, item counts, and later the lazy
   * itemset RDD), hence the warning when it is not cached.
   *
   * @param data input data set, each element contains a transaction; items within a transaction
   *             must be unique
   * @return a pair of the [[CFPGrowthModel]] holding the frequent itemsets and a map from every
   *         item occurring in `data` (frequent or not) to its number of occurrences
   * @throws org.apache.spark.SparkException if a transaction contains duplicate items
   */
  def runModel[Item: ClassTag](data: RDD[Array[Item]]): (CFPGrowthModel[Item], mutable.Map[Item, Long]) = {
    if (data.getStorageLevel == StorageLevel.NONE) {
      logWarning("Input data is not cached.")
    }
    val count = data.count()
    val minCount = math.ceil(minSupport * count).toLong
    val numParts = if (numPartitions > 0) numPartitions else data.partitions.length
    val partitioner = new HashPartitioner(numParts)
    val freqItems = genFreqItems(data, minCount, partitioner)
    val freqItemsets = genFreqItemsets(data, minCount, freqItems, partitioner)
    val freqItemsMap: mutable.Map[Item, Long] = genItemMap(data)
    (new CFPGrowthModel(freqItemsets), freqItemsMap)
  }

  /**
   * Counts every item of `data`, frequent or not, and collects the counts on the driver.
   */
  private def genItemMap[Item: ClassTag](data: RDD[Array[Item]]): mutable.Map[Item, Long] = {
    val freqItemsMap = data.flatMap(t => t).countByValue()
    mutable.Map(freqItemsMap.toSeq : _*)
  }

  /**
   * Generates frequent items by filtering the input data using minimal support level.
   *
   * @param minCount minimum count for frequent itemsets
   * @param partitioner partitioner used to distribute items
   * @return array of frequent items ordered by descending frequency; an item's index is its rank
   * @throws org.apache.spark.SparkException if a transaction contains duplicate items
   */
  private def genFreqItems[Item: ClassTag](data: RDD[Array[Item]],
                                           minCount: Long,
                                           partitioner: Partitioner): Array[Item] = {
    data.flatMap { t =>
      val uniq = t.toSet
      if (t.length != uniq.size) {
        throw new SparkException(s"Items in a transaction must be unique but got ${t.toSeq}.")
      }
      t
    }.map(v => (v, 1L))
      .reduceByKey(partitioner, _ + _)
      .filter(_._2 >= minCount)
      .collect()
      // Most frequent items get the smallest ranks, so the rank-sorted transactions fed into the
      // FP-trees share prefixes as FP-growth expects (the collect order alone is arbitrary).
      .sortBy(-_._2)
      .map(_._1)
  }

  /**
   * Generate frequent itemsets by building FP-Trees, the extraction is done on each partition.
   *
   * @param data transactions
   * @param minCount minimum count for frequent itemsets
   * @param freqItems frequent items
   * @param partitioner partitioner used to distribute transactions
   * @return an RDD of (frequent itemset, count)
   */
  private def genFreqItemsets[Item: ClassTag](data: RDD[Array[Item]],
                                              minCount: Long,
                                              freqItems: Array[Item],
                                              partitioner: Partitioner): RDD[FreqItemset[Item]] = {
    val itemToRank = freqItems.zipWithIndex.toMap
    data.flatMap { transaction =>
      genCondTransactions(transaction, itemToRank, partitioner)
    }.aggregateByKey(new CFPTree[Int], partitioner.numPartitions)(
      (tree, transaction) => tree.add(transaction, 1L), (tree1, tree2) => tree1.merge(tree2))
      .flatMap { case (part, tree) =>
        // Each partition only extracts the itemsets whose validating rank it owns.
        tree.extract(minCount, x => partitioner.getPartition(x) == part)
      }.map { case (ranks, count) =>
        new FreqItemset(ranks.map(i => freqItems(i)).toArray, count)
      }
  }

  /**
   * Generates conditional transactions.
   *
   * @param transaction a transaction
   * @param itemToRank map from item to their rank
   * @param partitioner partitioner used to distribute transactions
   * @return a map of (target partition, conditional transaction)
   */
  private def genCondTransactions[Item: ClassTag](transaction: Array[Item],
                                                  itemToRank: Map[Item, Int],
                                                  partitioner: Partitioner): mutable.Map[Int, Array[Int]] = {
    val output = mutable.Map.empty[Int, Array[Int]]
    // Filter the basket by frequent items pattern and sort their ranks.
    val filtered = transaction.flatMap(itemToRank.get)
    util.Arrays.sort(filtered)
    val n = filtered.length
    var i = n - 1
    // Walk from the highest rank down; each partition receives the longest prefix ending at
    // one of its ranks (only the first, i.e. longest, prefix per partition is kept).
    while (i >= 0) {
      val item = filtered(i)
      val part = partitioner.getPartition(item)
      if (!output.contains(part)) {
        output(part) = filtered.slice(0, i + 1)
      }
      i -= 1
    }
    output
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment