@DavidRdgz
Created May 7, 2018 17:16
Spark unique counts using Algebird's HyperLogLog monoid, with tests
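The core trick throughout these files is that Algebird's HLL is a monoid: each record becomes a tiny sketch, and sketches merge associatively with +, which is exactly what lets Spark combine partial results across partitions. A minimal standalone sketch of that pattern, assuming algebird-core on the classpath (12 bits is an illustrative choice here; the gist itself uses 4):

import com.twitter.algebird.{HLL, HyperLogLogMonoid}

object HllBasics {
  def main(args: Array[String]): Unit = {
    val hll = new HyperLogLogMonoid(12) // 2^12 registers, ~1.6% standard error
    val items = Seq("b", "c", "b", "d")
    // One sketch per item, merged in any order via the monoid's +
    val merged: HLL = items.map(s => hll.create(s.getBytes("UTF-8"))).reduce(_ + _)
    println(merged.estimatedSize) // ~3.0: three distinct items
  }
}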
package com.dvidr.counts

import com.twitter.algebird.{HLL, HyperLogLogMonoid}
import org.apache.spark.rdd.RDD

case class EmailSchema(sender: String,
                       to: String,
                       cc: String,
                       bcc: String,
                       sentDate: String,
                       receivedDate: String)

object MapRawData extends Serializable {
  @transient lazy val log = org.apache.log4j.LogManager.getLogger("myLogger")

  // todo for easy time groupings: split into yyyy, MM, dd fields
  // val format = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  def mapRawLine(line: String): Option[EmailSchema] = {
    try {
      val fields = line.split(",")
      Some(
        EmailSchema(
          sender = fields(0),
          to = fields(1),
          cc = fields(2),
          bcc = fields(3),
          sentDate = fields(4),
          receivedDate = fields(5)
        )
      )
    } catch {
      case e: Exception =>
        log.warn(s"Skipping malformed line: ${e.getMessage}")
        None
    }
  }
}
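A note on the parser above: String.split(",") drops trailing empty fields, so a line with only its last field missing (for example an empty receivedDate) yields five elements and gets skipped. If that is not the intent, splitting with limit -1 preserves trailing empties; a runnable illustration of the difference:

object SplitBehavior {
  def main(args: Array[String]): Unit = {
    println("a,b,c,d,e,".split(",").length)     // 5: trailing empty dropped
    println("a,b,c,d,e,".split(",", -1).length) // 6: trailing empty preserved
  }
}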
case class FromUniqueTo(from: String, uniqueTos: HLL)

case class FromCount(from: String, count: Double)

class HllTransforms extends Serializable {
  // 4 bits => 2^4 = 16 registers, so estimates carry roughly 26% standard
  // error (~1.04 / sqrt(16)); the tests below depend on this exact setting
  val hll = new HyperLogLogMonoid(4)

  def hllMapper(events: RDD[EmailSchema]): RDD[FromUniqueTo] = {
    events.map { email =>
      val uTos: HLL = hll.create(email.to.getBytes())
      FromUniqueTo(email.sender, uTos)
    }
  }

  def hllReducer(events: RDD[FromUniqueTo]): RDD[FromCount] = {
    events.groupBy(fut => fut.from)
      .map { case (from, utoos) =>
        val hlls = utoos.map(_.uniqueTos)
        val us = hlls.reduce(_ + _)
        FromCount(from, us.estimatedSize)
      }
  }
}
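One scalability note on hllReducer: groupBy shuffles every per-email sketch and materializes all of a sender's HLLs in memory before merging. Since HLL is a monoid, reduceByKey can merge sketches map-side before the shuffle instead. A hedged alternative, shown as a hypothetical subclass rather than a change to the gist:

package com.dvidr.counts

import org.apache.spark.rdd.RDD

// Hypothetical variant, not part of the original gist
class HllTransformsByKey extends HllTransforms {
  def hllReducerByKey(events: RDD[FromUniqueTo]): RDD[FromCount] = {
    events
      .map(fut => (fut.from, fut.uniqueTos))
      .reduceByKey(_ + _) // map-side monoid merge before the shuffle
      .map { case (from, merged) => FromCount(from, merged.estimatedSize) }
  }
}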
package com.dvidr.counts

import org.apache.log4j.LogManager
import org.apache.spark.{SparkConf, SparkContext}

object MyDriver extends HllTransforms with Serializable {
  def main(args: Array[String]): Unit = {
    // todo: modify to parse args for path (a sketch follows this file)
    val path = "src/main/resources/coh_email_metadata_1Q17.csv"
    // note: spark.driver.memory only takes effect if set before the driver
    // JVM starts (e.g., via spark-submit); setting it here in local mode is
    // too late to resize the already-running JVM
    val conf = new SparkConf()
      .setAppName("My Driver")
      .setMaster("local[8]")
      .set("spark.executor.memory", "12g")
      .set("spark.driver.memory", "12g")
      .set("spark.driver.maxResultSize", "4g")
    val log = LogManager.getRootLogger
    val sc = new SparkContext(conf)

    val rawData = sc.textFile(path)
    val numberOfRawLines = rawData.count()
    println("[INFO] Number of raw lines: " + numberOfRawLines)

    val mapRawData = MapRawData
    val emails = rawData.flatMap(line => mapRawData.mapRawLine(line))
    val numberOfParsedLines = emails.count()
    println("[INFO] Number of parsed lines: " + numberOfParsedLines)

    val hllMap = hllMapper(emails)
    val hllRed = hllReducer(hllMap)
    hllRed.take(5).foreach { x =>
      println(x.from)
      println(x.count)
    }

    val numberOfFroms = hllRed.count()
    println("[INFO] Number of senders: " + numberOfFroms)
    sc.stop()
  }
}
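The sketch promised at the todo in main: take the input path from args and fall back to the bundled CSV. Hypothetical, not part of the gist:

// inside MyDriver.main, replacing the hard-coded path
val path = args.headOption.getOrElse("src/main/resources/coh_email_metadata_1Q17.csv")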
package com.dvidr.counts

import com.twitter.algebird.HLL
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}

import scala.reflect.ClassTag

class UniqueCountWithHllTest extends FlatSpec with Matchers with BeforeAndAfter {
  var sc: SparkContext = _

  before {
    val conf = new SparkConf()
      .setAppName("UniqueCountHllTest")
      .setMaster("local[*]")
    sc = new SparkContext(conf)
  }

  after {
    sc.stop()
  }

  trait LineParser extends Serializable {
    val mapRawData = MapRawData
  }

  trait DataToRdd extends Serializable {
    def stringArrayToRdd[T <: String : ClassTag](array: Array[T]): RDD[T] = {
      sc.parallelize(array)
    }
  }

  trait DataToHll extends HllTransforms with Serializable {
    def makeHll(strings: String*): HLL = {
      strings.map(s => hll.create(s.getBytes())).reduce(_ + _)
    }
  }

  behavior of "Unique Counts with Hll"

  it should "parse input email data correctly" in new LineParser {
    val email = mapRawData.mapRawLine("a,b,c,d,e,f").get
    assert(email === EmailSchema("a", "b", "c", "d", "e", "f"))
  }

  it should "return None if email data is malformed" in new LineParser {
    val email = mapRawData.mapRawLine("a,b,d,g,e") // only five fields
    assert(email.isEmpty)
  }

  it should "skip malformed input email data correctly" in new LineParser with DataToRdd {
    val rawData = stringArrayToRdd(
      Array("a,b,c,d,e,f", "", "b,c,d,e,f,g"))
    val outData =
      Array(EmailSchema("a", "b", "c", "d", "e", "f"),
            EmailSchema("b", "c", "d", "e", "f", "g"))
    val emails = rawData
      .flatMap(line => mapRawData.mapRawLine(line))
      .collect()
    emails should contain allElementsOf outData
  }

  it should "map emails to (from, hll) key-value pairs" in new HllTransforms {
    val inData = sc.parallelize(
      Array(
        EmailSchema("a", "b", "c", "d", "e", "f"),
        EmailSchema("a", "c", "d", "e", "f", "g")
      )
    )
    val futData = hllMapper(inData)
    val senders = futData
      .map(_.from)
      .collect()
    senders should contain allElementsOf Array("a")
    // exact estimate the 4-bit monoid produces for a single element
    val counts = futData
      .map(_.uniqueTos.estimatedSize)
      .collect()
    counts should contain allElementsOf Array(1.0326163382011386)
  }

  it should "reduce FromUniqueTo to (from, count) pairs" in new DataToHll {
    val twoHll = makeHll("b", "c")
    val threeHll = makeHll("b", "d", "e")
    val inData = sc.parallelize(
      Array(
        FromUniqueTo("a", twoHll),
        FromUniqueTo("a", threeHll)
      )
    )
    val from = hllReducer(inData).map(_.from).collect()
    from should contain allElementsOf Array("a")
    // 4 distinct recipients ("b", "c", "d", "e") estimated by the 4-bit sketch
    val count = hllReducer(inData).map(_.count).collect()
    count should contain allElementsOf Array(4.6029131592284935)
  }
}
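The two exact-value assertions above are pinned to the 4-bit monoid and Algebird's hash function, so they break if either changes. A less brittle style asserts that the true cardinality falls inside the sketch's error bounds via sizeOf, which returns an Approximate[Long]. A sketch of such a test case (it would live inside UniqueCountWithHllTest and assumes the same Algebird version):

it should "bound the true cardinality of a merged sketch" in new DataToHll {
  val merged = makeHll("b", "c", "d", "e")
  // sizeOf returns an Approximate[Long]; boundsContain checks its interval
  assert(hll.sizeOf(merged).boundsContain(4L))
}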