Skip to content

Instantly share code, notes, and snippets.

@huntc
Last active August 29, 2015 14:01
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save huntc/b0279d79fcb706e0e1e0 to your computer and use it in GitHub Desktop.
Prototype stream buffer
package com.typesafe.jse
import java.io.{IOException, OutputStream, InputStream}
import java.util.concurrent.locks.{Lock, ReentrantLock}
import java.util.concurrent.TimeUnit
import scala.concurrent.duration._
/**
 * Maintains a bounded circular buffer where an output stream is used to push bytes into it, and an input
 * stream is used to pull them back out in the same order. A write blocks while the buffer is full and a read
 * blocks while it is empty, each for at most the configured timeout. All state is guarded by a single lock.
 * The caller is expected to *always* close the input and output streams.
 *
 * @param size         capacity of the buffer in bytes; must be positive
 * @param readTimeout  longest time a read waits for data before failing with an `IOException`
 * @param writeTimeout longest time a write waits for free space before failing with an `IOException`
 */
class CircularStreamBuffer(val size: Int, val readTimeout: FiniteDuration, val writeTimeout: FiniteDuration) {
  require(size > 0, s"size must be positive but was $size")

  private val buffer = Array.ofDim[Byte](size)
  private val bufferLock = new ReentrantLock()

  /** Runs `block` while holding `l`, always releasing the lock afterwards. */
  private def withLock[T](l: Lock)(block: => T): T = {
    l.lock()
    try block finally l.unlock()
  }

  // Index of the next byte to read and of the next slot to write; both wrap around at `size`.
  private var r = 0
  private var w = 0
  // Number of unread bytes. Needed because r == w is ambiguous: it holds both when empty and when full.
  private var count = 0

  // Signalled when a byte has been consumed (space became available) or the input side was closed.
  private val bufferRead = bufferLock.newCondition()
  // Signalled when a byte has been produced (data became available) or the output side was closed.
  private val bufferWritten = bufferLock.newCondition()

  private var inputOpen = true
  private var outputOpen = true

  /** The consuming end of the buffer. */
  class BufferInputStream extends InputStream {

    /** Marks the input as closed and releases any writer waiting for space; later writes are discarded. */
    override def close(): Unit = withLock(bufferLock) {
      inputOpen = false
      bufferRead.signalAll()
    }

    /**
     * Reads the next byte.
     *
     * @return the byte as a value in 0..255, or -1 once the output stream is closed and the buffer is drained
     * @throws IOException if no data arrives within `readTimeout` while the output stream is still open
     */
    override def read(): Int = withLock(bufferLock) {
      var remainingNanos = readTimeout.toNanos
      // Re-check the predicate after every wake-up: condition waits may return spuriously.
      while (count == 0 && outputOpen) {
        if (remainingNanos <= 0L) {
          throw new IOException("timed out waiting for the stream to be written while wanting to read")
        }
        remainingNanos = bufferWritten.awaitNanos(remainingNanos)
      }
      if (count > 0) {
        val c = buffer(r) & 0xFF // unsigned, so a stored 0xFF is never mistaken for EOF
        r = (r + 1) % size
        count -= 1
        bufferRead.signal()
        c
      } else {
        -1
      }
    }
  }

  /** The producing end of the buffer. */
  class BufferOutputStream extends OutputStream {

    /** Marks the output as closed and releases any reader waiting for data so it can observe end-of-stream. */
    override def close(): Unit = withLock(bufferLock) {
      outputOpen = false
      bufferWritten.signalAll()
    }

    /**
     * Writes the low eight bits of `b`. The byte is silently discarded if the input stream has been closed.
     *
     * @throws IOException if the buffer stays full for `writeTimeout` while the input stream is still open
     */
    override def write(b: Int): Unit = withLock(bufferLock) {
      var remainingNanos = writeTimeout.toNanos
      // Only block when there is genuinely no free slot, and re-check after every wake-up.
      while (count == size && inputOpen) {
        if (remainingNanos <= 0L) {
          throw new IOException("timed out waiting for the stream to be read while wanting to write")
        }
        remainingNanos = bufferRead.awaitNanos(remainingNanos)
      }
      if (inputOpen) {
        buffer(w) = b.toByte
        w = (w + 1) % size
        count += 1
        bufferWritten.signal()
      }
    }
  }

  val inputStream: BufferInputStream = new BufferInputStream
  val outputStream: BufferOutputStream = new BufferOutputStream
}
/** Factory for [[CircularStreamBuffer]] instances with default capacity and timeouts. */
object CircularStreamBuffer {

  /** Buffer size used by [[apply]] when none is given. */
  val DefaultBufferSize = 8192

  /**
   * Creates a new buffer.
   *
   * @param size         value passed as the buffer's `size`; defaults to [[DefaultBufferSize]]
   * @param readTimeout  value passed as the buffer's `readTimeout`; defaults to 30 seconds
   * @param writeTimeout value passed as the buffer's `writeTimeout`; defaults to 30 seconds
   * @return a freshly constructed [[CircularStreamBuffer]]
   */
  def apply(size: Int = DefaultBufferSize,
            readTimeout: FiniteDuration = 30.seconds,
            writeTimeout: FiniteDuration = 30.seconds): CircularStreamBuffer =
    new CircularStreamBuffer(size, readTimeout, writeTimeout)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment