Last active
May 14, 2019 09:48
-
-
Save anish749/426427f72c9c2fa00eaaa90ee03c8507 to your computer and use it in GitHub Desktop.
Immutable Bloom filter (BF) aggregator backed by a private mutable BitSet.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Algebird Bloom filter aggregator that accumulates into a private `mutable.BitSet` and
 * presents the result as an immutable Algebird [[BF]].
 *
 * @param numHashes number of hash positions computed for each element
 * @param width     width (number of bits) passed to the underlying `BFHash` and `BFInstance`
 * @tparam A        element type; hashed through the implicit `Hash128[A]`
 */
final case class FBloomFilterAggregator[A](numHashes: Int, width: Int)(
  implicit hash: Hash128[A]
) extends Aggregator[A, mutable.BitSet, BF[A]] {
  private val hasher = BFHash(numHashes, width)(hash)

  /** Allocates a new bit set with one bit set for every hash position of `value`. */
  def prepare(value: A): mutable.BitSet = {
    val bits = new mutable.BitSet()
    bits ++= hasher(value)
  }

  /**
   * Union of two bit sets.
   *
   * NOTE: `plus` ORs `y` into `x` in place and returns `x` (no allocation), so callers must not
   * rely on `x` keeping its previous contents afterwards.
   */
  override def semigroup: Semigroup[mutable.BitSet] = new Semigroup[mutable.BitSet] {
    override def plus(x: mutable.BitSet, y: mutable.BitSet): mutable.BitSet = {
      x |= y
    }
  }

  /** Snapshots the accumulated bits into an immutable bit set and wraps them in a `BFInstance`. */
  override def present(reduction: mutable.BitSet): BF[A] = {
    val frozenBits = scala.collection.immutable.BitSet.fromBitMask(reduction.toBitMask)
    BFInstance(hasher, frozenBits, width)
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.spotify.scio.util | |
import com.twitter.algebird.{Aggregator, Hash128, Semigroup} | |
import org.roaringbitmap.buffer._ | |
/**
 * Algebird aggregator that builds a Bloom filter on a Roaring bitmap.
 *
 * Elements are hashed with `KirMit32Hash`, accumulated into a `MutableRoaringBitmap`, and
 * presented as an [[ImmBloomFilter]] that owns its own copy of the bits.
 *
 * @param numHashes number of hash positions computed for each element
 * @param width     width passed to `KirMit32Hash`
 * @tparam A        element type; hashed through the implicit `Hash128[A]`
 */
final case class BFAggregator[A](numHashes: Int, width: Int)(
  implicit hash: Hash128[A]
) extends Aggregator[A, MutableRoaringBitmap, ImmBloomFilter[A]] {
  private val hasher: KirMit32Hash[A] = KirMit32Hash(numHashes, width)(hash)

  /** Creates a fresh bitmap containing the hash positions of `value`. */
  def prepare(value: A): MutableRoaringBitmap =
    MutableRoaringBitmap.bitmapOf(
      hasher(value).sorted: _* // Sorted input helps with faster insertions
    )

  /**
   * Union of two bitmaps.
   *
   * NOTE: ORs `y` into `x` in place and returns `x` to avoid allocating; callers must not rely on
   * `x` keeping its previous contents afterwards.
   */
  override def semigroup: Semigroup[MutableRoaringBitmap] = new Semigroup[MutableRoaringBitmap] {
    override def plus(x: MutableRoaringBitmap, y: MutableRoaringBitmap): MutableRoaringBitmap = {
      x.or(y.toImmutableRoaringBitmap)
      x
    }
  }

  /**
   * Wraps the accumulated bits in an [[ImmBloomFilter]].
   *
   * `toImmutableRoaringBitmap` only up-casts the same mutable object. Handing that alias out would
   * let any later in-place mutation of `reduction` (e.g. reuse as the left operand of `plus`)
   * change the returned filter. A private copy keeps the filter's contents fixed.
   */
  override def present(reduction: MutableRoaringBitmap): ImmBloomFilter[A] = {
    val ownedBits: ImmutableRoaringBitmap = reduction.clone()
    ImmBloomFilter(hasher, ownedBits)
  }
}
/**
 * Read-only Bloom filter over a Roaring bitmap, as built by [[BFAggregator.present]].
 *
 * @param hasher computes the bit positions for an element
 * @param bitmap the set bits of the filter
 *
 * FIXME Make this into an ADT and have a BFZero
 * add methods to add elements and return a new bloom filter / other immutable functions.
 */
final case class ImmBloomFilter[A] private[util] (
  hasher: KirMit32Hash[A],
  bitmap: ImmutableRoaringBitmap
) {

  /**
   * Membership test.
   *
   * Returns `true` only if every hash position of `item` is set in `bitmap`. It stops at the
   * first missing position. A `true` result may be a false positive; `false` is definitive.
   */
  def maybeContains(item: A): Boolean =
    hasher(item).forall(position => bitmap.contains(position))
}
// Benchmark using jmh | |
// jmh:run -f1 -wi 5 -i 10 .*RoaringBloomFilterAggregatorBenchmark.* | |
/* | |
package com.spotify.scio.jmh | |
import com.spotify.scio.util.{BFAggregator, BloomFilter, ImmBloomFilter} | |
import org.openjdk.jmh.annotations._ | |
import scala.util.Random | |
/** | |
* Benchmarks for com.spotify.scio.util.BloomFilter | |
* | |
* Creating a BF from a collection | |
*/ | |
object RoaringBloomFilterAggregatorBenchmark { | |
def createRandomString(nbrOfStrings: Int, lengthOfStrings: Int): Seq[String] = | |
Seq.fill(nbrOfStrings)(Random.nextString(lengthOfStrings)) | |
@State(Scope.Benchmark) | |
class BloomFilterState { | |
@Param(Array("100", "1000", "10000")) | |
var nbrOfElements: Int = 0 | |
@Param(Array("0.01", "0.001")) | |
var falsePositiveRate: Double = 0 | |
var randomStrings: Seq[String] = _ | |
@Setup(Level.Trial) | |
def setup(): Unit = | |
randomStrings = createRandomString(nbrOfElements, 10) | |
} | |
} | |
@State(Scope.Benchmark) | |
class RoaringBloomFilterAggregatorBenchmark { | |
import RoaringBloomFilterAggregatorBenchmark._ | |
@Benchmark | |
def scioBFAggregator(bloomFilterState: BloomFilterState): ImmBloomFilter[String] = { | |
val scioutilbf = | |
BloomFilter[String](bloomFilterState.nbrOfElements, bloomFilterState.falsePositiveRate) | |
val bfAggregator = BFAggregator[String](scioutilbf.numHashes, scioutilbf.width) | |
val prepared = bloomFilterState.randomStrings.map(bfAggregator.prepare) | |
val aggregatedBf = bfAggregator.semigroup | |
.sumOption(prepared) // FIXME this would be pretty slow with multiple function calls. | |
.map(bfAggregator.present) | |
.get // TODO getOrElse Zero | |
// val sc = ScioContext() | |
// val sBf = sc.parallelize(bloomFilterState.randomStrings).aggregate(bfAggregator) | |
// sBf.materialize.underlying.value.toList.headOption.get | |
aggregatedBf | |
} | |
} | |
*/ |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Linked in twitter/algebird#675