Skip to content

Instantly share code, notes, and snippets.

@michalbogacz
Last active August 28, 2019 07:08
Show Gist options
  • Save michalbogacz/08868f51b7053cadfef00f51196bf2ec to your computer and use it in GitHub Desktop.
Save michalbogacz/08868f51b7053cadfef00f51196bf2ec to your computer and use it in GitHub Desktop.
Creating a zip file with Akka Streams.
/*------------------------------------------------------------------------------
* MIT License
*
* Copyright (c) 2019 Michał Bogacz
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*----------------------------------------------------------------------------*/
import java.util.zip.{ZipEntry, ZipOutputStream}
import akka.NotUsed
import akka.stream._
import akka.stream.scaladsl.{Flow, Source}
import akka.stream.stage._
import akka.util.{ByteString, ByteStringBuilder}
/**
 * Streams a sequence of named byte sources into a single zip archive.
 *
 * Every file is framed in-band before it reaches [[FileZipStream.FileZipStage]]:
 * a header chunk (start marker, separator, entry path), the content chunks,
 * and finally a footer chunk (end marker).
 */
object FileZipStream {
  private val startFileWord = "$START$"
  private val endFileWord = "$END$"
  private val separator: Char = '|'

  // Byte-level forms of the markers, so chunks can be classified without decoding
  // (possibly binary) content to a String on every element.
  private val startFileMarker: ByteString = ByteString(startFileWord)
  private val endFileMarker: ByteString = ByteString(endFileWord)

  private val zipStage = new FileZipStage()

  /**
   * Creates a flow that packs all incoming `(path, content)` pairs into ONE zip archive.
   *
   * The zipping stage is applied to the flattened stream (after `flatMapConcat`), so a
   * single `ZipOutputStream` sees every entry and writes one central directory at the end.
   * Applying it per file would emit a separate one-entry archive for each input pair.
   *
   * @return a flow from `(entry path, entry content)` to the bytes of the zip archive;
   *         nothing is emitted when no pair is received
   */
  def zip(): Flow[(String, Source[ByteString, Any]), ByteString, NotUsed] = {
    Flow[(String, Source[ByteString, Any])]
      .flatMapConcat { case (path, stream) =>
        val header: Source[ByteString, Any] = Source.single(createStartingByteString(path))
        val footer: Source[ByteString, Any] = Source.single(endFileMarker)
        stream.mapConcat(chunk => escapeReserved(chunk)).prepend(header).concat(footer)
      }
      .via(zipStage)
  }

  /** Builds the header chunk announcing a new entry: start marker, separator, then the path. */
  private def createStartingByteString(path: String): ByteString = {
    ByteString(s"$startFileWord$separator$path")
  }

  /**
   * Extracts the entry path from a header chunk.
   *
   * Everything after the FIRST separator is the path, so paths that themselves contain
   * (or end with) the separator are returned intact. Returns "" when there is no separator.
   */
  private def getPathFromByteString(b: ByteString): String = {
    val header = b.utf8String
    val separatorAt = header.indexOf(separator)
    if (separatorAt < 0) "" else header.substring(separatorAt + 1)
  }

  /** True when the stage would interpret `chunk` as a header or footer rather than content. */
  private def isReserved(chunk: ByteString): Boolean =
    chunk.startsWith(startFileMarker) || chunk == endFileMarker

  /**
   * Keeps file content from being mistaken for framing.
   *
   * A content chunk that looks like a marker is split after its first byte; neither half
   * can match a marker, and the bytes written to the archive are unchanged.
   */
  private def escapeReserved(chunk: ByteString): List[ByteString] =
    if (isReserved(chunk)) List(chunk.take(1), chunk.drop(1)) else List(chunk)

  /**
   * Graph stage that turns a framed byte stream into zip archive bytes.
   *
   * Input protocol (in-band): a chunk starting with the start marker opens a new entry,
   * a chunk equal to the end marker closes the current entry, and any other chunk is
   * written as entry content. When used directly (not through `zip()`), content chunks
   * that collide with the markers are treated as framing.
   *
   * The archive is finalised when upstream completes. If no element was ever received,
   * the stage completes without emitting anything.
   */
  class FileZipStage extends GraphStage[FlowShape[ByteString, ByteString]] {
    val in: Inlet[ByteString] = Inlet("FileZipStage.in")
    val out: Outlet[ByteString] = Outlet("FileZipStage.out")
    override val shape: FlowShape[ByteString, ByteString] = FlowShape.of(in, out)

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
      // The zip stream writes into an in-memory builder that is drained after every element.
      private val builder = new ByteStringBuilder()
      private val zip = new ZipOutputStream(builder.asOutputStream)
      private var emptyStream = true

      /** Returns everything the zip stream has produced so far and resets the buffer. */
      private def drainBuffer(): ByteString = {
        val produced = builder.result
        builder.clear()
        produced
      }

      setHandler(
        out,
        new OutHandler {
          // Finalisation happens in onUpstreamFinish, so demand is simply forwarded upstream.
          override def onPull(): Unit = pull(in)
        }
      )

      setHandler(
        in,
        new InHandler {
          override def onPush(): Unit = {
            emptyStream = false
            grab(in) match {
              case header if header.startsWith(FileZipStream.startFileMarker) =>
                zip.putNextEntry(new ZipEntry(FileZipStream.getPathFromByteString(header)))
              case footer if footer == FileZipStream.endFileMarker =>
                zip.closeEntry()
              case content =>
                val bytes = content.toArray
                zip.write(bytes, 0, bytes.length)
            }
            zip.flush()
            val produced = drainBuffer()
            // The compressor may buffer input and produce nothing yet; in that case keep
            // the outstanding demand alive by asking upstream for the next chunk.
            if (produced.nonEmpty) push(out, produced) else pull(in)
          }

          override def onUpstreamFinish(): Unit = {
            if (emptyStream) {
              completeStage()
            } else {
              // Closing writes the end-of-archive records. emit() delivers them immediately
              // when there is demand, or on the next pull otherwise, so completion never
              // depends on a further onPull that might not arrive.
              zip.close()
              val trailer = drainBuffer()
              if (trailer.nonEmpty) emit(out, trailer, () => completeStage())
              else completeStage()
            }
          }
        }
      )

      // Best-effort cleanup for cancellation/failure paths, so the compressor is closed
      // promptly rather than left to garbage collection. After a normal finish the zip
      // stream is already closed; this relies on close() being a no-op the second time.
      override def postStop(): Unit = {
        try zip.close()
        catch { case scala.util.control.NonFatal(_) => () }
      }
    }
  }
}
@mkurz
Copy link

mkurz commented Aug 28, 2019

@michalbogacz Awesome, thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment