Skip to content

Instantly share code, notes, and snippets.

@l15k4
Created January 4, 2017 02:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save l15k4/75fc40ee76dec56dc4435818642bfe58 to your computer and use it in GitHub Desktop.
Save l15k4/75fc40ee76dec56dc4435818642bfe58 to your computer and use it in GitHub Desktop.
package example
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.Path
import java.util.UUID
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl.{FileIO, Flow, Framing, Keep, Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.util.ByteString
import com.google.common.hash.Hashing
import org.apache.spark.util.sketch.BloomFilter
import scala.collection.mutable
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
/**
 * Spike that estimates how many 32-bit murmur3 hashes collide for a large set of random UUIDs.
 *
 * The run is split into four sequential phases (see `resultF`):
 *  1. `uuidF`           - write random UUID strings, one per line, to `uuidFile`;
 *  2. `hashF`           - hash every UUID line with murmur3_32 and write the hashes to `hashFile`;
 *  3. `softDuplicatesF` - collect the hashes a bloom filter reports as "possibly seen before";
 *  4. `hardDuplicatesF` - count the occurrences of those candidates to separate real
 *                         duplicates from bloom-filter false positives.
 */
object HashSpike extends App {

  /** Adds an "upsert" style helper to mutable `Long -> Int` counters. */
  implicit class ImpressionsCountPimp(underlying: mutable.Map[Long, Int]) {

    /**
     * Replaces the value stored under `k` with `f(current)`, where `current` is `None`
     * when the key is absent.
     *
     * @return the same (mutated) map, so calls can be used as a fold step
     */
    def adjust(k: Long)(f: Option[Int] => Int): mutable.Map[Long, Int] = {
      underlying.update(k, f(underlying.get(k)))
      underlying
    }
  }

  implicit val system: ActorSystem = ActorSystem("QuickStart")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  val totalSize = 100000000

  val uuidFile = new File("/tmp/test/uuid.csv")
  val hashFile = new File("/tmp/test/hash.csv")
  val duplicatesFile = new File("/tmp/test/duplicates.csv")

  /**
   * Makes sure `file` exists. The parent directory is created first because
   * `createNewFile` throws an `IOException` when the directory is missing.
   */
  private def ensureFile(file: File): Unit = {
    Option(file.getParentFile).foreach(_.mkdirs())
    file.createNewFile()
  }

  Seq(uuidFile, hashFile, duplicatesFile).foreach(ensureFile)

  /**
   * Sink that writes every element's string form followed by a newline to `file`.
   *
   * The materialized future is failed when the `IOResult` carries a failed status, so that
   * a broken write stops the pipeline instead of silently handing a truncated file to the
   * next phase (some Akka versions report write errors inside a successful `IOResult`).
   */
  private def lineSink[T](file: Path): Sink[T, Future[IOResult]] =
    Flow[T]
      .map(elem => ByteString(s"$elem\n"))
      .toMat(FileIO.toPath(file))(Keep.right)
      .mapMaterializedValue(_.flatMap(io => Future.fromTry(io.status).map(_ => io)))

  /**
   * Stage that lets a UUID through only the first time the bloom filter sees it.
   * NOTE: the bloom filter is created once and mutated by the stage, so its state is
   * shared by every materialization of this value.
   */
  val bfStage =
    new BFilter[String](BloomFilter.create(totalSize, 0.0000001), { (bf, uuid) =>
      val contains = bf.mightContainString(uuid)
      if (!contains) bf.putString(uuid)
      !contains
    })

  /** Phase 1: generates up to `totalSize` random UUIDs and writes them to `uuidFile`. */
  def uuidF: Future[IOResult] =
    Source
      .fromIterator(() => Iterator.range(0, totalSize))
      .map(_ => UUID.randomUUID().toString)
      .via(bfStage)
      .runWith(lineSink(uuidFile.toPath))

  /** Phase 2: writes the murmur3_32 hash of every line of `uuidFile` to `hashFile`. */
  def hashF: Future[IOResult] =
    FileIO
      .fromPath(uuidFile.toPath)
      .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024))
      .map(_.utf8String)
      .map(line => Hashing.murmur3_32().hashString(line, StandardCharsets.UTF_8).asInt())
      .runWith(lineSink(hashFile.toPath))

  /**
   * Phase 3: returns every hash of `hashFile` that a fresh bloom filter reported as possibly
   * seen earlier in the file. The result may contain false positives.
   */
  def softDuplicatesF: Future[Set[Long]] =
    FileIO
      .fromPath(hashFile.toPath)
      .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024))
      // Parse each line exactly once instead of re-parsing it in every guard/branch.
      .map(_.utf8String.toLong)
      .runFold((mutable.HashSet.empty[Long], BloomFilter.create(totalSize, 0.0000001))) {
        case ((suspects, seen), hash) =>
          if (seen.mightContainLong(hash)) suspects += hash
          else seen.putLong(hash)
          (suspects, seen)
      }
      .map(_._1.toSet)

  /**
   * Phase 4: counts how often each candidate from `softDuplicates` occurs in `hashFile`.
   * Hashes that are not candidates are ignored; a count above 1 is a real duplicate.
   */
  def hardDuplicatesF(softDuplicates: Set[Long]): Future[mutable.Map[Long, Int]] =
    FileIO
      .fromPath(hashFile.toPath)
      .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024))
      .map(_.utf8String.toLong)
      .filter(softDuplicates.contains)
      .runFold(mutable.Map.empty[Long, Int]) { (counts, hash) =>
        counts.adjust(hash)(_.fold(1)(_ + 1))
      }

  /** Runs the four phases strictly one after another and yields the occurrence counts. */
  def resultF: Future[mutable.Map[Long, Int]] =
    for {
      _ <- uuidF
      _ <- hashF
      softDuplicates <- softDuplicatesF
      hardDuplicates <- hardDuplicatesF(softDuplicates)
    } yield hardDuplicates

  // Always shut the actor system down, even when the pipeline fails or times out;
  // otherwise its threads keep the JVM alive after the exception.
  val result =
    try Await.result(resultF, 24.hours)
    finally system.terminate()

  println("REAL DUPLICATES : " + result.count(_._2 > 1))
}
/**
 * Flow stage that forwards an element downstream only when `p(bf, element)` is true.
 *
 * Rejected elements are dropped and the next element is requested from upstream right away.
 * The bloom filter is handed to `p` on every call, so `p` may both query and update it.
 *
 * @param bf bloom filter instance passed to the predicate for each element
 * @param p  predicate deciding whether an element is emitted
 * @tparam A element type flowing through the stage
 */
class BFilter[A](bf: BloomFilter, p: (BloomFilter, A) => Boolean) extends GraphStage[FlowShape[A, A]] {
  val in = Inlet[A]("Filter.in")
  val out = Outlet[A]("Filter.out")
  val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {

      // Emits accepted elements; for rejected ones asks upstream for a replacement.
      private val forwardIfAccepted = new InHandler {
        override def onPush(): Unit = {
          val candidate = grab(in)
          val accepted = p(bf, candidate)
          if (accepted) push(out, candidate) else pull(in)
        }
      }

      // Downstream demand is translated one-to-one into upstream demand.
      private val requestUpstream = new OutHandler {
        override def onPull(): Unit = pull(in)
      }

      setHandler(in, forwardIfAccepted)
      setHandler(out, requestUpstream)
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment