Skip to content

Instantly share code, notes, and snippets.

@alexandru
Last active December 10, 2021 12:53
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save alexandru/0e2290a7b5dd8de61ea3ab50e0e08627 to your computer and use it in GitHub Desktop.
Save alexandru/0e2290a7b5dd8de61ea3ab50e0e08627 to your computer and use it in GitHub Desktop.
import java.io.{File, FileInputStream, FileOutputStream}
import monix.eval.Task
import monix.execution.Ack
import monix.execution.Ack.{Continue, Stop}
import monix.reactive.{Consumer, Observable, Observer}
import scala.util.control.NonFatal
/** Builds a task that copies `input` to `destination`, streaming in chunks.
  *
  * Nothing is opened until the returned task is run: the `FileInputStream`
  * is created inside `Task.defer`, so a failure to open `input` surfaces as
  * a failed task rather than as an exception thrown at call time.
  *
  * @param input       file to read from
  * @param destination file to write to, handed to `fileConsumer`
  * @param chunkSize   size in bytes passed to `Observable.fromInputStream`
  * @return a task that completes when the stream has been fully consumed by
  *         `fileConsumer(destination)`, or fails with the first error signalled
  */
def copyFile(input: File, destination: File, chunkSize: Int): Task[Unit] =
  Task.defer {
    val in = new FileInputStream(input)

    Observable.fromInputStream(in, chunkSize)
      .consumeWith(fileConsumer(destination))
      .doOnFinish { _ =>
        // Release the input handle whether the copy succeeded or failed, e.g.
        // when the destination cannot be opened or a write error stops the
        // stream early. The observable may already have closed the stream on
        // its own; per the `java.io.Closeable` contract a repeated `close()`
        // has no effect, so this is safe either way. Errors from closing are
        // swallowed so they do not mask the copy's own result.
        Task.eval {
          try in.close() catch { case NonFatal(_) => () }
        }
      }
  }
/** Creates a consumer that writes every received byte chunk to `outputFile`.
  *
  * The `FileOutputStream` is opened when the observer is instantiated and is
  * closed on every terminal path visible here: a failed write, an upstream
  * error, or normal completion. The final result is reported through the
  * callback supplied by `Consumer.create`.
  *
  * @param outputFile file the incoming chunks are written to
  * @return a consumer of byte arrays that signals `Unit` once the stream has
  *         completed and the file has been closed
  */
def fileConsumer(outputFile: File): Consumer[Array[Byte], Unit] =
  Consumer.create { (_, _, callback) =>
    new Observer.Sync[Array[Byte]] {
      private[this] val out = new FileOutputStream(outputFile)

      // Best-effort close for the failure paths: an error is already being
      // reported, so a secondary failure while closing is deliberately ignored.
      private[this] def closeQuietly(): Unit =
        try out.close() catch { case NonFatal(_) => () }

      // Releases the file handle first, then reports `cause` to the callback.
      private[this] def fail(cause: Throwable): Unit = {
        closeQuietly()
        callback.onError(cause)
      }

      def onNext(chunk: Array[Byte]): Ack =
        try {
          out.write(chunk)
          Continue
        } catch {
          case NonFatal(cause) =>
            // A failed write ends the stream: report the error and stop upstream.
            fail(cause)
            Stop
        }

      def onError(ex: Throwable): Unit =
        fail(ex)

      def onComplete(): Unit =
        try {
          // Unlike the failure paths, a close error here is NOT ignored:
          // success is signalled only after the file has closed cleanly.
          out.close()
          callback.onSuccess(())
        } catch {
          case NonFatal(cause) => callback.onError(cause)
        }
    }
  }
// Invocation ...
//
// `input` and `output` are placeholders for the source and destination
// `java.io.File` values; they must be defined by the surrounding code.
import scala.concurrent.Await
import scala.concurrent.duration.Duration
// `runAsync` needs an implicit Scheduler in scope to execute the task.
import monix.execution.Scheduler.Implicits.global

// `copyFile` has no default for `chunkSize`, so it must be passed explicitly.
// Blocking with `Await` is acceptable only here, at the very edge of the program.
Await.result(copyFile(input, output, chunkSize = 4096).runAsync, Duration.Inf)
@alexandru
Copy link
Author

Made some changes, due to an initial mistake - which is what happens when I write code that I don't run :-)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment