Created
November 2, 2015 20:47
-
-
Save wjsl/425acb878a090df7fa8a to your computer and use it in GitHub Desktop.
Micro-benchmark measuring how different scan-time iterator implementations affect read times in Accumulo
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.File | |
import java.util | |
import java.util.Random | |
import java.util.concurrent.TimeUnit | |
import com.google.common.base.Stopwatch | |
import com.google.common.io.{ByteStreams, Files} | |
import org.apache.accumulo.core.client.{IteratorSetting, BatchWriterConfig, BatchWriter, Scanner} | |
import org.apache.accumulo.core.data._ | |
import org.apache.accumulo.core.iterators.{IteratorEnvironment, SortedKeyValueIterator, WrappingIterator} | |
import org.apache.accumulo.core.security.Authorizations | |
import org.apache.accumulo.minicluster.MiniAccumuloCluster | |
import org.apache.hadoop.io.Text | |
import scala.collection.JavaConversions._ | |
/**
 * Micro-benchmark that times full-table scans of an Accumulo table through different
 * scan-time iterators: [[MyWrapper]], [[TextIterator]], [[StringIterator]], and no iterator.
 *
 * A [[MiniAccumuloCluster]] is started in a fresh temporary directory for the run. When the
 * benchmark finishes, the cluster is stopped and the directory deleted, even if a step fails.
 *
 * Created by bill on 11/2/15.
 */
object IteratorBenchmark {

  /** Rows written by [[write]] when no explicit count is given. */
  private val DefaultRowCount = 250000

  /** Bytes of random data stored in each row's value when no explicit size is given. */
  private val DefaultValueSize = 1024

  /** Priority and name under which the iterator being measured is attached to the scanner. */
  private val IteratorPriority = 50
  private val IteratorName = "top"

  def main(args: Array[String]): Unit = {
    val dir = Files.createTempDir()
    println("Working in " + dir)
    try {
      val accumulo = new MiniAccumuloCluster(dir, "secret")
      accumulo.start()
      try {
        val con = accumulo.getConnector("root", "secret")
        con.tableOperations().create("test")

        val writer = con.createBatchWriter("test", new BatchWriterConfig())
        try write(writer) finally writer.close()

        val reader = con.createScanner("test", new Authorizations())
        primeCache(reader)

        timeWithIterator(reader, "Testing wrapping iterator",
          new IteratorSetting(IteratorPriority, IteratorName, classOf[MyWrapper]))
        timeWithIterator(reader, "Testing TextIterator",
          new IteratorSetting(IteratorPriority, IteratorName, classOf[TextIterator]))
        timeWithIterator(reader, "Testing StringIterator",
          new IteratorSetting(IteratorPriority, IteratorName, classOf[StringIterator]))
        println("Testing no iterator")
        read(reader)
      } finally {
        // Without this the cluster's processes outlive the benchmark.
        accumulo.stop()
      }
    } finally {
      // File.deleteOnExit cannot remove a non-empty directory, so delete explicitly.
      deleteRecursively(dir)
    }
  }

  /**
   * Scans the whole table with `reader`'s current configuration and prints the entry count
   * and wall-clock time taken.
   *
   * @param reader scanner to drain; its range is reset to cover the whole table
   */
  def read(reader: Scanner): Unit = {
    reader.setRange(new Range)
    val startNanos = System.nanoTime()
    val itemsRead = reader.foldLeft(0)((count, _) => count + 1)
    val elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos)
    println("Took " + elapsedMillis + "ms to read " + itemsRead + " entries")
  }

  /**
   * Writes `rows` rows whose row ids are the decimal strings "0" .. (rows - 1), each with a
   * single empty-family/empty-qualifier cell at timestamp 0 holding `valueSize` random bytes,
   * then flushes the writer. The writer is not closed.
   *
   * @param writer    destination batch writer
   * @param rows      number of rows to write
   * @param valueSize size in bytes of each random value
   */
  def write(writer: BatchWriter, rows: Int = DefaultRowCount, valueSize: Int = DefaultValueSize): Unit = {
    val random = new Random
    for (row <- 0 until rows) {
      val value = new Array[Byte](valueSize)
      random.nextBytes(value)
      val m = new Mutation(row.toString)
      m.put("", "", 0, new Value(value))
      writer.addMutation(m)
    }
    writer.flush()
  }

  /** Reads the whole table once, intended to warm caches before anything is timed. */
  private def primeCache(reader: Scanner): Unit = {
    val out = ByteStreams.nullOutputStream()
    reader.setRange(new Range)
    // A cheap side effect per entry so the priming loop is not zero'd out.
    reader.foreach(_ => out.write(1))
  }

  /**
   * Attaches `setting` to `reader`, prints `label`, times a full scan, then detaches the
   * iterator again (even if the scan fails) so later runs start from a clean scanner.
   */
  private def timeWithIterator(reader: Scanner, label: String, setting: IteratorSetting): Unit = {
    reader.addScanIterator(setting)
    try {
      println(label)
      read(reader)
    } finally {
      reader.removeScanIterator(setting.getName)
    }
  }

  /** Best-effort recursive delete; symbolic links are removed but never followed. */
  private def deleteRecursively(file: File): Unit = {
    if (!java.nio.file.Files.isSymbolicLink(file.toPath)) {
      // listFiles() returns null for non-directories and on I/O errors.
      Option(file.listFiles()).foreach(_.foreach(deleteRecursively))
    }
    file.delete()
  }
}
/**
 * Baseline iterator for the benchmark. It adds no behavior of its own: every method is
 * inherited unchanged from [[WrappingIterator]].
 */
class MyWrapper extends WrappingIterator
/**
 * Scan-time iterator that returns every entry of its source unchanged. It additionally
 * decodes each value's bytes into a `String`, so the benchmark can measure that
 * conversion's cost.
 *
 * The iterator reads one entry ahead. After `next()` (or `seek`), the current entry is held
 * in `tk`/`tv` and `src` has already been advanced past it.
 */
class StringIterator extends SortedKeyValueIterator[Key, Value] {
  // Most recently decoded value. It is never read back; presumably it is stored so the
  // decode cannot be discarded as dead code.
  var str: String = null
  var src: SortedKeyValueIterator[Key, Value] = null
  // Current entry; a null key means the iterator is exhausted.
  var tk: Key = null
  var tv: Value = null

  def this(src: SortedKeyValueIterator[Key, Value]) = {
    this()
    this.src = src
  }

  /** Returns a new StringIterator over a deep copy of the source. */
  override def deepCopy(env: IteratorEnvironment): SortedKeyValueIterator[Key, Value] =
    // Must be a StringIterator (not a TextIterator) so copies perform the same work.
    new StringIterator(src.deepCopy(env))

  /** Caches the source's current entry, decodes its value, and advances the source. */
  override def next(): Unit = {
    tk = null
    tv = null
    if (src.hasTop) {
      tk = src.getTopKey
      tv = src.getTopValue
      // Explicit charset: the one-argument String constructor uses the JVM default charset,
      // which varies between platforms.
      str = new String(tv.get(), java.nio.charset.StandardCharsets.UTF_8)
      // NOTE(review): tk/tv are references taken before src.next(). This assumes the source
      // does not mutate returned Key/Value objects when it advances; confirm for the sources used.
      src.next()
    }
  }

  override def getTopValue(): Value = tv

  /** Records the source; options and environment are ignored. */
  override def init(source: SortedKeyValueIterator[Key, Value], options: util.Map[String, String], env: IteratorEnvironment): Unit = {
    this.src = source
  }

  override def getTopKey(): Key = tk

  /** Seeks the source, then loads its first entry into `tk`/`tv`. */
  override def seek(range: Range, columnFamilies: util.Collection[ByteSequence], inclusive: Boolean): Unit = {
    src.seek(range, columnFamilies, inclusive)
    next()
  }

  override def hasTop(): Boolean = tk != null
}
/**
 * Scan-time iterator that returns every entry of its source unchanged. It also overwrites a
 * single reusable Hadoop [[Text]] with each value's bytes, so the benchmark can measure that
 * conversion's cost.
 *
 * Entries are pulled one ahead. Once `next()` (or `seek`) returns, the current entry is held
 * in `tk`/`tv` and `src` has already moved past it.
 */
class TextIterator extends SortedKeyValueIterator[Key, Value] {
  // One buffer shared by every entry; overwritten on each advance.
  val txt = new Text()
  var src: SortedKeyValueIterator[Key, Value] = null
  // Current entry; hasTop() is true exactly while tk is non-null.
  var tk: Key = null
  var tv: Value = null

  def this(src: SortedKeyValueIterator[Key, Value]) = {
    this()
    this.src = src
  }

  /** Records the source; options and environment are ignored. */
  override def init(source: SortedKeyValueIterator[Key, Value],
                    options: util.Map[String, String],
                    env: IteratorEnvironment): Unit =
    src = source

  /** Positions the source, then pulls its first entry into `tk`/`tv`. */
  override def seek(range: Range, columnFamilies: util.Collection[ByteSequence], inclusive: Boolean): Unit = {
    src.seek(range, columnFamilies, inclusive)
    next()
  }

  override def hasTop(): Boolean = tk ne null

  override def getTopKey(): Key = tk

  override def getTopValue(): Value = tv

  /** Clears the current entry, then refills it from the source if the source has one. */
  override def next(): Unit = {
    tk = null
    tv = null
    if (src.hasTop) pullFromSource()
  }

  /** Caches the source's current entry, copies its value into `txt`, then advances the source. */
  private def pullFromSource(): Unit = {
    tk = src.getTopKey
    tv = src.getTopValue
    txt.set(tv.get())
    src.next()
  }

  /** Returns a new TextIterator over a deep copy of the source. */
  override def deepCopy(env: IteratorEnvironment): SortedKeyValueIterator[Key, Value] =
    new TextIterator(src.deepCopy(env))
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment