Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Immutable Bloom filter (BF) aggregator backed by a private mutable BitSet.
/**
 * An Algebird [[Aggregator]] that yields an immutable Bloom filter ([[BF]]) while
 * accumulating into a private `mutable.BitSet`.
 *
 * Bit positions for each input come from `BFHash(numHashes, width)`. Partial results are
 * merged with an in-place bitwise OR, and the final bit set is copied into an immutable
 * `BitSet` when presented, so the returned filter does not share state with the accumulator.
 *
 * @param numHashes number of hashes passed to `BFHash`
 * @param width     filter width passed to `BFHash` and `BFInstance`
 * @tparam A        element type; an implicit `Hash128[A]` must be available
 */
final case class FBloomFilterAggregator[A](numHashes: Int, width: Int)(
  implicit hash: Hash128[A]
) extends Aggregator[A, mutable.BitSet, BF[A]] {
  private val hasher = BFHash(numHashes, width)(hash)

  /** Returns a fresh bit set with every hash position of `value` switched on. */
  def prepare(value: A): mutable.BitSet =
    hasher(value).foldLeft(new mutable.BitSet())((bits, position) => bits += position)

  /** Merges two partial bit sets by OR-ing the right one into the left one. */
  override def semigroup: Semigroup[mutable.BitSet] = new Semigroup[mutable.BitSet] {
    // The left operand doubles as the accumulator to avoid allocating a new set per merge.
    override def plus(left: mutable.BitSet, right: mutable.BitSet): mutable.BitSet = {
      left |= right
      left
    }
  }

  /** Snapshots the reduced bits into an immutable `BitSet` and wraps them in a `BFInstance`. */
  override def present(reduction: mutable.BitSet): BF[A] = {
    val frozen = scala.collection.immutable.BitSet.fromBitMask(reduction.toBitMask)
    BFInstance(hasher, frozen, width)
  }
}
package com.spotify.scio.util
import com.twitter.algebird.{Aggregator, Hash128, Semigroup}
import org.roaringbitmap.buffer._
/**
 * An Algebird [[Aggregator]] that builds an [[ImmBloomFilter]] using a Roaring bitmap
 * (`MutableRoaringBitmap`) as the accumulator.
 *
 * @param numHashes number of hashes passed to [[KirMit32Hash]]
 * @param width     width passed to [[KirMit32Hash]]
 * @tparam A        element type; an implicit `Hash128[A]` must be available
 */
final case class BFAggregator[A](numHashes: Int, width: Int)(
  implicit hash: Hash128[A]
) extends Aggregator[A, MutableRoaringBitmap, ImmBloomFilter[A]] {
  private val hasher: KirMit32Hash[A] = KirMit32Hash(numHashes, width)(hash)

  /** Builds a bitmap containing every hash position of `value`. */
  def prepare(value: A): MutableRoaringBitmap =
    MutableRoaringBitmap.bitmapOf(
      hasher(value).sorted: _* // Sorted helps with faster insertions
    )

  /** ORs `y` into `x` in place and returns `x`; `y` itself is not modified. */
  override def semigroup: Semigroup[MutableRoaringBitmap] = new Semigroup[MutableRoaringBitmap] {
    override def plus(x: MutableRoaringBitmap, y: MutableRoaringBitmap): MutableRoaringBitmap = {
      x.or(y.toImmutableRoaringBitmap)
      x
    }
  }

  /**
   * Wraps a private copy of the reduced bitmap in an [[ImmBloomFilter]].
   *
   * `toImmutableRoaringBitmap` only upcasts the same object, and `plus` writes into its left
   * operand. Without the copy, a later merge into `reduction` would silently change the
   * "immutable" filter returned here.
   */
  override def present(reduction: MutableRoaringBitmap): ImmBloomFilter[A] =
    ImmBloomFilter(hasher, reduction.clone().toImmutableRoaringBitmap)
}
/**
 * A Bloom filter whose bits live in an `ImmutableRoaringBitmap`.
 *
 * An item can only be reported as possibly present if every position produced for it by
 * `hasher` is set in `bitmap`.
 *
 * FIXME Make this into an ADT and have a BFZero;
 * add methods to add elements and return a new bloom filter / other immutable functions.
 *
 * @param hasher computes the bit positions of an item
 * @param bitmap the bits set in this filter
 */
final case class ImmBloomFilter[A] private[util] (
  hasher: KirMit32Hash[A],
  bitmap: ImmutableRoaringBitmap
) {

  /**
   * Returns `false` if any hash position of `item` is unset in `bitmap`, and `true` otherwise
   * (a `true` may be a false positive). Checking stops at the first unset position.
   */
  def maybeContains(item: A): Boolean =
    hasher(item).forall(position => bitmap.contains(position))
}
// Benchmark using jmh
// jmh:run -f1 -wi 5 -i 10 .*RoaringBloomFilterAggregatorBenchmark.*
/*
package com.spotify.scio.jmh
import com.spotify.scio.util.{BFAggregator, BloomFilter, ImmBloomFilter}
import org.openjdk.jmh.annotations._
import scala.util.Random
/**
* Benchmarks for com.spotify.scio.util.BloomFilter
*
* Creating a BF from a collection
*/
object RoaringBloomFilterAggregatorBenchmark {
def createRandomString(nbrOfStrings: Int, lengthOfStrings: Int): Seq[String] =
Seq.fill(nbrOfStrings)(Random.nextString(lengthOfStrings))
@State(Scope.Benchmark)
class BloomFilterState {
@Param(Array("100", "1000", "10000"))
var nbrOfElements: Int = 0
@Param(Array("0.01", "0.001"))
var falsePositiveRate: Double = 0
var randomStrings: Seq[String] = _
@Setup(Level.Trial)
def setup(): Unit =
randomStrings = createRandomString(nbrOfElements, 10)
}
}
@State(Scope.Benchmark)
class RoaringBloomFilterAggregatorBenchmark {
import RoaringBloomFilterAggregatorBenchmark._
@Benchmark
def scioBFAggregator(bloomFilterState: BloomFilterState): ImmBloomFilter[String] = {
val scioutilbf =
BloomFilter[String](bloomFilterState.nbrOfElements, bloomFilterState.falsePositiveRate)
val bfAggregator = BFAggregator[String](scioutilbf.numHashes, scioutilbf.width)
val prepared = bloomFilterState.randomStrings.map(bfAggregator.prepare)
val aggregatedBf = bfAggregator.semigroup
.sumOption(prepared) // FIXME this would be pretty slow with multiple function calls.
.map(bfAggregator.present)
.get // TODO getOrElse Zero
// val sc = ScioContext()
// val sBf = sc.parallelize(bloomFilterState.randomStrings).aggregate(bfAggregator)
// sBf.materialize.underlying.value.toList.headOption.get
aggregatedBf
}
}
*/
@anish749

This comment has been minimized.

Copy link
Owner Author

anish749 commented May 5, 2019

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.