Created
July 3, 2019 08:10
-
-
Save markus1189/f4df8020df8527d05ead001e7e1cb598 to your computer and use it in GitHub Desktop.
Streaming creation of a zip archive with akka-streams
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.ByteArrayOutputStream | |
import java.nio.charset.StandardCharsets | |
import java.nio.file.{Path, Paths} | |
import java.util.zip.{ZipEntry, ZipOutputStream} | |
import akka.NotUsed | |
import akka.actor.ActorSystem | |
import akka.stream.{ActorMaterializer, IOResult} | |
import akka.stream.scaladsl.{FileIO, Flow, Keep, Source} | |
import akka.util.ByteString | |
import scala.concurrent.{Await, Future} | |
import scala.concurrent.duration.Duration | |
import scala.collection.immutable.Iterable | |
sealed trait ZipInstruction extends Product with Serializable | |
case class StartOfFile(name: String) extends ZipInstruction | |
case class ZipFileContent(name: String, content: ByteString) | |
extends ZipInstruction | |
case class EndOfFile(name: String) extends ZipInstruction | |
case object EndOfZip extends ZipInstruction | |
object ZipArchiveStream { | |
val streamZipArchive: Flow[ZipInstruction, ByteString, NotUsed] = | |
Flow[ZipInstruction].statefulMapConcat { () => | |
val baos = new ByteArrayOutputStream | |
val zipos = new ZipOutputStream(baos, StandardCharsets.UTF_8) | |
def emit(): Iterable[ByteString] = { | |
val output = baos.toByteArray | |
baos.reset() | |
Vector(ByteString(output)) | |
} | |
def handleInstruction( | |
instruction: ZipInstruction): Iterable[ByteString] = { | |
instruction match { | |
case StartOfFile(name) => zipos.putNextEntry(new ZipEntry(name)) | |
case ZipFileContent(name, content) => zipos.write(content.toArray) | |
case EndOfFile(name) => zipos.closeEntry() | |
case EndOfZip => zipos.finish() | |
} | |
emit() | |
} | |
handleInstruction | |
} | |
def fromFiles( | |
filepath: Path, | |
filepaths: Path*): Source[ByteString, List[Future[IOResult]]] = { | |
def fromPath(p: Path): Source[ZipInstruction, Future[IOResult]] = { | |
Source | |
.single(StartOfFile(p.getFileName.toString)) | |
.concatMat(FileIO | |
.fromPath(p) | |
.map(ZipFileContent(p.getFileName.toString, _)) | |
.concat(Source.single(EndOfFile(p.getFileName.toString))))(Keep.right) | |
} | |
filepaths | |
.foldLeft(fromPath(filepath).mapMaterializedValue(List(_))) { | |
(src, path) => | |
src.concatMat(fromPath(path))(Keep.both).mapMaterializedValue { | |
case (fs, f) => fs :+ f | |
} | |
} | |
.concat(Source.single(EndOfZip)) | |
.via(streamZipArchive) | |
} | |
} | |
object Main extends App { | |
implicit val actorSystem: ActorSystem = ActorSystem() | |
try { | |
implicit val actorMaterializer: ActorMaterializer = ActorMaterializer() | |
val src = ZipArchiveStream | |
.fromFiles( | |
Paths.get("<file1>"), | |
Paths.get("<file2>"), | |
Paths.get("<file3>") | |
) | |
val result = src | |
.toMat(FileIO.toPath(Paths.get("/tmp/archive.zip")))( | |
Keep.right) | |
.run() | |
val ioResult = Await.result(result, Duration.Inf) | |
println(ioResult) | |
} finally { | |
actorSystem.terminate() | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment