Created
January 4, 2017 02:34
-
-
Save l15k4/75fc40ee76dec56dc4435818642bfe58 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package example | |
import java.io.File | |
import java.nio.charset.StandardCharsets | |
import java.nio.file.Path | |
import java.util.UUID | |
import akka.actor.ActorSystem | |
import akka.stream._ | |
import akka.stream.scaladsl.{FileIO, Flow, Framing, Keep, Sink, Source} | |
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler} | |
import akka.util.ByteString | |
import com.google.common.hash.Hashing | |
import org.apache.spark.util.sketch.BloomFilter | |
import scala.collection.mutable | |
import scala.concurrent.duration._ | |
import scala.concurrent.{Await, Future} | |
import scala.concurrent.ExecutionContext.Implicits.global | |
/**
 * Experiment that generates `totalSize` random UUIDs, hashes each one with
 * 32-bit Murmur3 and counts how many hash values occur more than once.
 *
 * The pipeline runs in four sequential steps (see `resultF`):
 *  1. `uuidF`            - write UUIDs that pass the Bloom-filter stage to `uuidFile`;
 *  2. `hashF`            - write the Murmur3 hash of every UUID line to `hashFile`;
 *  3. `softDuplicatesF`  - collect hashes a Bloom filter reports as possibly seen before;
 *  4. `hardDuplicatesF`  - count exact occurrences of those candidate hashes.
 */
object HashSpike extends App {

  /** Adds an `adjust` operation to the mutable occurrence-count map. */
  implicit class ImpressionsCountPimp(underlying: mutable.Map[Long, Int]) {

    /**
     * Replaces the value stored under `k` with `f(current)`, where `current`
     * is `None` when the key is absent.
     *
     * @return the same (mutated) map, so the call can be used as a fold step
     */
    def adjust(k: Long)(f: Option[Int] => Int): mutable.Map[Long, Int] = {
      underlying.update(k, f(underlying.get(k)))
      underlying
    }
  }

  val totalSize = 100000000

  val uuidFile = new File("/tmp/test/uuid.csv")
  val hashFile = new File("/tmp/test/hash.csv")
  // NOTE(review): duplicatesFile is created but nothing visible here writes to it.
  val duplicatesFile = new File("/tmp/test/duplicates.csv")

  // Prepare the files before the actor system is started: createNewFile() throws
  // an IOException when the parent directory is missing, and failing here must
  // not leave a running ActorSystem behind.
  Seq(uuidFile, hashFile, duplicatesFile).foreach { file =>
    Option(file.getParentFile).foreach(_.mkdirs())
    file.createNewFile()
  }

  implicit val system: ActorSystem = ActorSystem("QuickStart")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  /** Sink that writes every element's `toString` followed by a newline to `file`. */
  private def lineSink[T](file: Path): Sink[T, Future[IOResult]] =
    Flow[T]
      .map(s => ByteString(s.toString + "\n"))
      .toMat(FileIO.toPath(file))(Keep.right)

  /**
   * Stage that lets a UUID through only when the Bloom filter has not
   * (possibly) seen it yet, recording it in the filter as it passes.
   * The filter instance is created once, together with this value.
   */
  val bfStage =
    new BFilter[String](BloomFilter.create(totalSize, 0.0000001), {
      (bf, uuid) =>
        val contains = bf.mightContainString(uuid)
        if (!contains) bf.putString(uuid)
        !contains
    })

  /** Generates random UUID strings, filters them through `bfStage` and writes them to `uuidFile`. */
  def uuidF: Future[IOResult] =
    Source.fromIterator(() => Iterator.range(0, totalSize))
      .map(_ => UUID.randomUUID().toString)
      .via(bfStage)
      .runWith(lineSink(uuidFile.toPath))

  /** Reads `uuidFile` line by line and writes each line's 32-bit Murmur3 hash to `hashFile`. */
  def hashF: Future[IOResult] =
    FileIO.fromPath(uuidFile.toPath)
      .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024))
      .map(_.utf8String)
      .map(line => Hashing.murmur3_32().hashString(line, StandardCharsets.UTF_8).asInt())
      .runWith(lineSink(hashFile.toPath))

  /**
   * Reads `hashFile` and returns every hash for which a fresh Bloom filter
   * answered "might contain" at the time the hash was read. Hashes not yet
   * reported by the filter are inserted into it instead.
   */
  def softDuplicatesF: Future[Set[Long]] =
    FileIO.fromPath(hashFile.toPath)
      .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024))
      .map(_.utf8String.toLong) // parse once per line
      .runFold((mutable.HashSet.empty[Long], BloomFilter.create(totalSize, 0.0000001))) {
        case ((acc, bf), hash) if bf.mightContainLong(hash) =>
          (acc += hash, bf)
        case ((acc, bf), hash) =>
          bf.putLong(hash)
          acc -> bf
      }.map(_._1.toSet)

  /**
   * Reads `hashFile` again and counts the occurrences of every hash contained
   * in `softDuplicates`; all other hashes are ignored.
   *
   * @param softDuplicates candidate hashes to count
   * @return map from candidate hash to the number of lines carrying it
   */
  def hardDuplicatesF(softDuplicates: Set[Long]): Future[mutable.Map[Long, Int]] =
    FileIO.fromPath(hashFile.toPath)
      .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024))
      .map(_.utf8String.toLong) // parse once per line
      .runFold(mutable.Map.empty[Long, Int]) {
        case (acc, hash) if softDuplicates.contains(hash) =>
          acc.adjust(hash)(_.map(_ + 1).getOrElse(1))
        case (acc, _) =>
          acc
      }

  /** Runs the four steps strictly one after another and yields the occurrence counts. */
  def resultF: Future[mutable.Map[Long, Int]] =
    for {
      _ <- uuidF
      _ <- hashF
      softDuplicates <- softDuplicatesF
      hardDuplicates <- hardDuplicatesF(softDuplicates)
    } yield hardDuplicates

  // Blocking here is acceptable: this is the program's entry point. The actor
  // system is terminated in `finally` so that a failed or timed-out pipeline
  // still shuts it down.
  val result: mutable.Map[Long, Int] =
    try {
      val counts = Await.result(resultF, 24.hours)
      println("REAL DUPLICATES : " + counts.count(_._2 > 1))
      counts
    } finally system.terminate()
}
/**
 * Filtering flow stage whose predicate is given access to a Bloom filter.
 *
 * For every incoming element the predicate `p` is called with `bf` and the
 * element; the element is emitted downstream when `p` returns `true`,
 * otherwise it is dropped and the next element is requested from upstream.
 *
 * @param bf Bloom filter handed to the predicate on every element
 * @param p  predicate deciding whether an element is passed on
 * @tparam A element type flowing through the stage
 */
class BFilter[A](bf: BloomFilter, p: (BloomFilter, A) => Boolean) extends GraphStage[FlowShape[A, A]] {

  val in = Inlet[A]("Filter.in")
  val out = Outlet[A]("Filter.out")
  val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    // The logic object doubles as the handler for both ports.
    new GraphStageLogic(shape) with InHandler with OutHandler {

      // Downstream demand is forwarded to upstream unchanged.
      override def onPull(): Unit = pull(in)

      // Emit the element when the predicate accepts it; otherwise ask
      // upstream for another one so the pending demand is still served.
      override def onPush(): Unit = {
        val candidate = grab(in)
        val accepted = p(bf, candidate)
        if (!accepted) pull(in)
        else push(out, candidate)
      }

      setHandler(in, this)
      setHandler(out, this)
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment