Skip to content

Instantly share code, notes, and snippets.

@wjsl
Created November 2, 2015 20:47
Show Gist options
  • Save wjsl/425acb878a090df7fa8a to your computer and use it in GitHub Desktop.
Save wjsl/425acb878a090df7fa8a to your computer and use it in GitHub Desktop.
Micro-benchmark showing how different scan-iterator implementations affect read times in Accumulo
import java.io.File
import java.util
import java.util.Random
import java.util.concurrent.TimeUnit
import com.google.common.base.Stopwatch
import com.google.common.io.{ByteStreams, Files}
import org.apache.accumulo.core.client.{IteratorSetting, BatchWriterConfig, BatchWriter, Scanner}
import org.apache.accumulo.core.data._
import org.apache.accumulo.core.iterators.{IteratorEnvironment, SortedKeyValueIterator, WrappingIterator}
import org.apache.accumulo.core.security.Authorizations
import org.apache.accumulo.minicluster.MiniAccumuloCluster
import org.apache.hadoop.io.Text
import scala.collection.JavaConversions._
/**
 * Micro-benchmark comparing full-table scan times in Accumulo with different
 * top-level scan iterators installed:
 *  - a pass-through WrappingIterator subclass (MyWrapper),
 *  - TextIterator (copies each value into a Hadoop Text),
 *  - StringIterator (decodes each value into a String),
 *  - no extra iterator at all.
 *
 * Runs against a throw-away MiniAccumuloCluster in a temporary directory. The
 * cluster is stopped and the directory deleted when the benchmark finishes.
 *
 * Originally created by bill on 11/2/15.
 */
object IteratorBenchmark {

  /** Name under which each benchmarked iterator is registered on the scanner. */
  private val IteratorName = "top"

  /** Scan-iterator priority used for each benchmarked iterator. */
  private val IteratorPriority = 50

  /** Default number of rows written by `write`. */
  private val DefaultRowCount = 250000

  def main(args: Array[String]): Unit = {
    var dir: File = null
    var accumulo: MiniAccumuloCluster = null
    try {
      dir = Files.createTempDir()
      println("Working in " + dir)
      accumulo = new MiniAccumuloCluster(dir, "secret")
      accumulo.start()
      val con = accumulo.getConnector("root", "secret")
      con.tableOperations().create("test")

      val writer = con.createBatchWriter("test", new BatchWriterConfig())
      try write(writer) finally writer.close()

      // Prime the cache with one untimed sequential read so that the timed scans
      // below are not the first to touch the data. Each entry writes a byte to a
      // null stream so the loop body does observable (but cheap) work.
      val out = ByteStreams.nullOutputStream()
      val reader = con.createScanner("test", new Authorizations())
      reader.setRange(new Range)
      reader.foreach(_ => out.write(1))

      benchmark(reader, "Testing wrapping iterator", classOf[MyWrapper])
      benchmark(reader, "Testing TextIterator", classOf[TextIterator])
      benchmark(reader, "Testing StringIterator", classOf[StringIterator])
      println("Testing no iterator")
      read(reader)
    } finally {
      // Previously the cluster was never stopped, and deleteOnExit() cannot remove a
      // non-empty directory, so both leaked. The nested try ensures the directory is
      // still cleaned up if stop() throws.
      try {
        if (accumulo != null) accumulo.stop()
      } finally {
        if (dir != null) deleteRecursively(dir)
      }
    }
  }

  /**
   * Installs `iterClass` as the top scan iterator on `reader`, prints `label`, times
   * one full scan via `read`, and then removes the iterator. The iterator is removed
   * even if the scan throws.
   */
  private def benchmark(reader: Scanner,
                        label: String,
                        iterClass: Class[_ <: SortedKeyValueIterator[Key, Value]]): Unit = {
    reader.addScanIterator(new IteratorSetting(IteratorPriority, IteratorName, iterClass))
    println(label)
    try read(reader) finally reader.removeScanIterator(IteratorName)
  }

  /**
   * Scans the whole table through `reader` once and prints the elapsed wall-clock
   * time and the number of entries returned.
   *
   * @param reader scanner whose range is reset to the full table before timing
   */
  def read(reader: Scanner): Unit = {
    reader.setRange(new Range)
    val timer = new Stopwatch().start()
    val itemsRead = reader.foldLeft(0)((count, _) => count + 1)
    timer.stop()
    println(s"Took ${timer.elapsed(TimeUnit.MILLISECONDS)}ms to read $itemsRead entries")
  }

  /**
   * Writes `numRows` rows and then flushes the writer. Row ids are the decimal strings
   * "0" through (numRows - 1). Each row gets empty column family/qualifier,
   * timestamp 0, and a 1024-byte random value.
   *
   * The loop used to run `0 to 250000`, which wrote one row more than intended.
   *
   * @param writer  destination batch writer (flushed here, but not closed)
   * @param numRows number of rows to write; defaults to 250000
   */
  def write(writer: BatchWriter, numRows: Int = DefaultRowCount): Unit = {
    val random = new Random
    for (row <- 0 until numRows) {
      val value = new Array[Byte](1024)
      random.nextBytes(value)
      val m = new Mutation(row.toString)
      m.put("", "", 0, new Value(value))
      writer.addMutation(m)
    }
    writer.flush()
  }

  /** Best-effort recursive delete of `f`. Individual delete failures are ignored. */
  private def deleteRecursively(f: File): Unit = {
    // listFiles() returns null for non-directories and on I/O error.
    Option(f.listFiles()).foreach(_.foreach(deleteRecursively))
    f.delete()
  }
}
/**
 * A WrappingIterator subclass with no overrides: it forwards everything to its
 * source. The benchmark installs it as the "wrapping iterator" case.
 */
class MyWrapper extends WrappingIterator
/**
 * Pass-through iterator that forwards every entry of its source unchanged. For each
 * entry it also decodes the value bytes into a String (stored in `str`), so the
 * benchmark can measure the cost of that decoding.
 */
class StringIterator extends SortedKeyValueIterator[Key, Value] {
  import java.nio.charset.StandardCharsets

  /** Decoded form of the current value. It is written but never read: the decode is the measured work. */
  var str: String = null
  var src: SortedKeyValueIterator[Key, Value] = null
  var tk: Key = null
  var tv: Value = null

  /** Creates an iterator already attached to `src` (used by deepCopy). */
  def this(src: SortedKeyValueIterator[Key, Value]) = {
    this()
    this.src = src
  }

  /** Returns a StringIterator over a deep copy of the source. */
  override def deepCopy(env: IteratorEnvironment): SortedKeyValueIterator[Key, Value] =
    // Previously returned a TextIterator, so deep copies silently changed behavior.
    new StringIterator(src.deepCopy(env))

  /**
   * Takes the source's current entry as this iterator's top, decodes its value, and
   * advances the source. When the source is exhausted, `tk`/`tv` stay null, so
   * hasTop returns false.
   */
  override def next(): Unit = {
    tk = null
    tv = null
    if (src.hasTop) {
      // NOTE(review): keeps references to the source's key/value across src.next().
      // If the source reuses those objects, copies would be required; confirm.
      tk = src.getTopKey
      tv = src.getTopValue
      // Use an explicit charset; the single-argument constructor depends on the platform default.
      str = new String(tv.get(), StandardCharsets.UTF_8)
      src.next()
    }
  }

  override def getTopValue(): Value = tv

  /** Attaches to `source`. `options` and `env` are ignored. */
  override def init(source: SortedKeyValueIterator[Key, Value],
                    options: util.Map[String, String],
                    env: IteratorEnvironment): Unit = {
    this.src = source
  }

  override def getTopKey(): Key = tk

  /** Seeks the source, then loads its first entry (if any) as this iterator's top. */
  override def seek(range: Range, columnFamilies: util.Collection[ByteSequence], inclusive: Boolean): Unit = {
    src.seek(range, columnFamilies, inclusive)
    next()
  }

  override def hasTop(): Boolean = tk != null
}
/**
 * Pass-through iterator that forwards every entry of its source unchanged. For each
 * entry it also copies the value bytes into the reusable Text buffer `txt`, so the
 * benchmark can measure the cost of that copy.
 */
class TextIterator extends SortedKeyValueIterator[Key, Value] {
  val txt = new Text()
  var src: SortedKeyValueIterator[Key, Value] = null
  var tk: Key = null
  var tv: Value = null

  /** Creates an iterator already attached to `src` (used by deepCopy). */
  def this(src: SortedKeyValueIterator[Key, Value]) = {
    this()
    this.src = src
  }

  /** Attaches to `source`. `options` and `env` are ignored. */
  override def init(source: SortedKeyValueIterator[Key, Value],
                    options: util.Map[String, String],
                    env: IteratorEnvironment): Unit = {
    this.src = source
  }

  /** Returns a TextIterator over a deep copy of the source. */
  override def deepCopy(env: IteratorEnvironment): SortedKeyValueIterator[Key, Value] =
    new TextIterator(src.deepCopy(env))

  /** Seeks the source, then loads its first entry (if any) as this iterator's top. */
  override def seek(range: Range, columnFamilies: util.Collection[ByteSequence], inclusive: Boolean): Unit = {
    src.seek(range, columnFamilies, inclusive)
    next()
  }

  /**
   * Takes the source's current entry as the top (or null/null once the source is
   * exhausted). When there is an entry, it copies the value into `txt` and advances
   * the source.
   */
  override def next(): Unit = {
    val sourceHasMore = src.hasTop
    tk = if (sourceHasMore) src.getTopKey else null
    tv = if (sourceHasMore) src.getTopValue else null
    if (sourceHasMore) {
      txt.set(tv.get())
      src.next()
    }
  }

  override def hasTop(): Boolean = tk ne null

  override def getTopKey(): Key = tk

  override def getTopValue(): Value = tv
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment