Last active
August 29, 2015 14:01
-
-
Save huntc/b0279d79fcb706e0e1e0 to your computer and use it in GitHub Desktop.
Prototype stream buffer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.typesafe.jse | |
import java.io.{IOException, OutputStream, InputStream} | |
import java.util.concurrent.locks.{Lock, ReentrantLock} | |
import java.util.concurrent.TimeUnit | |
import scala.concurrent.duration._ | |
/**
 * Maintains a fixed-capacity circular byte buffer where an output stream is used to push data into it, and an
 * input stream is used to pull data from it. The caller is expected to *always* close the input and output streams.
 *
 * All state is guarded by a single lock. A writer blocks while the buffer is full and a reader blocks while it is
 * empty; each wait is bounded by the corresponding timeout.
 *
 * @param size         capacity of the buffer in bytes; must be positive
 * @param readTimeout  longest time a `read()` waits for data before failing with an `IOException`
 * @param writeTimeout longest time a `write()` waits for free space before failing with an `IOException`
 */
class CircularStreamBuffer(val size: Int, val readTimeout: FiniteDuration, val writeTimeout: FiniteDuration) {
  require(size > 0, "size must be positive")

  private val buffer = Array.ofDim[Byte](size)
  private val bufferLock = new ReentrantLock()

  /** Runs `block` while holding `l`, releasing the lock even when `block` throws. */
  private def withLock[T](l: Lock)(block: => T): T = {
    l.lock()
    try {
      block
    } finally {
      l.unlock()
    }
  }

  // Index of the next byte to read, index of the next slot to write, and the number of unread bytes.
  // Tracking the fill level explicitly distinguishes "empty" from "full" when r == w.
  private var r = 0
  private var w = 0
  private var filled = 0

  // Signalled whenever a byte is consumed (or the input is closed) so a blocked writer can re-check for space.
  private val bufferRead = bufferLock.newCondition()
  // Signalled whenever a byte is produced (or the output is closed) so a blocked reader can re-check for data.
  private val bufferWritten = bufferLock.newCondition()

  private var inputOpen = true
  private var outputOpen = true

  class BufferInputStream extends InputStream {

    /** Marks the input as closed and wakes any writer waiting for space. */
    override def close(): Unit = withLock(bufferLock) {
      inputOpen = false
      bufferRead.signalAll()
    }

    /**
     * Reads the next byte from the buffer, waiting up to `readTimeout` for one to arrive while the output
     * stream is still open.
     *
     * @return the byte as a value in 0..255, or -1 when the buffer is empty and the output stream is closed
     * @throws IOException if no data arrives within `readTimeout`
     */
    override def read(): Int = withLock(bufferLock) {
      var remaining = readTimeout.toNanos
      // Loop so that spurious wakeups re-check the condition and only the remaining time is waited.
      while (filled == 0 && outputOpen) {
        if (remaining <= 0L) {
          throw new IOException("timed out waiting for the stream to be written while wanting to read")
        }
        remaining = bufferWritten.awaitNanos(remaining)
      }
      if (filled > 0) {
        // Mask to an unsigned value so a stored 0xff byte is not mistaken for end-of-stream.
        val c = buffer(r) & 0xff
        r = (r + 1) % buffer.length
        filled -= 1
        bufferRead.signal()
        c
      } else {
        -1
      }
    }
  }

  class BufferOutputStream extends OutputStream {

    /** Marks the output as closed and wakes any reader waiting for data so it can observe end-of-stream. */
    override def close(): Unit = withLock(bufferLock) {
      outputOpen = false
      bufferWritten.signalAll()
    }

    /**
     * Writes the low-order eight bits of `b` into the buffer, waiting up to `writeTimeout` for free space.
     * The byte is silently discarded when the input stream has been closed.
     *
     * @throws IOException if no space becomes available within `writeTimeout`
     */
    override def write(b: Int): Unit = withLock(bufferLock) {
      var remaining = writeTimeout.toNanos
      // Only wait while there is still a reader that could free up space.
      while (filled == buffer.length && inputOpen) {
        if (remaining <= 0L) {
          throw new IOException("timed out waiting for the stream to be read while wanting to write")
        }
        remaining = bufferRead.awaitNanos(remaining)
      }
      if (inputOpen) {
        buffer(w) = b.toByte
        w = (w + 1) % buffer.length
        filled += 1
        bufferWritten.signal()
      }
    }
  }

  val inputStream = new BufferInputStream
  val outputStream = new BufferOutputStream
}
/** Factory for [[CircularStreamBuffer]] instances with default sizing and timeouts. */
object CircularStreamBuffer {

  /** Buffer size used when the caller does not supply one. */
  val DefaultBufferSize = 8192

  // Both the read and the write timeout default to the same duration.
  private val DefaultTimeout: FiniteDuration = 30.seconds

  /**
   * Creates a buffer, falling back to the defaults for any argument left out.
   *
   * @param size         buffer size passed to the constructor
   * @param readTimeout  read timeout passed to the constructor
   * @param writeTimeout write timeout passed to the constructor
   */
  def apply(
    size: Int = DefaultBufferSize,
    readTimeout: FiniteDuration = DefaultTimeout,
    writeTimeout: FiniteDuration = DefaultTimeout
  ): CircularStreamBuffer =
    new CircularStreamBuffer(size, readTimeout, writeTimeout)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment