Skip to content

Instantly share code, notes, and snippets.

@alexandru
Last active February 21, 2019 15:23
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save alexandru/88aa2fe6a3fb453126a66b23ba200d96 to your computer and use it in GitHub Desktop.
Save alexandru/88aa2fe6a3fb453126a66b23ba200d96 to your computer and use it in GitHub Desktop.
import java.io.{File, FileInputStream}
import java.util
import monix.execution.Cancelable
import monix.reactive.Observable
import scala.util.control.NonFatal
def fromInputStream(in: java.io.InputStream, chunkSize: Int = 256): Observable[Array[Byte]] = {
val iterator = new Iterator[Array[Byte]] {
private[this] val buffer = new Array[Byte](chunkSize)
private[this] var lastCount = 0
def hasNext: Boolean =
lastCount match {
case 0 =>
lastCount = in.read(buffer)
lastCount >= 0
case nr =>
nr >= 0
}
def next(): Array[Byte] = {
if (lastCount < 0)
throw new NoSuchElementException("next")
else {
val result = util.Arrays.copyOf(buffer, lastCount)
lastCount = 0
result
}
}
}
Observable.fromIterator(iterator)
}
def fromFile(file: File, chunkSize: Int = 256): Observable[Array[Byte]] =
Observable.unsafeCreate { subscriber =>
var streamErrors = true
try {
val in = new FileInputStream(file)
streamErrors = false
fromInputStream(in, chunkSize)
.unsafeSubscribeFn(subscriber)
} catch {
case NonFatal(ex) =>
if (streamErrors) subscriber.onError(ex)
else subscriber.scheduler.reportFailure(ex)
Cancelable.empty
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment