Skip to content

Instantly share code, notes, and snippets.

@markus1189
Created July 3, 2019 08:10
Show Gist options
  • Save markus1189/f4df8020df8527d05ead001e7e1cb598 to your computer and use it in GitHub Desktop.
Save markus1189/f4df8020df8527d05ead001e7e1cb598 to your computer and use it in GitHub Desktop.
Streaming creation of a zip archive with akka-streams
import java.io.ByteArrayOutputStream
import java.nio.charset.StandardCharsets
import java.nio.file.{Path, Paths}
import java.util.zip.{ZipEntry, ZipOutputStream}
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, IOResult}
import akka.stream.scaladsl.{FileIO, Flow, Keep, Source}
import akka.util.ByteString
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.collection.immutable.Iterable
sealed trait ZipInstruction extends Product with Serializable
case class StartOfFile(name: String) extends ZipInstruction
case class ZipFileContent(name: String, content: ByteString)
extends ZipInstruction
case class EndOfFile(name: String) extends ZipInstruction
case object EndOfZip extends ZipInstruction
object ZipArchiveStream {
val streamZipArchive: Flow[ZipInstruction, ByteString, NotUsed] =
Flow[ZipInstruction].statefulMapConcat { () =>
val baos = new ByteArrayOutputStream
val zipos = new ZipOutputStream(baos, StandardCharsets.UTF_8)
def emit(): Iterable[ByteString] = {
val output = baos.toByteArray
baos.reset()
Vector(ByteString(output))
}
def handleInstruction(
instruction: ZipInstruction): Iterable[ByteString] = {
instruction match {
case StartOfFile(name) => zipos.putNextEntry(new ZipEntry(name))
case ZipFileContent(name, content) => zipos.write(content.toArray)
case EndOfFile(name) => zipos.closeEntry()
case EndOfZip => zipos.finish()
}
emit()
}
handleInstruction
}
def fromFiles(
filepath: Path,
filepaths: Path*): Source[ByteString, List[Future[IOResult]]] = {
def fromPath(p: Path): Source[ZipInstruction, Future[IOResult]] = {
Source
.single(StartOfFile(p.getFileName.toString))
.concatMat(FileIO
.fromPath(p)
.map(ZipFileContent(p.getFileName.toString, _))
.concat(Source.single(EndOfFile(p.getFileName.toString))))(Keep.right)
}
filepaths
.foldLeft(fromPath(filepath).mapMaterializedValue(List(_))) {
(src, path) =>
src.concatMat(fromPath(path))(Keep.both).mapMaterializedValue {
case (fs, f) => fs :+ f
}
}
.concat(Source.single(EndOfZip))
.via(streamZipArchive)
}
}
object Main extends App {
implicit val actorSystem: ActorSystem = ActorSystem()
try {
implicit val actorMaterializer: ActorMaterializer = ActorMaterializer()
val src = ZipArchiveStream
.fromFiles(
Paths.get("<file1>"),
Paths.get("<file2>"),
Paths.get("<file3>")
)
val result = src
.toMat(FileIO.toPath(Paths.get("/tmp/archive.zip")))(
Keep.right)
.run()
val ioResult = Await.result(result, Duration.Inf)
println(ioResult)
} finally {
actorSystem.terminate()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment