Skip to content

Instantly share code, notes, and snippets.

@anish749
Last active May 14, 2019 09:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anish749/426427f72c9c2fa00eaaa90ee03c8507 to your computer and use it in GitHub Desktop.
Save anish749/426427f72c9c2fa00eaaa90ee03c8507 to your computer and use it in GitHub Desktop.
Immutable BF Aggregator backed by private mutable BitSet.
// An Algebird immutable Bloom Filter Aggregator backed by a private mutable.BitSet.
final case class FBloomFilterAggregator[A](numHashes: Int, width: Int)(
implicit hash: Hash128[A]
) extends Aggregator[A, mutable.BitSet, BF[A]] {
private val hasher = BFHash(numHashes, width)(hash)
def prepare(value: A): mutable.BitSet = {
val hashes = hasher(value)
val b = new mutable.BitSet()
var idx = 0
while (idx < hashes.length) {
b.add(hashes(idx))
idx += 1
}
b
}
override def semigroup: Semigroup[mutable.BitSet] = new Semigroup[mutable.BitSet] {
override def plus(x: mutable.BitSet, y: mutable.BitSet): mutable.BitSet = x |= y
}
override def present(reduction: mutable.BitSet): BF[A] =
BFInstance(hasher, scala.collection.immutable.BitSet.fromBitMask(reduction.toBitMask), width)
}
package com.spotify.scio.util
import com.twitter.algebird.{Aggregator, Hash128, Semigroup}
import org.roaringbitmap.buffer._
final case class BFAggregator[A](numHashes: Int, width: Int)(
implicit hash: Hash128[A]
) extends Aggregator[A, MutableRoaringBitmap, ImmBloomFilter[A]] {
private val hasher: KirMit32Hash[A] = KirMit32Hash(numHashes, width)(hash)
def prepare(value: A): MutableRoaringBitmap =
MutableRoaringBitmap.bitmapOf(
hasher(value).sorted: _* // Sorted helps with faster insertions
)
override def semigroup: Semigroup[MutableRoaringBitmap] = new Semigroup[MutableRoaringBitmap] {
override def plus(x: MutableRoaringBitmap, y: MutableRoaringBitmap): MutableRoaringBitmap = {
x.or(y.toImmutableRoaringBitmap)
x
}
}
override def present(reduction: MutableRoaringBitmap): ImmBloomFilter[A] =
ImmBloomFilter(hasher, reduction.toImmutableRoaringBitmap)
}
/**
* FIXME Make this into an ADT and have a BFZero
* add methods to add elements and return a new bloom filter / other immutable functions.
*/
final case class ImmBloomFilter[A] private[util] (
hasher: KirMit32Hash[A],
bitmap: ImmutableRoaringBitmap
) {
def maybeContains(item: A): Boolean = {
val il = hasher(item)
var idx = 0
var found = true
while (idx < il.length && found) {
if (!bitmap.contains(il(idx))) {
found = false
}
idx += 1
}
found
}
}
// Benchmark using jmh
// jmh:run -f1 -wi 5 -i 10 .*RoaringBloomFilterAggregatorBenchmark.*
/*
package com.spotify.scio.jmh
import com.spotify.scio.util.{BFAggregator, BloomFilter, ImmBloomFilter}
import org.openjdk.jmh.annotations._
import scala.util.Random
/**
* Benchmarks for com.spotify.scio.util.BloomFilter
*
* Creating a BF from a collection
*/
object RoaringBloomFilterAggregatorBenchmark {
def createRandomString(nbrOfStrings: Int, lengthOfStrings: Int): Seq[String] =
Seq.fill(nbrOfStrings)(Random.nextString(lengthOfStrings))
@State(Scope.Benchmark)
class BloomFilterState {
@Param(Array("100", "1000", "10000"))
var nbrOfElements: Int = 0
@Param(Array("0.01", "0.001"))
var falsePositiveRate: Double = 0
var randomStrings: Seq[String] = _
@Setup(Level.Trial)
def setup(): Unit =
randomStrings = createRandomString(nbrOfElements, 10)
}
}
@State(Scope.Benchmark)
class RoaringBloomFilterAggregatorBenchmark {
import RoaringBloomFilterAggregatorBenchmark._
@Benchmark
def scioBFAggregator(bloomFilterState: BloomFilterState): ImmBloomFilter[String] = {
val scioutilbf =
BloomFilter[String](bloomFilterState.nbrOfElements, bloomFilterState.falsePositiveRate)
val bfAggregator = BFAggregator[String](scioutilbf.numHashes, scioutilbf.width)
val prepared = bloomFilterState.randomStrings.map(bfAggregator.prepare)
val aggregatedBf = bfAggregator.semigroup
.sumOption(prepared) // FIXME this would be pretty slow with multiple function calls.
.map(bfAggregator.present)
.get // TODO getOrElse Zero
// val sc = ScioContext()
// val sBf = sc.parallelize(bloomFilterState.randomStrings).aggregate(bfAggregator)
// sBf.materialize.underlying.value.toList.headOption.get
aggregatedBf
}
}
*/
@anish749
Copy link
Author

anish749 commented May 5, 2019

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment