Skip to content

Instantly share code, notes, and snippets.

@easel
Created January 13, 2017 18:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save easel/b2a1ca0590df60544ff3dbcaa0b5a08e to your computer and use it in GitHub Desktop.
Save easel/b2a1ca0590df60544ff3dbcaa0b5a08e to your computer and use it in GitHub Desktop.
package com.theseventhsense.utils.akka
import akka.stream.stage._
/**
  * Callback interface that receives observations of a buffer's fill level.
  *
  * Implementations decide what to do with each observation (for example,
  * forwarding it to a metrics histogram).
  */
trait BufferObservation {

  /**
    * Records a single observation.
    *
    * @param count the observed element count; ObservableBuffer passes the number of
    *              elements currently held in its internal buffer
    */
  def update(count: Int): Unit
}
object ObservableBuffer {

  /**
    * Fixed-capacity ring buffers.
    *
    * Originally copied from akka.streams.impl; the read/write cursors have since been
    * widened from Int to Long (see [[FixedSizeBuffer.FixedSizeBuffer]]).
    */
  object FixedSizeBuffer {

    /**
      * Creates a ring buffer that holds at most `size` elements.
      *
      * Sizes that are a power of two get a bit-mask based implementation; every other
      * size uses a modulo based one.
      *
      * @param size capacity of the buffer, must be at least 1
      * @tparam T element type
      * @throws IllegalArgumentException if `size` is less than 1
      */
    def apply[T](size: Int): FixedSizeBuffer[T] =
      if (size < 1) throw new IllegalArgumentException("size must be positive")
      else if (((size - 1) & size) == 0) new PowerOfTwoFixedSizeBuffer[T](size)
      else new ModuloFixedSizeBuffer[T](size)

    /**
      * A FIFO ring buffer over a fixed-size array.
      *
      * Elements live between two monotonically increasing cursors, `readIdx` (oldest
      * element) and `writeIdx` (next free slot); subclasses map a cursor value to an
      * array offset via `toOffset`.
      *
      * None of the operations check bounds: callers must not `enqueue` into a full
      * buffer nor `dequeue`/`peek`/`dropHead`/`dropTail` on an empty one.
      *
      * @param size capacity of the buffer
      * @tparam T element type
      */
    sealed abstract class FixedSizeBuffer[T](val size: Int) {
      private val buffer = new Array[AnyRef](size)

      // The cursors only ever grow while elements flow through the buffer. They are Long
      // rather than Int because an Int cursor turns negative after 2^31 enqueues, at
      // which point `idx % size` yields a negative array offset (and, for sizes that are
      // not a power of two, the slot sequence is no longer contiguous across the wrap).
      protected var readIdx: Long = 0L
      protected var writeIdx: Long = 0L

      override def toString: String = {
        val contents = (readIdx until writeIdx).map(i => get(i)).mkString(", ")
        s"Buffer($size, $readIdx, $writeIdx)($contents)"
      }

      /** Number of elements currently stored. */
      def used: Int = (writeIdx - readIdx).toInt

      def isFull: Boolean = used == size

      def isEmpty: Boolean = used == 0

      def nonEmpty: Boolean = used != 0

      /** Appends `elem` at the write cursor and advances the cursor. */
      def enqueue(elem: T): Unit = {
        put(writeIdx, elem)
        writeIdx += 1
      }

      /** Maps a cursor value to an offset into the backing array. */
      protected def toOffset(idx: Long): Int

      /** Stores `elem` in the slot that cursor value `idx` maps to. */
      def put(idx: Long, elem: T): Unit =
        buffer(toOffset(idx)) = elem.asInstanceOf[AnyRef]

      /** Reads the slot that cursor value `idx` maps to. */
      def get(idx: Long): T = buffer(toOffset(idx)).asInstanceOf[T]

      /** Returns the oldest element without removing it. */
      def peek(): T = get(readIdx)

      /** Removes and returns the oldest element. */
      def dequeue(): T = {
        val head = get(readIdx)
        dropHead()
        head
      }

      /** Empties the buffer and resets both cursors to zero. */
      def clear(): Unit = {
        java.util.Arrays.fill(buffer, null)
        readIdx = 0L
        writeIdx = 0L
      }

      /** Discards the oldest element, nulling its slot so the reference is released. */
      def dropHead(): Unit = {
        put(readIdx, null.asInstanceOf[T])
        readIdx += 1
      }

      /** Discards the most recently enqueued element, nulling its slot. */
      def dropTail(): Unit = {
        writeIdx -= 1
        put(writeIdx, null.asInstanceOf[T])
      }
    }

    /** Ring buffer for arbitrary sizes; maps cursors to offsets with a modulo. */
    private final class ModuloFixedSizeBuffer[T](_size: Int)
        extends FixedSizeBuffer[T](_size) {
      override protected def toOffset(idx: Long): Int = (idx % size).toInt
    }

    /** Ring buffer for power-of-two sizes; maps cursors to offsets with a bit mask. */
    private final class PowerOfTwoFixedSizeBuffer[T](_size: Int)
        extends FixedSizeBuffer[T](_size) {
      private val Mask = size - 1
      override protected def toOffset(idx: Long): Int = (idx & Mask).toInt
    }
  }
}
/**
  * An observable, backpressuring buffer stage, based directly off the default akka
  * buffer implementation.
  *
  * At the start of every `onPush`, `onPull` and `onUpstreamFinish` callback the number
  * of elements currently buffered is handed to `histogram.update`, so each observation
  * is the fill level before that callback touches the buffer. The observation is
  * intended to be backed by a dropwizard metrics Histogram.
  *
  * Use it like this:
  *
  *   val histogram = metricRegistry.histogram("my-buffer")
  *   val stream = (() => Source.fromPublisher(...))
  *     .transform(() => ObservableBuffer(BufferSize, histogram))
  *     .runFold(...)
  *
  * @param size      capacity of the internal buffer; must be at least 1
  * @param histogram observer that receives the buffer's fill level
  * @tparam T type of the elements passing through the stage
  */
// TODO: Port ObservableBuffer to new GraphStage API
case class ObservableBuffer[T](size: Int, histogram: BufferObservation) extends DetachedStage[T, T] {
  import ObservableBuffer._

  private val buffer = FixedSizeBuffer[T](size)

  /** Reports the current fill level to the observer. */
  private def reportUsage(): Unit = histogram.update(buffer.used)

  /** Buffers `elem`, then holds upstream if the buffer became full, else pulls again. */
  private def enqueueOrHold(ctx: DetachedContext[T], elem: T): UpstreamDirective = {
    buffer.enqueue(elem)
    if (!buffer.isFull) ctx.pull()
    else ctx.holdUpstream()
  }

  /** Emits the next buffered element while finishing; completes once it was the last. */
  private def emitWhileFinishing(ctx: DetachedContext[T]): DownstreamDirective = {
    val next = buffer.dequeue()
    if (buffer.nonEmpty) ctx.push(next)
    else ctx.pushAndFinish(next)
  }

  override def onPush(elem: T, ctx: DetachedContext[T]): UpstreamDirective = {
    reportUsage()
    // When downstream is being held the element bypasses the buffer entirely.
    if (!ctx.isHoldingDownstream) enqueueOrHold(ctx, elem)
    else ctx.pushAndPull(elem)
  }

  override def onPull(ctx: DetachedContext[T]): DownstreamDirective = {
    reportUsage()
    if (ctx.isFinishing) emitWhileFinishing(ctx)
    else if (ctx.isHoldingUpstream) ctx.pushAndPull(buffer.dequeue())
    else if (buffer.nonEmpty) ctx.push(buffer.dequeue())
    else ctx.holdDownstream()
  }

  override def onUpstreamFinish(ctx: DetachedContext[T]): TerminationDirective = {
    reportUsage()
    // Leftover elements still have to be emitted, so termination is absorbed until
    // onPull has drained them.
    if (buffer.nonEmpty) ctx.absorbTermination()
    else ctx.finish()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment