Spark unique counts using HyperLogLog Algebird object with tests
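The three files below compute an approximate unique-recipient count per sender: each email is mapped to a one-element HyperLogLog sketch keyed by sender, and the sketches are merged monoidally in the reduce step, so the shuffle carries fixed-size sketches instead of recipient sets. As a warm-up, here is a minimal standalone sketch of the Algebird primitive the files rely on (the object name and the 12-bit precision are my choices, not the gist's):

import com.twitter.algebird.HyperLogLogMonoid

object HllQuickDemo {
  def main(args: Array[String]): Unit = {
    val hll = new HyperLogLogMonoid(12) // ~1.6% standard error at 12 bits
    // Three observations, two distinct values: the estimate lands near 2.
    val sketch = Seq("b", "c", "b")
      .map(s => hll.create(s.getBytes("UTF-8")))
      .reduce(_ + _)
    println(sketch.estimatedSize)
  }
}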
package com.dvidr.counts

import com.twitter.algebird.{HLL, HyperLogLogMonoid}
import org.apache.spark.rdd.RDD

case class EmailSchema(sender: String,
                       to: String,
                       cc: String,
                       bcc: String,
                       sentDate: String,
                       receivedDate: String)

object MapRawData extends Serializable {
  @transient lazy val log = org.apache.log4j.LogManager.getLogger("myLogger")

  // todo for easy time groupings: split into yyyy, MM, dd fields
  // val format = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

  // Parse one CSV line into an EmailSchema; malformed lines are logged and dropped.
  def mapRawLine(line: String): Option[EmailSchema] = {
    try {
      val fields = line.split(",")
      Some(
        EmailSchema(
          sender = fields(0),
          to = fields(1),
          cc = fields(2),
          bcc = fields(3),
          sentDate = fields(4),
          receivedDate = fields(5)
        )
      )
    } catch {
      case _: Exception =>
        log.warn("Skipping line: malformed")
        None
    }
  }
}

// Key-value pair: a sender and an HLL sketch of that sender's recipients.
case class FromUniqueTo(from: String, uniqueTos: HLL)
case class FromCount(from: String, count: Double)

class HllTransforms extends Serializable {
  // 4 bits of precision keeps each sketch tiny (16 registers) at the cost of
  // roughly 26% standard error; raise this for tighter estimates.
  val hll = new HyperLogLogMonoid(4)

  // Map each email to (sender, one-element HLL of the "to" recipient).
  def hllMapper(events: RDD[EmailSchema]): RDD[FromUniqueTo] = {
    events.map { email =>
      val uTos: HLL = hll.create(email.to.getBytes())
      FromUniqueTo(email.sender, uTos)
    }
  }

  // Merge all sketches per sender and read off the estimated unique-recipient count.
  def hllReducer(events: RDD[FromUniqueTo]): RDD[FromCount] = {
    events.groupBy(fut => fut.from)
      .map { case (from, utos) =>
        val merged = utos.map(_.uniqueTos).reduce(_ + _)
        FromCount(from, merged.estimatedSize)
      }
  }
}
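To put HyperLogLogMonoid(4) in perspective: the standard error of an HLL estimate is roughly 1.04 / sqrt(2^bits), so 4 bits (16 registers) means about 26% error, which is why the tests below pin estimates like 4.60 for a true count of 4. A throwaway sketch (the object name is mine) tabulating the trade-off:

object HllErrorSketch {
  def main(args: Array[String]): Unit = {
    // error ≈ 1.04 / sqrt(2^bits), the standard HLL error bound
    for (bits <- Seq(4, 8, 12, 16)) {
      val err = 1.04 / math.sqrt(math.pow(2, bits))
      println(f"bits=$bits%2d  ~${err * 100}%.1f%% standard error")
    }
  }
}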
package com.dvidr.counts

import org.apache.spark.{SparkConf, SparkContext}

object MyDriver extends HllTransforms with Serializable {

  def main(args: Array[String]): Unit = {
    // todo: modify to parse args for path
    val path = "src/main/resources/coh_email_metadata_1Q17.csv"

    val conf = new SparkConf()
      .setAppName("My Driver")
      .setMaster("local[8]")
      .set("spark.executor.memory", "12g")
      .set("spark.driver.memory", "12g")
      .set("spark.driver.maxResultSize", "4g")
    val sc = new SparkContext(conf)

    val rawData = sc.textFile(path)
    val numberOfRawLines = rawData.count()
    println("[INFO] Number of raw lines: " + numberOfRawLines)

    // flatMap drops the Nones produced for malformed lines.
    val emails = rawData.flatMap(line => MapRawData.mapRawLine(line))
    val numberOfParsedLines = emails.count()
    println("[INFO] Number of parsed lines: " + numberOfParsedLines)

    val hllMap = hllMapper(emails)
    val hllRed = hllReducer(hllMap)

    hllRed.take(5).foreach { x =>
      println(x.from)
      println(x.count)
    }

    val numberOfFroms = hllRed.count()
    println("[INFO] Number of senders: " + numberOfFroms)

    sc.stop()
  }
}
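One design note on hllReducer: groupBy ships every per-email sketch across the shuffle before any merging happens. Since HLL merging is associative and commutative, reduceByKey can combine sketches map-side first. A hedged alternative (the class and method names are hypothetical, not part of the gist):

package com.dvidr.counts

import org.apache.spark.rdd.RDD

class HllTransformsByKey extends HllTransforms {
  // reduceByKey merges HLLs within each partition before shuffling,
  // so at most one sketch per (sender, partition) crosses the wire.
  def hllReducerByKey(events: RDD[FromUniqueTo]): RDD[FromCount] = {
    events
      .map(fut => (fut.from, fut.uniqueTos))
      .reduceByKey(_ + _)
      .map { case (from, merged) => FromCount(from, merged.estimatedSize) }
  }
}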
package com.dvidr.counts

import com.twitter.algebird.HLL
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}

import scala.reflect.ClassTag

class UniqueCountWithHllTest extends FlatSpec with Matchers with BeforeAndAfter {

  var sc: SparkContext = _

  before {
    val conf = new SparkConf()
      .setAppName("UniqueCountHllTest")
      .setMaster("local[*]")
    sc = new SparkContext(conf)
  }

  after {
    sc.stop()
  }

  trait LineParser extends Serializable {
    val mapRawData = MapRawData
  }

  trait DataToRdd extends Serializable {
    def stringArrayToRdd[T <: String : ClassTag](array: Array[T]): RDD[T] = {
      sc.parallelize(array)
    }
  }

  trait DataToHll extends HllTransforms with Serializable {
    // Build one merged HLL from a varargs list of strings.
    def makeHll(strings: String*): HLL = {
      strings.map(s => hll.create(s.getBytes())).reduce(_ + _)
    }
  }

  behavior of "Unique Counts with Hll"

  it should "parse input email data correctly" in new LineParser {
    val email = mapRawData.mapRawLine("a,b,c,d,e,f").get
    assert(email === EmailSchema("a", "b", "c", "d", "e", "f"))
  }

  it should "return None if email data is malformed" in new LineParser {
    val email = mapRawData.mapRawLine("a,b,d,g,e")
    assert(email.isEmpty)
  }

  it should "skip malformed input email data correctly" in new LineParser with DataToRdd {
    val rawData = stringArrayToRdd(Array("a,b,c,d,e,f", "", "b,c,d,e,f,g"))
    val outData = Array(
      EmailSchema("a", "b", "c", "d", "e", "f"),
      EmailSchema("b", "c", "d", "e", "f", "g"))
    val emails = rawData
      .flatMap(line => mapRawData.mapRawLine(line))
      .collect()
    emails should contain allElementsOf outData
  }

  it should "map emails to (from, hll) key-value pairs" in new HllTransforms {
    val inData = sc.parallelize(
      Array(
        EmailSchema("a", "b", "c", "d", "e", "f"),
        EmailSchema("a", "c", "d", "e", "f", "g")
      )
    )
    val futData = hllMapper(inData)
    val senders = futData.map(_.from).collect()
    senders should contain allElementsOf Array("a")
    // With 4 bits of precision, a one-element sketch estimates near 1.03.
    val counts = futData.map(_.uniqueTos.estimatedSize).collect()
    counts should contain allElementsOf Array(1.0326163382011386)
  }

  it should "reduce FromUniqueTo to (from, count) pairs" in new DataToHll {
    val twoHll = makeHll("b", "c")
    val threeHll = makeHll("b", "d", "e")
    val inData = sc.parallelize(
      Array(
        FromUniqueTo("a", twoHll),
        FromUniqueTo("a", threeHll)
      )
    )
    val from = hllReducer(inData).map(_.from).collect()
    from should contain allElementsOf Array("a")
    // "b" appears in both sketches, so the true unique count is 4;
    // at 4 bits the estimate comes out near 4.60.
    val count = hllReducer(inData).map(_.count).collect()
    count should contain allElementsOf Array(4.6029131592284935)
  }
}
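The hard-coded doubles above (1.0326…, 4.6029…) are reproducible because Algebird's HLL hashing is deterministic at a fixed bit width, but they silently break if the precision ever changes. A sketch of a less brittle assertion that could sit inside UniqueCountWithHllTest, deriving the expectation from the same monoid via the DataToHll trait:

it should "match an estimate derived from the same monoid" in new DataToHll {
  // Merging {b,c} with {b,d,e} gives the same registers as building the
  // sketch from {b,c,d,e} directly, so the estimates agree exactly.
  val expected = makeHll("b", "c", "d", "e").estimatedSize
  val inData = sc.parallelize(Array(
    FromUniqueTo("a", makeHll("b", "c")),
    FromUniqueTo("a", makeHll("b", "d", "e"))
  ))
  val count = hllReducer(inData).map(_.count).collect()
  count should contain allElementsOf Array(expected)
}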