Created
January 13, 2017 18:38
-
-
Save easel/b2a1ca0590df60544ff3dbcaa0b5a08e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.theseventhsense.utils.akka | |
import akka.stream.stage._ | |
trait BufferObservation { | |
def update(count: Int) | |
} | |
object ObservableBuffer { | |
object FixedSizeBuffer { | |
/** | |
* Verbatim copy from akka.streams.impl | |
*/ | |
def apply[T](size: Int): FixedSizeBuffer[T] = | |
if (size < 1) throw new IllegalArgumentException("size must be positive") | |
else if (((size - 1) & size) == 0) new PowerOfTwoFixedSizeBuffer(size) | |
else new ModuloFixedSizeBuffer(size) | |
sealed abstract class FixedSizeBuffer[T](val size: Int) { | |
override def toString = s"Buffer($size, $readIdx, $writeIdx)(${(readIdx until writeIdx).map(get).mkString(", ")})" | |
private val buffer = new Array[AnyRef](size) | |
protected var readIdx = 0 | |
protected var writeIdx = 0 | |
def used: Int = writeIdx - readIdx | |
def isFull: Boolean = used == size | |
def isEmpty: Boolean = used == 0 | |
def nonEmpty: Boolean = used != 0 | |
def enqueue(elem: T): Unit = { | |
put(writeIdx, elem) | |
writeIdx += 1 | |
} | |
protected def toOffset(idx: Int): Int | |
def put(idx: Int, elem: T): Unit = buffer(toOffset(idx)) = { | |
val result = elem.asInstanceOf[AnyRef] | |
result | |
} | |
def get(idx: Int): T = buffer(toOffset(idx)).asInstanceOf[T] | |
def peek(): T = get(readIdx) | |
def dequeue(): T = { | |
val result = get(readIdx) | |
dropHead() | |
result | |
} | |
def clear(): Unit = { | |
java.util.Arrays.fill(buffer, null) | |
readIdx = 0 | |
writeIdx = 0 | |
} | |
def dropHead(): Unit = { | |
put(readIdx, null.asInstanceOf[T]) | |
readIdx += 1 | |
} | |
def dropTail(): Unit = { | |
writeIdx -= 1 | |
put(writeIdx, null.asInstanceOf[T]) | |
} | |
} | |
private final class ModuloFixedSizeBuffer[T](_size: Int) | |
extends FixedSizeBuffer[T](_size) { | |
override protected def toOffset(idx: Int): Int = idx % size | |
} | |
private final class PowerOfTwoFixedSizeBuffer[T](_size: Int) | |
extends FixedSizeBuffer[T](_size) { | |
private val Mask = size - 1 | |
override protected def toOffset(idx: Int): Int = idx & Mask | |
} | |
} | |
} | |
/** | |
* An observable, backpressuring buffer. Stores the number of used elements in a | |
* dropwizard metrics Histogram. Based directly off the default akka buffer | |
* implementation. | |
* | |
* Use it like this: | |
* | |
* val histogram = metricRegistry.histogram("my-buffer") | |
* val stream = (() => Source.fromPublisher(...)) | |
* .trasform(() => ObservableBuffer(BufferSize, histogram)) | |
* .runFold(...) | |
* | |
* @param size | |
* @param histogram | |
* @tparam T | |
*/ | |
// TODO: Port ObservableBuffer to new GraphStage API | |
case class ObservableBuffer[T](size: Int, histogram: BufferObservation) extends DetachedStage[T, T] { | |
import ObservableBuffer._ | |
private val buffer = FixedSizeBuffer[T](size) | |
override def onPush(elem: T, ctx: DetachedContext[T]): UpstreamDirective = { | |
histogram.update(buffer.used) | |
if (ctx.isHoldingDownstream) ctx.pushAndPull(elem) | |
else enqueueAction(ctx, elem) | |
} | |
override def onPull(ctx: DetachedContext[T]): DownstreamDirective = { | |
histogram.update(buffer.used) | |
if (ctx.isFinishing) { | |
val elem = buffer.dequeue() | |
if (buffer.isEmpty) ctx.pushAndFinish(elem) | |
else ctx.push(elem) | |
} else if (ctx.isHoldingUpstream) ctx.pushAndPull(buffer.dequeue()) | |
else if (buffer.isEmpty) ctx.holdDownstream() | |
else ctx.push(buffer.dequeue()) | |
} | |
override def onUpstreamFinish(ctx: DetachedContext[T]): TerminationDirective = { | |
histogram.update(buffer.used) | |
if (buffer.isEmpty) ctx.finish() | |
else ctx.absorbTermination() | |
} | |
private val enqueueAction: (DetachedContext[T], T) ⇒ UpstreamDirective = { (ctx, elem) ⇒ | |
buffer.enqueue(elem) | |
if (buffer.isFull) ctx.holdUpstream() | |
else ctx.pull() | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment