Skip to content

Instantly share code, notes, and snippets.

@jbgi
Last active December 16, 2016 09:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jbgi/8ffcea4e1fd0780e5557cf5800bd33ad to your computer and use it in GitHub Desktop.
Save jbgi/8ffcea4e1fd0780e5557cf5800bd33ad to your computer and use it in GitHub Desktop.
"Atomic" file sink through temporary file sink + renaming
import java.nio.file._
import java.util.concurrent.Executor
import akka.Done
import akka.stream.IOResult
import akka.stream.scaladsl.{FileIO, Sink}
import akka.util.ByteString
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try
import scala.util.control.NonFatal
/**
 * Policy applied by `FileSinks.toCompleteFile` when the destination file already exists.
 */
sealed trait FileCreationOption
/** The sink reports a `FileAlreadyExistsException` instead of touching an existing destination file. */
case object FailOnExist extends FileCreationOption
/** The temporary file is moved over the destination with `StandardCopyOption.REPLACE_EXISTING`. */
case object ReplaceExisting extends FileCreationOption
/**
 * Factory for file sinks that write through a temporary file so that a partially written
 * destination file is never observable.
 */
object FileSinks {

  /**
   * File sink that operates through a temporary file so that only complete `destinationFile`
   * files are observable on the file system.
   *
   * The temporary file is created in the directory of `destinationFile`; its name starts with the
   * destination file name and ends with ".tmp". Once the write has succeeded, it is moved to
   * `destinationFile`. Whatever the outcome, a best-effort attempt is made to delete the temporary
   * file afterwards (a no-op when the move succeeded).
   *
   * The inner sink is built lazily through `Sink.lazyInit`, so a fresh temporary file is used for
   * each run. When the inner sink is never initialised, the fallback value is returned: a
   * successful `IOResult` with a count of 0 (presumably the empty-stream case, in which no file is
   * created at all -- confirm against the `Sink.lazyInit` documentation of the Akka version in use).
   *
   * @param destinationFile path of the file to produce
   * @param option          what to do if `destinationFile` already exists; with the default
   *                        `FailOnExist`, the result carries a `FileAlreadyExistsException`
   * @return a sink whose materialized `IOResult` contains an exception on errors (temporary file
   *         creation failure, write failure, move failure, or an already existing destination
   *         with `FailOnExist`)
   */
  def toCompleteFile(
      destinationFile: Path,
      option: FileCreationOption = FailOnExist
  ): Sink[ByteString, Future[IOResult]] =
    // Lazy factory is used to be able to use a different temporary file at each run
    // (otherwise processes might be stuck due to a residual tmp file).
    Sink
      .lazyInit[ByteString, Future[IOResult]](
        _ => Future.successful(temporaryFileSink(destinationFile, option)),
        () => Future.successful(IOResult.createSuccessful(0))
      )
      .mapMaterializedValue(_.flatMap(identity)(sameThreadExecutionContext))

  /** Builds the sink writing to a fresh temporary file, or a failed sink if that is not possible. */
  private def temporaryFileSink(
      destinationFile: Path,
      option: FileCreationOption
  ): Sink[ByteString, Future[IOResult]] =
    if (option == FailOnExist && Files.exists(destinationFile))
      failedSink(new FileAlreadyExistsException(destinationFile.toString))
    else
      try {
        // `getParent` is null for a single-element relative path (e.g. "out.bin"); fall back to
        // the parent of the absolute path so that such destinations work instead of failing
        // with a NullPointerException.
        val tmpDir = Option(destinationFile.getParent)
          .getOrElse(destinationFile.toAbsolutePath.getParent)
        val tmpDestFile =
          Files.createTempFile(tmpDir, s"${destinationFile.getFileName.toString}.", ".tmp")
        FileIO.toPath(tmpDestFile).mapMaterializedValue { written =>
          written
            .map(moveIntoPlace(_, tmpDestFile, destinationFile, option))(sameThreadExecutionContext)
            // Best-effort cleanup, also executed when the write future itself is failed
            // (NOTE(review): some Akka versions appear to fail the future rather than complete
            // it with a failed IOResult -- confirm for the version in use).
            .andThen { case _ => Try(Files.deleteIfExists(tmpDestFile)) }(sameThreadExecutionContext)
        }
      } catch {
        // Case where the temporary file creation failed.
        case NonFatal(e) => failedSink(e)
      }

  /**
   * Moves the temporary file to its destination if the write succeeded; a failure of the move is
   * reported through the status of the returned `IOResult`. A failed write result is returned as is.
   */
  private def moveIntoPlace(
      writeResult: IOResult,
      tmpDestFile: Path,
      destinationFile: Path,
      option: FileCreationOption
  ): IOResult =
    if (writeResult.wasSuccessful)
      IOResult(writeResult.count, Try {
        option match {
          case FailOnExist     => Files.move(tmpDestFile, destinationFile)
          case ReplaceExisting => Files.move(tmpDestFile, destinationFile, StandardCopyOption.REPLACE_EXISTING)
        }
        Done
      })
    else writeResult

  /** Sink that cancels immediately and materializes a failed `IOResult` carrying `exception`. */
  private def failedSink(exception: Throwable): Sink[ByteString, Future[IOResult]] =
    Sink
      .cancelled[ByteString]
      .mapMaterializedValue(_ => Future.successful(IOResult.createFailed(0, exception)))

  /** Execution context that runs each task synchronously on the calling thread. */
  private val sameThreadExecutionContext: ExecutionContext =
    ExecutionContext.fromExecutor(new Executor() {
      override def execute(command: Runnable): Unit = command.run()
    })
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment