Skip to content

Instantly share code, notes, and snippets.

@kirked
Last active March 6, 2020 21:28
Show Gist options
  • Star 5 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kirked/03c7f111de0e9a1f74377bf95d3f0f60 to your computer and use it in GitHub Desktop.
Save kirked/03c7f111de0e9a1f74377bf95d3f0f60 to your computer and use it in GitHub Desktop.
Akka streams implementation of constant-space (per stream) zip file creation.
/*------------------------------------------------------------------------------
* MIT License
*
* Copyright (c) 2017 Doug Kirk
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*----------------------------------------------------------------------------*/
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler, StageLogging}
import akka.util.{ByteString, ByteStringBuilder}
import java.io.{InputStream, OutputStream}
import scala.util.control.NonFatal
/**
 * Streamed zip-file creation Akka streams implementation.
 *
 * Every incoming [[ZipToStreamFlow.ZipSource]] becomes one entry of the archive.
 * The entry's input stream is read in small slices, fed through a zip output
 * stream, and the compressed bytes are pushed downstream chunk by chunk, so the
 * whole archive is never held in memory at once.
 *
 * Typical usage, assuming an implicit ActorSystem:
 * {{{
 * import akka.stream._
 * import akka.stream.scaladsl._
 *
 * val filesToZip: Iterable[ZipToStreamFlow.ZipSource] = ...
 * Source(filesToZip)
 *   .via(ZipToStreamFlow(8192))
 *   .runWith(someSink)(ActorMaterializer())
 * }}}
 * The ActorMaterializer can also be implicit.
 *
 * @param bufferSize target number of compressed bytes to accumulate before a
 *                   chunk is emitted downstream; must be positive
 * @throws IllegalArgumentException if `bufferSize` is not positive
 */
final class ZipToStreamFlow(bufferSize: Int)
    extends GraphStage[FlowShape[ZipToStreamFlow.ZipSource, ByteString]] {
  import ZipToStreamFlow._

  // With a non-positive size the buffer never reports free space, so no input
  // would ever be read and the chunk iterator would spin forever on empty chunks.
  require(bufferSize > 0, s"bufferSize must be positive, got $bufferSize")

  val in: Inlet[ZipSource] = Inlet("ZipToStreamFlow.in")
  val out: Outlet[ByteString] = Outlet("ZipToStreamFlow.out")
  override val shape: FlowShape[ZipSource, ByteString] = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with StageLogging {
      // Accumulates the zip-encoded bytes that have not been emitted yet.
      private val buffer = new ZipBuffer(bufferSize)
      // Input stream of the entry currently being written, if any.
      private var currentStream: Option[InputStream] = None
      // Stays true until the first ZipSource has been received.
      private var emptyStream = true

      setHandler(
        out,
        new OutHandler {
          override def onPull(): Unit =
            if (isClosed(in)) {
              if (buffer.isEmpty) {
                completeStage()
              } else {
                // Upstream is done: closing the buffer finishes the archive,
                // then flush whatever is left. The next pull finds the buffer
                // empty and completes the stage.
                buffer.close()
                push(out, buffer.toByteString())
              }
            } else pull(in)

          override def onDownstreamFinish(): Unit = {
            closeInput()
            buffer.close()
            super.onDownstreamFinish()
          }
        }
      )

      setHandler(
        in,
        new InHandler {
          override def onPush(): Unit = {
            val zipSource = grab(in)
            emptyStream = false
            buffer.startEntry(zipSource.filePath)
            val stream = zipSource.streamGenerator()
            currentStream = Some(stream)
            // Emit every chunk of this entry, then close the entry and its input.
            emitMultiple(out, fileChunks(stream, buffer), () => {
              buffer.endEntry()
              closeInput()
            })
          }

          override def onUpstreamFinish(): Unit =
            if (!buffer.isEmpty) {
              buffer.close()
              if (isAvailable(out)) {
                push(out, buffer.toByteString())
              }
            } else if (emptyStream) super.onUpstreamFinish()
          // Otherwise nothing is buffered right now but an entry has been seen:
          // completion is left to a later onPull, which checks isClosed(in).
        }
      )

      /**
       * Releases the current input stream and the zip buffer however the stage
       * terminates. Without this, a failure (as opposed to normal completion or
       * downstream cancellation) would leave both open.
       */
      override def postStop(): Unit =
        try {
          closeInput()
          buffer.close()
        } catch {
          case NonFatal(e) =>
            log.warning("ZipToStreamFlow failed to release resources: {}", e.getMessage)
        }

      /** Closes the input stream of the current entry, if one is open. */
      private def closeInput(): Unit = {
        currentStream.foreach(_.close())
        currentStream = None
      }

      /**
       * Lazily reads `stream` into `buffer` and yields the compressed output as
       * a sequence of non-empty chunks.
       *
       * Each chunk is produced by reading until the buffer reports no remaining
       * space or the input is exhausted. The input is closed as soon as its end
       * is seen, or when reading fails (the failure is rethrown).
       *
       * @param stream input of the entry being written
       * @param buffer zip buffer that receives the bytes
       * @return iterator over the non-empty chunks of this entry
       */
      private def fileChunks(stream: InputStream, buffer: ZipBuffer): Iterator[ByteString] = {
        // This seems like a good trade-off between single-byte
        // read I/O performance and doubling the ZipBuffer size.
        //
        // And it's still a decent defense against DDOS resource
        // limit attacks.
        val readBuffer = new Array[Byte](1024)

        val chunks = new Iterator[ByteString] {
          private var done = false

          override def hasNext: Boolean = !done

          override def next(): ByteString =
            if (done) throw new NoSuchElementException("no more chunks")
            else {
              try {
                while (!done && buffer.remaining > 0) {
                  val bytesToRead = math.min(readBuffer.length, buffer.remaining)
                  val count = stream.read(readBuffer, 0, bytesToRead)
                  if (count == -1) {
                    // End of input: release it now. `stream` is the stage's
                    // current stream, so the later closeInput() is a no-op.
                    closeInput()
                    done = true
                  } else buffer.write(readBuffer, count)
                }
                buffer.toByteString()
              } catch {
                case NonFatal(e) =>
                  closeInput()
                  throw e
              }
            }
        }

        // A chunk can be empty when the compressor has not produced output yet.
        chunks.filter(_.nonEmpty)
      }
    }
}
/** Factory and supporting types for the [[ZipToStreamFlow]] stage. */
object ZipToStreamFlow {

  /**
   * Describes one entry of the archive.
   *
   * @param filePath        name given to the zip entry
   * @param streamGenerator produces the input stream holding the entry's content
   */
  final case class ZipSource(filePath: String, streamGenerator: () => InputStream)

  /**
   * Creates the stage.
   *
   * @param bufferSize size handed to the stage's internal zip buffer (64 KiB by default)
   */
  def apply(bufferSize: Int = 64 * 1024): ZipToStreamFlow = new ZipToStreamFlow(bufferSize)

  /** Mixin that allows the sink of a zip output stream to be swapped. */
  private[this] trait SetZipStream {
    def setOut(out: OutputStream): Unit
  }

  /**
   * A zip output stream writing into a replaceable in-memory byte builder.
   *
   * Bytes written to the current entry are encoded by the zip stream and
   * collected in the builder until [[toByteString]] hands them over and
   * installs a fresh builder.
   *
   * @param bufferSize size against which [[remaining]] is computed
   */
  private[ZipToStreamFlow] final class ZipBuffer(val bufferSize: Int) {
    import java.util.zip.{ZipEntry, ZipOutputStream}

    private var builder = new ByteStringBuilder()
    private val zip = new ZipOutputStream(builder.asOutputStream) with SetZipStream {
      // this MUST ONLY be used after flush()!
      override def setOut(newStream: OutputStream): Unit = out = newStream
    }
    private var inEntry = false
    private var closed = false

    /** Ends any open entry and closes the zip stream; later writes are ignored. */
    def close(): Unit = {
      endEntry()
      closed = true
      zip.close()
    }

    /** Difference between `bufferSize` and the bytes currently held; may be negative. */
    def remaining: Int = bufferSize - builder.length

    /** True when no bytes are waiting to be handed over. */
    def isEmpty: Boolean = builder.isEmpty

    /** True when bytes are waiting to be handed over. */
    def nonEmpty: Boolean = !isEmpty

    /** Ends any open entry and starts a new one named `path`; no-op once closed. */
    def startEntry(path: String): Unit =
      if (!closed) {
        endEntry()
        zip.putNextEntry(new ZipEntry(path))
        inEntry = true
      }

    /** Closes the current entry, if one is open and the buffer is not closed. */
    def endEntry(): Unit =
      if (!closed && inEntry) {
        inEntry = false
        zip.closeEntry()
      }

    /** Writes a single byte to the current entry; ignored when closed or outside an entry. */
    def write(byte: Int): Unit =
      if (!closed && inEntry) zip.write(byte)

    /** Writes the first `length` bytes of `bytes` to the current entry; ignored when closed or outside an entry. */
    def write(bytes: Array[Byte], length: Int): Unit =
      if (!closed && inEntry) zip.write(bytes, 0, length)

    /**
     * Flushes the zip stream, returns everything collected so far and starts
     * collecting into a fresh builder.
     */
    def toByteString(): ByteString = {
      zip.flush()
      val result = builder.result()
      builder = new ByteStringBuilder()
      // set the underlying output for the zip stream to be the buffer
      // directly, so we don't have to copy the zip'd byte array.
      zip.setOut(builder.asOutputStream)
      result
    }
  }
}
@kirked
Copy link
Author

kirked commented Jul 26, 2019

I've added the filter(_.nonEmpty) to the stream iterator.

@mkurz
Copy link

mkurz commented Jul 28, 2019

@kirked Thanks! Can you add it to your pull request as well?

@mkurz
Copy link

mkurz commented Jul 29, 2019

@michalbogacz I saw you opened akka/akka-stream-contrib#171, thanks a lot!

@kirked
Copy link
Author

kirked commented Aug 5, 2019

@mkurz, I added the filter to the PR.

Good news, @raboof has approved the PR, so this will be joining akka-stream-contrib.

@mkurz
Copy link

mkurz commented Aug 5, 2019

@kirked Thanks! I am aware, I am following that pull request too 😉 Thanks for your great work!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment