Skip to content

Instantly share code, notes, and snippets.

Forked from tsuna/HBase.scala
Created March 7, 2012 21:50
Show Gist options
  • Save bennettandrews/1996466 to your computer and use it in GitHub Desktop.
Save bennettandrews/1996466 to your computer and use it in GitHub Desktop.
Atomic increment coalescing for asynchbase
// Copyright (C) 2012 Benoit Sigoure
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or (at your
// option) any later version. This program is distributed in the hope that it
// will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
// General Public License for more details. You should have received a copy
// of the GNU Lesser General Public License along with this program. If not,
// see <http://www.gnu.org/licenses/>.
import java.util.concurrent.TimeUnit

import com.google.common.cache.{CacheBuilder => Builder}
import com.google.common.cache.CacheLoader
import com.google.common.cache.CacheStats
import com.google.common.cache.LoadingCache
import com.google.common.cache.RemovalListener
import com.google.common.cache.RemovalNotification
import com.stumbleupon.common.Counter
import com.twitter.util.Duration
/** Helper to create in-memory concurrent LRU caches, courtesy of Google Guava. */
object CacheBuilder {

  /** TTL value meaning "never expire entries based on their age". */
  val NeverExpire = Duration.forever

  /**
   * Creates a Guava cache builder pre-configured for concurrent access.
   * @param size Maximum number of entries the cache should contain.
   * @param ttl Automatically remove entries from the cache after this amount
   * of time has elapsed since they were written. If set to
   * {@code NeverExpire} no time-based expiration is configured.
   * @return A builder on which a removal listener can still be installed
   * before the cache is built.
   */
  def newBuilder[K, V](size: Int, ttl: Duration): Builder[K, V] = {
    val ncpu = Runtime.getRuntime.availableProcessors
    val builder = Builder.newBuilder()
      // Beef up the concurrency level as this is the number of internal
      // segments used by the hash map.  The default is 4, which is not enough
      // for us as we typically have more than that many threads concurrently
      // accessing the map.  Because Guava's LocalCache maintains a
      // per-segment buffer of access operations not yet committed, having a
      // few more segments than we actually need helps increase the number of
      // read operations we can do on a segment of the map with no interleaved
      // writes before the segment has to acquire the lock to flush the buffer.
      // We can't control this otherwise, because it's a hard-coded constant
      // in LocalCache.DRAIN_THRESHOLD = 0x3F = 63;
      .concurrencyLevel(ncpu * 2)
      // Pre-allocate a reasonable chunk of the max capacity.
      .initialCapacity(size / 4)
      // Enforce the documented upper bound; without this `size` would only
      // influence the initial capacity and the cache could grow unbounded.
      .maximumSize(size)
    if (ttl != NeverExpire)
      builder.expireAfterWrite(ttl.inNanoseconds, TimeUnit.NANOSECONDS)
    // Guava's `newBuilder()` is typed on <Object, Object>; the key/value
    // types only get fixed once a listener or loader is attached, so the
    // cast is how we hand back a builder typed for our caller.
    builder.asInstanceOf[Builder[K, V]]
  }

  /**
   * Returns a new LRU cache.
   * @param size Maximum number of entries the cache should contain.
   * @param ttl Automatically remove entries from the cache after this amount
   * of time has elapsed.  If set to {@code NeverExpire} then entries are only
   * removed in an LRU fashion when the maximum size is reached.
   * @param create Function to call to populate the cache on a miss.
   * @param onremoval if not {@code null} this function will be called
   * whenever an item is removed from the cache, either manually or because of
   * evictions.
   */
  def apply[K, V](size: Int, ttl: Duration,
                  create: K => V,
                  onremoval: (K, V) => Unit = null): LoadingCache[K, V] = {
    val builder = newBuilder[K, V](size, ttl)
    if (onremoval != null)
      builder.removalListener(new RemovalListener[K, V] {
        override def onRemoval(notification: RemovalNotification[K, V]): Unit =
          onremoval(notification.getKey, notification.getValue)
      })
    builder.build(new CacheLoader[K, V] {
      override def load(key: K): V = create(key)
    })
  }

}
// Copyright (C) 2012 Benoit Sigoure
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or (at your
// option) any later version. This program is distributed in the hope that it
// will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
// General Public License for more details. You should have received a copy
// of the GNU Lesser General Public License along with this program. If not,
// see <http://www.gnu.org/licenses/>.
import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.tailrec
import org.slf4j.LoggerFactory
import com.stumbleupon.async.Callback
import com.stumbleupon.async.Deferred
import com.twitter.util.Duration
import com.twitter.util.Future
import com.twitter.util.Promise
import com.twitter.util.Return
import com.twitter.util.Throw
import org.hbase.async._
/**
 * HBase client built on top of asynchbase.
 *
 * This wrapper offers a Finagle-friendly, fully asynchronous, thread-safe
 * interface to HBase.  It contains various utility functions to interact
 * with HBase, for instance to increment dashboard counters.
 *
 * This class implements increment coalescing through the
 * `bufferAtomicIncrement' method.  Increments are kept in an LRU cache which
 * is frequently flushed to HBase.
 * @param zkquorum ZooKeeper quorum specification.
 * @param zkpath Path to the znode of the -ROOT- region.
 * @param flushInterval Interval at which to flush buffered counter
 * increments.  Note that this interval only applies to buffered counter
 * increments and not to other kinds of writes.
 */
class HBase(zkquorum: String, zkpath: String, flushInterval: Duration) {

  import HBase._

  private[this] val client = new HBaseClient(zkquorum, zkpath)

  /** Atomic increments are buffered/coalesced here. */
  // A cache miss creates an empty IncrementAmount; any removal (expiry,
  // eviction or invalidation) hands the entry to `flushIncrement'.
  private[this] val incrementBuffer =
    CacheBuilder[HBaseRow, IncrementAmount](32768, flushInterval,
                                            newAtomicIncrement, flushIncrement)

  schedulePeriodicFlushes(this, flushInterval.inMilliseconds)

  /** Returns stats on the increment buffer. */
  def incrementBufferStats: CacheStats = incrementBuffer.stats

  /** Synchronously flushes buffered increments. */
  private def flushBufferedIncrements(): Unit = {
    // Flush all the buffered increments, then shutdown.  This works solely
    // because `invalidateAll' will *synchronously* remove everything.  The
    // Guava documentation says "Discards all entries in the cache, possibly
    // asynchronously" but in practice the code in LocalCache works like this:
    //
    //   for each segment:
    //     segment.clear
    //
    // Where clearing a segment consists in:
    //
    //   lock the segment
    //   for each active entry:
    //     add entry to removal queue
    //   null out the hash table
    //   unlock the segment
    //   for each entry in removal queue:
    //     call the removal listener on that entry
    //
    // So by the time the call to `invalidateAll' returns, every single
    // buffered increment will have been given to asynchbase, thus it is safe
    // to trigger its shutdown and let it complete all outstanding operations.
    log.debug("Flushing {} buffered increments", incrementBuffer.size)
    incrementBuffer.invalidateAll()
  }

  /**
   * Flushes every buffered increment and then shuts down the underlying
   * asynchbase client.
   * @return A future that completes once the client has shut down.
   */
  // NOTE(review): body reconstructed from the comment in
  // `flushBufferedIncrements' ("flush ..., then shutdown") -- confirm.
  def shutdown(): Future[_] = {
    flushBufferedIncrements()
    client.shutdown()
  }

  /** Does an atomic increment on the given cell. */
  def atomicIncrement(table: Array[Byte], key: Array[Byte],
                      family: Array[Byte], column: Array[Byte],
                      amount: Int = 1): Future[Long] = {
    client.atomicIncrement(new AtomicIncrementRequest(table, key, family,
                                                      column, amount)) map {
      _.longValue  // Required to convert from java.lang.Long to scala.Long
    }
  }

  /**
   * Schedules a buffered atomic <b>increment</b> on the given cell.
   *
   * Note that this method can only be used to increment, not to decrement.
   * The amount must be greater than or equal to 0.
   * @return The future attached to the buffer entry this increment was
   * coalesced into; it is completed when that entry is flushed to HBase.
   * @throws IllegalArgumentException if amount is negative (or equal to
   * {@code Int.MaxValue}).
   */
  def bufferAtomicIncrement(table: Array[Byte], key: Array[Byte],
                            family: Array[Byte], column: Array[Byte],
                            amount: Int = 1): Future[Long] = {
    if (amount < 0 || amount == Int.MaxValue)
      throw new IllegalArgumentException("Invalid increment amount: " + amount)
    bufferAtomicIncrement(HBaseRow(table, key, family, column), amount)
  }

  /** Adds `amount' to the buffered entry for `row', retrying on a lost race. */
  @tailrec
  private def bufferAtomicIncrement(row: HBaseRow, amount: Int): Future[Long] = {
    val increment = incrementBuffer.get(row)
    if (increment.amount.addAndGet(amount) < 0) {
      // Race condition.  We got something out of the buffer, but in the mean
      // time another thread picked it up and decided to send it to HBase.  So
      // we need to retry, which will create a new entry in the buffer.
      increment.amount.addAndGet(-amount)  // Undo our previous addAndGet.
      bufferAtomicIncrement(row, amount)   // Retry.
    } else
      increment.future
  }

  /** Flushes a buffered increment to HBase. */
  private def flushIncrement(row: HBaseRow, increment: IncrementAmount): Unit = {
    // Swapping in Int.MinValue marks the entry as taken: any concurrent
    // `addAndGet' on it now yields a negative number, which is how
    // `bufferAtomicIncrement' detects the race and retries.
    val amount = increment.amount.getAndSet(Int.MinValue)
    atomicIncrement(row.table, row.key, row.family, row.column,
                    amount) map { cnt =>
      increment.future() = Return(cnt)
    } handle { case e: Throwable =>
      increment.future() = Throw(e)
    }
  }

}
/**
 * Uniquely identifies a row in HBase.
 *
 * The generated case-class equality would compare the arrays by reference,
 * so `equals' and `hashCode' are overridden to compare the byte contents,
 * which is what makes this type usable as a cache key.
 */
private final case class HBaseRow(table: Array[Byte], key: Array[Byte],
                                  family: Array[Byte], column: Array[Byte]) {

  override def equals(other: Any): Boolean =
    other match {
      case that: HBaseRow =>
        (Bytes.equals(table, that.table)
         && Bytes.equals(key, that.key)
         && Bytes.equals(family, that.family)
         && Bytes.equals(column, that.column))
      case _ => false
    }

  override def hashCode: Int =
    hash(table) + 41 * (
      hash(key) + 41 * (
        hash(family) + 41 * (
          hash(column) + 41
        )
      )
    )

  /**
   * Content-based hash of a byte array.  `Arrays.hashCode' starts at 1 and
   * folds `31 * h + b' over the bytes, i.e. the same polynomial the manual
   * fold used, and additionally tolerates a null array.
   */
  private def hash(bytes: Array[Byte]): Int =
    java.util.Arrays.hashCode(bytes)

}
/**
 * Used to buffer atomic increments.
 * @param amount By how much to increment.
 * @param future The Future that will be given the result of the increment in
 * HBase, once the RPC completes.  If the increment fails, then that future
 * would be given the exception.
 *
 * Invariants:
 *   - While the entry is being buffered, amount is greater than or equal
 *     to 0.  When the entry is flushed, amount is swapped for
 *     {@code Int.MinValue}, so a negative amount means the entry must no
 *     longer be added to.
 *   - If future.isDefined, the RPC has completed.
 */
private final case class IncrementAmount(amount: AtomicInteger,
                                         future: Promise[Long])
/** Logger, buffer helpers and implicit conversions shared with class HBase. */
object HBase {

  private val log = LoggerFactory.getLogger(getClass)

  /** Creates the empty buffer entry used when `row' is not yet buffered. */
  private def newAtomicIncrement(row: HBaseRow): IncrementAmount =
    new IncrementAmount(new AtomicInteger(0), new Promise[Long])

  /**
   * Starts a background thread that periodically flushes the increment
   * buffer of the given client.
   * @param hbase The client whose buffered increments must be flushed.
   * @param flushIntervalMillis Pause between two flushes, in milliseconds.
   * @throws IllegalArgumentException if the interval is not strictly positive.
   */
  private def schedulePeriodicFlushes(hbase: HBase, flushIntervalMillis: Long): Unit = {
    if (flushIntervalMillis <= 0)
      throw new IllegalArgumentException("negative flush interval: " + flushIntervalMillis)
    // Create a background thread to periodically flush incrementBuffer.
    val flusher = new Thread("PeriodicIncrementFlush") {
      override def run(): Unit = {
        var keepRunning = true
        while (keepRunning)
          try {
            // NOTE(review): the body of this `try' was missing from the
            // source; sleep-then-flush is reconstructed -- confirm.
            Thread.sleep(flushIntervalMillis)
            hbase.flushBufferedIncrements()
          } catch {
            case e: InterruptedException =>
              log.warn("Periodic flush thread interrupted, thread exiting.", e)
              keepRunning = false
            case e: Throwable =>
              // Deliberately keep the flusher alive: one failed flush must
              // not stop all future flushes.
              log.error("Uncaught exception while doing a periodic increment flush.", e)
          }
      }
    }
    // NOTE(review): daemon flag and start() reconstructed; a non-daemon
    // thread running this endless loop would keep the JVM alive -- confirm.
    flusher.setDaemon(true)
    flusher.start()
  }

  /** Converts a Deferred into a Future. */
  implicit def futureFromDeferred[A](d: Deferred[A]): Future[A] = {
    val promise = new Promise[A]
    // `addBoth' fires for results and errors alike, so the callback tells
    // them apart by checking whether the argument is a Throwable.
    d.addBoth(new Callback[Unit, A] {
      def call(arg: A) = promise() = arg match {
        case e: Throwable => Throw(e)
        case _ => Return(arg)
      }
    })
    promise
  }

}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment