Skip to content

Instantly share code, notes, and snippets.

@zvozin
Last active December 17, 2015 09:19
Show Gist options
  • Save zvozin/5586323 to your computer and use it in GitHub Desktop.
Save zvozin/5586323 to your computer and use it in GitHub Desktop.
Time-and-space bounded Scalaz-based actor. Buffers elements of type A until either buffer size has been reached, or the time bound has elapsed.
import scalaz.concurrent.Actor
import collection.mutable.ArrayBuffer
import java.util.Timer
/**
 * Time-and-space bounded buffer built on a Scalaz actor.
 *
 * Elements sent with `!` are accumulated and handed to `onFlush` as one batch when
 * either `bufferSize` elements have been collected or the periodic flush tick
 * (every `flushIntervalMs` milliseconds) fires. An empty buffer is never flushed.
 *
 * Usage:
 *
 * {{{
 * val buffer = BufferActor[String](onFlush = _ foreach println, onError = println(_))
 * buffer ! "Now we're talking!"
 * }}}
 *
 * Requires Timer (https://gist.github.com/zvozin/5586300)
 *
 * @param onFlush         callback receiving the buffered elements, in arrival order
 * @param onError         error callback passed straight to the underlying actor;
 *                        rethrows by default
 * @param bufferSize      number of buffered elements that triggers an immediate flush
 * @param flushIntervalMs period, in milliseconds, of the time-based flush
 * @tparam A type of the buffered elements
 */
case class BufferActor[A](onFlush: List[A] => Unit,
                          onError: Throwable => Unit = throw (_),
                          bufferSize: Int = 100,
                          flushIntervalMs: Long = 2000L) {

  /**
   * Internal message protocol of the underlying actor. Sealed so the match in
   * `onMessage` is checked for exhaustiveness; it needs no type parameter of its
   * own because the nested definitions already see the enclosing `A`.
   */
  private sealed trait Message

  /** Tick sent by the flush timer: flush whatever is currently buffered. */
  private case object Flush extends Message

  /** Carries one user-supplied element into the actor. */
  private case class MessageWrapper(a: A) extends Message

  // Only touched from the actor's message handler: both user elements and timer
  // ticks are routed through `actor`, never applied to the buffer directly.
  private val buffer = new ArrayBuffer[A](bufferSize)

  // The handler is a method (not a val), so it cannot be observed uninitialized
  // no matter where in the constructor the actor is created or first used.
  private val actor = Actor[Message](message => onMessage(message), onError)

  private val flushTimer = new Timer()

  /**
   * Get the flush ticker going. This must stay after the `val`s above: the tick
   * runs on the timer's thread and dereferences `actor`, so everything it needs
   * has to be initialized before the first tick can possibly fire (which matters
   * for very small values of `flushIntervalMs`).
   */
  flushTimer.runEvery(flushIntervalMs)(actor ! Flush)

  /** Handles one actor message: buffer the element or flush on a tick. */
  private def onMessage(message: Message): Unit = message match {
    case MessageWrapper(a) =>
      buffer += a
      // Space bound reached: flush right away instead of waiting for the next tick.
      if (buffer.size >= bufferSize) flush()
    case Flush => flush()
  }

  /**
   * Hands the buffered elements to `onFlush` and empties the buffer.
   * The buffer is cleared only after `onFlush` returns, so if the callback
   * throws, the elements stay buffered and are offered again on the next flush.
   */
  private def flush(): Unit =
    if (buffer.nonEmpty) {
      onFlush(buffer.toList)
      buffer.clear()
    }

  /**
   * Our proud public API. Bang [A]s here.
   * @param a element to buffer
   */
  def !(a: A): Unit = actor ! MessageWrapper(a)
}
/**
 * Basic conveniences to use java.util.Timer, and an instance thereof.
 *
 * Each instance owns one `java.util.Timer`; call `cancel()` when the instance is
 * no longer needed to release it.
 *
 * NOTE: per the `java.util.Timer` documentation, an exception escaping a task
 * terminates the timer's thread, so callers should make `f` non-throwing.
 */
case class Timer() {
  private val javaTimer = new java.util.Timer()

  /** Wraps a by-name block in a `java.util.TimerTask`; `f` is evaluated on every run. */
  private def task(f: => Unit): java.util.TimerTask =
    new java.util.TimerTask {
      override def run(): Unit = f
    }

  /**
   * Run f in msFromNow millis
   */
  def runIn(msFromNow: Long)(f: => Unit): Unit =
    javaTimer.schedule(task(f), msFromNow)

  /**
   * Run f every ms millis; the first run also happens after ms millis.
   */
  def runEvery(ms: Long)(f: => Unit): Unit =
    javaTimer.schedule(task(f), ms, ms)

  /**
   * Cancels the underlying `java.util.Timer`, discarding any scheduled runs.
   * After this call the instance can no longer schedule anything.
   */
  def cancel(): Unit = javaTimer.cancel()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment