Skip to content

Instantly share code, notes, and snippets.

@tsuna
Created March 7, 2012 16:59
Show Gist options
  • Save tsuna/1994388 to your computer and use it in GitHub Desktop.
Save tsuna/1994388 to your computer and use it in GitHub Desktop.
Atomic increment coalescing for asynchbase
// Copyright (C) 2012 Benoit Sigoure
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or (at your
// option) any later version. This program is distributed in the hope that it
// will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
// General Public License for more details. You should have received a copy
// of the GNU Lesser General Public License along with this program. If not,
// see <http://www.gnu.org/licenses/>.
import java.util.concurrent.TimeUnit
import com.twitter.util.Duration
import com.google.common.cache.{CacheBuilder => Builder}
import com.google.common.cache.Cache
import com.google.common.cache.CacheLoader
import com.google.common.cache.CacheStats
import com.google.common.cache.LoadingCache
import com.google.common.cache.RemovalListener
import com.google.common.cache.RemovalNotification
import com.stumbleupon.common.Counter
/**
 * Factory for in-memory, concurrent, size-bounded caches backed by Google
 * Guava's `CacheBuilder` (imported in this file under the alias `Builder`).
 */
object CacheBuilder {

  /** TTL value meaning "no time-based expiration"; see `newBuilder`. */
  val NeverExpire = Duration.forever

  /**
   * Creates a pre-configured Guava builder.
   * @param size Maximum number of entries; a quarter of it is used as the
   * initial capacity.
   * @param ttl Expire entries this long after they were written, unless it
   * is equal to `NeverExpire`, in which case no expiration is configured.
   * @return The configured builder, cast to the requested key/value types.
   */
  def newBuilder[K, V](size: Int, ttl: Duration): Builder[K, V] = {
    // The concurrency level is the number of internal segments of the hash
    // map. Guava's default (4) is too low when more threads than that hit
    // the map concurrently. Going a bit above the number of CPUs also helps
    // because LocalCache keeps a per-segment buffer of uncommitted access
    // operations: more segments means more reads per segment with no
    // interleaved write before the segment must take its lock to drain the
    // buffer. The drain threshold itself is not tunable, it is hard-coded as
    // LocalCache.DRAIN_THRESHOLD = 0x3F = 63.
    val segments = Runtime.getRuntime.availableProcessors * 2
    val configured = Builder.newBuilder()
      .concurrencyLevel(segments)
      .maximumSize(size)
      .initialCapacity(size / 4) // Pre-allocate a reasonable share of the max.
    if (ttl != NeverExpire) {
      configured.expireAfterWrite(ttl.inNanoseconds, TimeUnit.NANOSECONDS)
    }
    configured.asInstanceOf[Builder[K, V]]
  }

  /**
   * Returns a new loading cache bounded to `size` entries.
   * @param size Maximum number of entries the cache should contain.
   * @param ttl Automatically remove entries from the cache after this amount
   * of time has elapsed. If set to {@code NeverExpire} then entries are only
   * removed because the maximum size is reached (or manually).
   * @param create Function called with the key to populate the cache on a
   * miss.
   * @param onremoval If not {@code null}, called with the key and value of
   * every entry removed from the cache, either manually or because of
   * evictions.
   */
  def apply[K, V](size: Int, ttl: Duration,
                  create: K => V,
                  onremoval: (K, V) => Unit = null): LoadingCache[K, V] = {
    val builder = newBuilder[K, V](size, ttl)
    if (onremoval != null) {
      // Adapt the Scala callback to Guava's listener interface.
      val listener = new RemovalListener[K, V] {
        override def onRemoval(notification: RemovalNotification[K, V]) =
          onremoval(notification.getKey, notification.getValue)
      }
      builder.removalListener(listener)
    }
    val loader = new CacheLoader[K, V] {
      override def load(key: K) = create(key)
    }
    builder.build(loader)
  }
}
// Copyright (C) 2012 Benoit Sigoure
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or (at your
// option) any later version. This program is distributed in the hope that it
// will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
// General Public License for more details. You should have received a copy
// of the GNU Lesser General Public License along with this program. If not,
// see <http://www.gnu.org/licenses/>.
import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.tailrec
import org.slf4j.LoggerFactory
import com.google.common.cache.CacheStats
import com.stumbleupon.async.Callback
import com.stumbleupon.async.Deferred
import com.twitter.util.Duration
import com.twitter.util.Future
import com.twitter.util.Promise
import com.twitter.util.Return
import com.twitter.util.Throw
import org.hbase.async._
/**
* HBase client built on top of asynchbase.
* This wrapper offers a Finagle-friendly, fully asynchronous, thread-safe
* interface to HBase. It contains various utility functions to interact
* with HBase, for instance to increment dashboard counters.
* This class implements increment coalescing through the
* `bufferAtomicIncrement' method. Increments are kept in an LRU cache which
* is frequently flushed to HBase.
* Constructing an instance also starts a daemon thread (see
* `schedulePeriodicFlushes' in the companion object) that flushes the
* buffered increments every `flushInterval'.
* @param zkquorum ZooKeeper quorum specification.
* @param zkpath Path to the znode of the -ROOT- region.
* @param flushInterval Interval at which to flush buffered counter
* increments. Note that this interval only applies to buffered counter
* increments and not to other kinds of writes.
* @throws IllegalArgumentException if `flushInterval' is not strictly
* positive once converted to milliseconds.
*/
class HBase(zkquorum: String, zkpath: String, flushInterval: Duration) {
import HBase._
private[this] val client = new HBaseClient(zkquorum, zkpath)
/**
* Atomic increments are buffered/coalesced here.
* The cache holds at most 32768 entries which expire `flushInterval' after
* being written. Entries are created by `newAtomicIncrement' on a miss and
* every removed entry is handed to `flushIncrement', which is what actually
* sends the coalesced increment to HBase.
*/
private[this] val incrementBuffer = CacheBuilder(32768, flushInterval,
newAtomicIncrement,
flushIncrement)
// NOTE(review): `this' is handed to the flusher thread from within the
// constructor; every field used by the flush is initialized above this line.
schedulePeriodicFlushes(this, flushInterval.inMilliseconds)
/** Returns stats on the increment buffer. */
def incrementBufferStats: CacheStats = incrementBuffer.stats
/**
* Synchronously flushes buffered increments.
* Also called periodically by the background flusher thread started from
* the companion object.
*/
private def flushBufferedIncrements() {
// Flush all the buffered increments, then shutdown. This works solely
// because `invalidateAll' will *synchronously* remove everything. The
// Guava documentation says "Discards all entries in the cache, possibly
// asynchronously" but in practice the code in LocalCache works like this:
// for each segment:
// segment.clear
// Where clearing a segment consists in:
// lock the segment
// for each active entry:
// add entry to removal queue
// null out the hash table
// unlock the segment
// for each entry in removal queue:
// call the removal listener on that entry
// So by the time the call to `invalidateAll' returns, every single
// buffered increment will have been given to asynchbase, thus it is safe
// to trigger its shutdown and let it complete all outstanding operations.
log.debug("Flushing {} buffered increments", incrementBuffer.size)
incrementBuffer.invalidateAll
}
/**
* Flushes every buffered increment and then shuts down the underlying
* asynchbase client.
* NOTE(review): the periodic flusher thread is not stopped here; it keeps
* running until the JVM exits -- confirm this is intended.
* @return The result of `client.shutdown' (presumably a Deferred adapted by
* the implicit `futureFromDeferred' -- confirm).
*/
def shutdown(): Future[_] = {
flushBufferedIncrements
client.shutdown
}
/**
* Does an atomic increment on the given cell.
* The request is sent to HBase right away, without any buffering.
* NOTE(review): `increments_' is not defined in the visible part of this
* file; presumably a counter of issued increment RPCs -- confirm.
* @param table Name of the table.
* @param key Row key.
* @param family Column family.
* @param column Column qualifier.
* @param amount By how much to increment the cell (defaults to 1).
* @return A future of the value the asynchbase RPC completes with.
*/
def atomicIncrement(table: Array[Byte], key: Array[Byte],
family: Array[Byte], column: Array[Byte],
amount: Int = 1): Future[Long] = {
increments_.increment
client.atomicIncrement(new AtomicIncrementRequest(table, key, family,
column, amount)) map {
_.longValue // Required to convert from java.lang.Long to scala.Long
}
}
/**
* Schedules a buffered atomic <b>increment</b> on the given cell.
* Note that this method can only be used to increment, not to decrement.
* The amount must be greater than or equal to 0.
* @return The future shared by all the increments coalesced into the same
* buffer entry; it is completed when that entry is flushed to HBase.
* @throws IllegalArgumentException if amount is negative or equal to
* `Int.MaxValue'.
*/
def bufferAtomicIncrement(table: Array[Byte], key: Array[Byte],
family: Array[Byte], column: Array[Byte],
amount: Int = 1): Future[Long] = {
if (amount < 0 || amount == Int.MaxValue)
throw new IllegalArgumentException("Invalid increment amount: " + amount)
bufferAtomicIncrement(HBaseRow(table, key, family, column), amount)
}
/**
* Adds `amount' to the buffer entry of `row', creating the entry on a cache
* miss, and retries whenever the entry turns out to be already flushed.
* `flushIncrement' swaps the counter of a flushed entry for Int.MinValue,
* which is why a negative result of `addAndGet' is read as "too late".
* NOTE(review): a negative result can also come from an Int overflow of an
* entry that is still buffered; the retry would then pick up the same
* entry again until it gets flushed -- confirm this is acceptable.
* @return The future held by the entry the amount was added to.
*/
@tailrec
private def bufferAtomicIncrement(row: HBaseRow, amount: Int): Future[Long] = {
val increment = incrementBuffer.get(row)
if (increment.amount.addAndGet(amount) < 0) {
// Race condition. We got something out of the buffer, but in the mean
// time another thread picked it up and decided to send it to HBase. So
// we need to retry, which will create a new entry in the buffer.
increment.amount.addAndGet(-amount) // Undo our previous addAndGet.
bufferAtomicIncrement(row, amount) // Retry.
} else
increment.future
}
/**
* Flushes a buffered increment to HBase.
* Passed to the cache as its removal callback. Atomically swaps the
* buffered amount for Int.MinValue (so that concurrent callers of
* `bufferAtomicIncrement' notice the flush and retry), sends the
* swapped-out amount as one atomic increment and completes the entry's
* future with either the value or the exception produced by that RPC.
*/
private def flushIncrement(row: HBaseRow, increment: IncrementAmount) {
// After this line the entry is considered flushed: its counter is negative.
val amount = increment.amount.getAndSet(Int.MinValue)
atomicIncrement(row.table, row.key, row.family, row.column,
amount) map { cnt =>
increment.future() = Return(cnt)
cnt
} handle { case e: Throwable =>
increment.future() = Throw(e)
e
}
}
}
/**
 * Key of the increment buffer: the table, row key, column family and column
 * of the cell to increment.
 * `equals` and `hashCode` are overridden to work on the contents of the byte
 * arrays rather than on their identity.
 */
private final case class HBaseRow(table: Array[Byte], key: Array[Byte],
                                  family: Array[Byte], column: Array[Byte]) {

  /** Two rows are equal when all four byte arrays are equal per `Bytes.equals`. */
  override def equals(other: Any): Boolean = other match {
    case HBaseRow(otherTable, otherKey, otherFamily, otherColumn) =>
      Bytes.equals(table, otherTable) &&
        Bytes.equals(key, otherKey) &&
        Bytes.equals(family, otherFamily) &&
        Bytes.equals(column, otherColumn)
    case _ => false
  }

  /** Combines the content hashes of the four arrays, innermost field first. */
  override def hashCode: Int = {
    val columnPart = hash(column) + 41
    val familyPart = hash(family) + 41 * columnPart
    val keyPart = hash(key) + 41 * familyPart
    hash(table) + 41 * keyPart
  }

  /** Content-based hash of a byte array: starts at 1, then 31 * h + byte. */
  private def hash(bytes: Array[Byte]): Int =
    bytes.foldLeft(1) { (acc, b) => 31 * acc + b }
}
/**
* Used to buffer atomic increments.
* One instance is created per buffered cell by `HBase.newAtomicIncrement',
* with the amount starting at 0 and a fresh, unfulfilled promise.
* @param amount By how much to increment. Concurrent callers accumulate into
* it with `addAndGet'; `flushIncrement' swaps it for Int.MinValue when the
* entry is flushed.
* @param future The Future that will be given the result of the increment in
* HBase, once the RPC completes. If the increment fails, then that future
* would be given the exception.
* Invariants:
* While the entry is buffered, amount is meant to be greater than or equal
* to 0; once flushed it is set to Int.MinValue, so a negative amount tells
* callers that the entry must not be used anymore
* If future.isDefined, the RPC has completed
*/
private final case class IncrementAmount(amount: AtomicInteger,
future: Promise[Long])
/**
 * Companion of the `HBase` class: logger, factory for buffer entries, the
 * background flusher and the Deferred-to-Future adapter.
 */
object HBase {
  private val log = LoggerFactory.getLogger(getClass)

  /**
   * Creates the buffer entry for a cell on a cache miss: a zero amount and a
   * promise that has not been fulfilled yet. The row itself is not used.
   */
  private def newAtomicIncrement(row: HBaseRow): IncrementAmount =
    new IncrementAmount(new AtomicInteger(0), new Promise)

  /**
   * Starts a daemon thread that flushes the increment buffer of `hbase`
   * every `flushIntervalMillis` milliseconds.
   * The thread exits when it is interrupted; any other exception raised by
   * a flush is logged and the loop carries on.
   * @param hbase The client whose buffered increments must be flushed.
   * @param flushIntervalMillis Pause between two flushes, in milliseconds.
   * @throws IllegalArgumentException if the interval is zero or negative.
   */
  private def schedulePeriodicFlushes(hbase: HBase, flushIntervalMillis: Long): Unit = {
    // Zero is rejected too, so the message must not claim "negative".
    if (flushIntervalMillis <= 0)
      throw new IllegalArgumentException("flush interval must be positive: "
                                         + flushIntervalMillis)
    // Create a background thread to periodically flush incrementBuffer.
    val flusher = new Thread("PeriodicIncrementFlush") {
      override def run(): Unit = {
        while (true) {
          try {
            Thread.sleep(flushIntervalMillis)
            hbase.flushBufferedIncrements()
          } catch {
            case e: InterruptedException =>
              log.warn("Periodic flush thread interrupted, thread exiting.", e)
              return
            // Deliberately broad: one failed flush must not kill the flusher.
            case e: Throwable =>
              log.error("Uncaught exception while doing a periodic increment flush.", e)
          }
        }
      }
    }
    // Daemon, so that this thread alone never keeps the JVM alive.
    flusher.setDaemon(true)
    flusher.start()
  }

  /**
   * Converts a Deferred into a Future.
   * The callback is registered with `addBoth`, so it receives whatever the
   * Deferred ends up with: a `Throwable` fails the future, anything else
   * becomes its value.
   */
  implicit def futureFromDeferred[A](d: Deferred[A]): Future[A] = {
    val promise = new Promise[A]
    d.addBoth(new Callback[Unit, A] {
      def call(arg: A): Unit =
        promise.update(arg match {
          case e: Throwable => Throw(e)
          case _ => Return(arg)
        })
    })
    promise
  }
}
@tsuna
Copy link
Author

tsuna commented Mar 7, 2012

I manually excised a lot of irrelevant code, so pardon me if the code above doesn't compile because I butchered too much of it. The code above requires Google Guava 11.0 or above. It's built with asynchbase and Finagle. I'd like to have this functionality essentially ported to Java and integrated in asynchbase itself.

This code runs in a user-facing service in production at StumbleUpon, where it's coalescing about 40k increments/s down to 2k/s.

@tsuna
Copy link
Author

tsuna commented Apr 6, 2012

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment