Skip to content

Instantly share code, notes, and snippets.

@pyldin601
Created September 7, 2016 13:32
Show Gist options
  • Save pyldin601/a528d6a525295bfc11ba7f81b6c544e4 to your computer and use it in GitHub Desktop.
Save pyldin601/a528d6a525295bfc11ba7f81b6c544e4 to your computer and use it in GitHub Desktop.
package biz.radioteria.flow.stream
import java.io.IOException
import java.io.OutputStream
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
class NonblockingOutputStream(private val outputStream: OutputStream) : OutputStream() {
val QUEUE_SIZE = 128
val threadPoolExecutor: ThreadPoolExecutor =
ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, LinkedBlockingQueue(QUEUE_SIZE))
override fun write(b: Int) {
blockingAction {
outputStream.write(b)
}
}
override fun write(b: ByteArray) {
blockingAction {
outputStream.write(b)
}
}
override fun write(b: ByteArray, off: Int, len: Int) {
blockingAction {
outputStream.write(b, off, len)
}
}
override fun flush() {
blockingAction {
outputStream.flush()
}
}
override fun close() {
blockingAction {
outputStream.close()
}
}
fun isBlocked(): Boolean {
return threadPoolExecutor.queue.remainingCapacity() == 0
}
private fun blockingAction(block: () -> Unit) {
if (isBlocked()) {
throw IOException("Queue size exceeded limit ($QUEUE_SIZE)")
}
threadPoolExecutor.submit {
block.invoke()
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment