Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save 6qat/a06e3fbca0b3b570ff28de0a06df0506 to your computer and use it in GitHub Desktop.
Save 6qat/a06e3fbca0b3b570ff28de0a06df0506 to your computer and use it in GitHub Desktop.
import java.io.{File, FileInputStream}
import java.util
import monix.execution.Cancelable
import monix.reactive.Observable
import scala.util.control.NonFatal
/** Exposes the bytes of `in` as an `Observable` of byte-array chunks.
  *
  * Data is pulled through a blocking `Iterator` that calls `in.read` on demand
  * and is then wrapped with `Observable.fromIterator`. Each emitted array is a
  * fresh copy holding exactly the bytes returned by one `read` call, so chunks
  * may be shorter than `chunkSize`.
  *
  * This function does not close `in`; releasing the stream remains the
  * caller's responsibility.
  *
  * @param in        the stream to read from
  * @param chunkSize size of the internal read buffer and therefore the maximum
  *                  length of an emitted chunk; must be strictly positive
  * @return an observable emitting the stream's content chunk by chunk
  * @throws IllegalArgumentException if `chunkSize` is not strictly positive
  */
def fromInputStream(in: java.io.InputStream, chunkSize: Int = 256): Observable[Array[Byte]] = {
  // A zero-length buffer makes `read` return 0 forever, which would yield an
  // endless stream of empty chunks; a negative size cannot be allocated at all.
  require(chunkSize > 0, "chunkSize must be strictly positive")

  val iterator = new Iterator[Array[Byte]] {
    private[this] val buffer = new Array[Byte](chunkSize)

    // State of the last read:
    //   0  -> nothing buffered, a read is required
    //  > 0 -> that many bytes are waiting in `buffer`
    //  < 0 -> end of stream was reached
    private[this] var lastCount = 0

    def hasNext: Boolean = {
      if (lastCount == 0) lastCount = in.read(buffer)
      lastCount >= 0
    }

    def next(): Array[Byte] =
      // Going through `hasNext` guarantees a read has happened even when the
      // consumer calls `next()` directly, as the Iterator contract allows.
      if (!hasNext)
        throw new NoSuchElementException("next")
      else {
        val result = util.Arrays.copyOf(buffer, lastCount)
        lastCount = 0 // buffer consumed; force a fresh read next time
        result
      }
  }

  Observable.fromIterator(iterator)
}
/** Streams the content of `file` as an `Observable` of byte-array chunks.
  *
  * The file is opened once per subscription. The underlying `FileInputStream`
  * is closed when end of file is reached, when a read fails, when subscribing
  * fails, or when the returned `Cancelable` is canceled.
  *
  * Error routing:
  *  - a failure to open the file is signaled downstream via `onError`;
  *  - a failure thrown while subscribing, or while closing the stream, is
  *    reported to the subscriber's scheduler, because the downstream protocol
  *    may already be in progress at that point.
  *
  * NOTE(review): canceling closes the stream even if a read is in flight on
  * another thread; that read may then fail with an `IOException` after the
  * cancellation. Confirm this is acceptable for the consumers of this stream.
  *
  * @param file      the file to read
  * @param chunkSize maximum number of bytes per emitted chunk, forwarded to
  *                  `fromInputStream`
  * @return an observable emitting the file's content chunk by chunk
  */
def fromFile(file: File, chunkSize: Int = 256): Observable[Array[Byte]] =
  Observable.unsafeCreate { subscriber =>
    // Close failures cannot be signaled downstream safely, so they are
    // reported to the scheduler instead of being thrown or swallowed.
    def closeQuietly(stream: FileInputStream): Unit =
      try stream.close()
      catch { case NonFatal(ex) => subscriber.scheduler.reportFailure(ex) }

    // Only a failure to open the file is captured here; it is the single
    // error that gets pushed through `onError` below.
    val opened: Either[Throwable, FileInputStream] =
      try Right(
        // Self-closing stream: `fromInputStream` pulls data through
        // `read(Array[Byte])`, so releasing the handle here covers both end of
        // file (negative count) and read failures (count left at -1).
        new FileInputStream(file) {
          override def read(b: Array[Byte]): Int = {
            var count = -1
            try count = super.read(b)
            finally if (count < 0) closeQuietly(this)
            count
          }
        }
      )
      catch { case NonFatal(ex) => Left(ex) }

    opened match {
      case Left(ex) =>
        subscriber.onError(ex)
        Cancelable.empty

      case Right(in) =>
        try {
          val subscription =
            fromInputStream(in, chunkSize).unsafeSubscribeFn(subscriber)

          // Cancellation must also release the file handle, since the stream
          // may never reach end of file once the consumer stops.
          Cancelable { () =>
            try subscription.cancel()
            finally closeQuietly(in)
          }
        } catch {
          case NonFatal(ex) =>
            closeQuietly(in)
            subscriber.scheduler.reportFailure(ex)
            Cancelable.empty
        }
    }
  }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment