
@kirked
Last active March 6, 2020 21:28
Akka streams implementation of constant-space (per stream) zip file creation.
/*------------------------------------------------------------------------------
* MIT License
*
* Copyright (c) 2017 Doug Kirk
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*----------------------------------------------------------------------------*/
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler, StageLogging}
import akka.util.{ByteString, ByteStringBuilder}
import java.io.{InputStream, OutputStream}
import scala.util.control.NonFatal
/**
 * Streamed zip-file creation Akka streams implementation.
 * Typical usage, assuming an implicit ActorSystem:
 *
 *     import akka.stream._
 *     import akka.stream.scaladsl._
 *
 *     val filesToZip: Iterable[ZipToStreamFlow.ZipSource] = ...
 *     Source(filesToZip)
 *       .via(ZipToStreamFlow(8192))
 *       .runWith(someSink)(ActorMaterializer())
 *
 * The ActorMaterializer can also be implicit.
 */
final class ZipToStreamFlow(bufferSize: Int) extends GraphStage[FlowShape[ZipToStreamFlow.ZipSource, ByteString]] {

  import ZipToStreamFlow._

  val in: Inlet[ZipSource] = Inlet("ZipToStreamFlow.in")
  val out: Outlet[ByteString] = Outlet("ZipToStreamFlow.out")
  override val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with StageLogging {
      private val buffer = new ZipBuffer(bufferSize)
      private var currentStream: Option[InputStream] = None
      // Stays true until the first ZipSource arrives.
      private var emptyStream = true

      setHandler(
        out,
        new OutHandler {
          override def onPull(): Unit =
            if (isClosed(in)) {
              if (buffer.isEmpty) {
                completeStage()
              } else {
                // Upstream is done: finish the archive and push what is left.
                buffer.close()
                push(out, buffer.toByteString())
              }
            } else pull(in)

          override def onDownstreamFinish(): Unit = {
            closeInput()
            buffer.close()
            super.onDownstreamFinish()
          }
        }
      )

      setHandler(
        in,
        new InHandler {
          override def onPush(): Unit = {
            val zipSource = grab(in)
            emptyStream = false
            buffer.startEntry(zipSource.filePath)
            val stream = zipSource.streamGenerator()
            currentStream = Some(stream)
            // Emit the entry chunk by chunk, then close the entry and its input.
            emitMultiple(out, fileChunks(stream, buffer), () => {
              buffer.endEntry()
              closeInput()
            })
          }

          override def onUpstreamFinish(): Unit =
            if (!buffer.isEmpty) {
              buffer.close()
              if (isAvailable(out)) {
                push(out, buffer.toByteString())
              }
            } else if (emptyStream) super.onUpstreamFinish()
        }
      )

      private def closeInput(): Unit = {
        currentStream.foreach(_.close())
        currentStream = None
      }

      private def fileChunks(stream: InputStream, buffer: ZipBuffer): Iterator[ByteString] = {
        // This seems like a good trade-off between single-byte
        // read I/O performance and doubling the ZipBuffer size.
        //
        // And it's still a decent defense against DDOS resource
        // limit attacks.
        val readBuffer = new Array[Byte](1024)
        var done = false

        def result: Stream[ByteString] =
          if (done) Stream.empty
          else {
            try {
              // Fill the zip buffer until it is full or the input is exhausted.
              while (!done && buffer.remaining > 0) {
                val bytesToRead = Math.min(readBuffer.length, buffer.remaining)
                val count = stream.read(readBuffer, 0, bytesToRead)
                if (count == -1) {
                  stream.close()
                  done = true
                } else buffer.write(readBuffer, count)
              }
              buffer.toByteString() #:: result
            } catch {
              case NonFatal(e) =>
                closeInput()
                throw e
            }
          }

        // Drop empty chunks; downstream consumers may reject zero-length elements.
        result.iterator.filter(_.nonEmpty)
      }
    }
}
object ZipToStreamFlow {
  final case class ZipSource(filePath: String, streamGenerator: () => InputStream)

  def apply(bufferSize: Int = 64 * 1024) = new ZipToStreamFlow(bufferSize)

  private[this] trait SetZipStream {
    def setOut(out: OutputStream): Unit
  }

  private[ZipToStreamFlow] final class ZipBuffer(val bufferSize: Int) {
    import java.util.zip.{ZipEntry, ZipOutputStream}

    private var builder = new ByteStringBuilder()
    private val zip = new ZipOutputStream(builder.asOutputStream) with SetZipStream {
      // this MUST ONLY be used after flush()!
      override def setOut(newStream: OutputStream): Unit = out = newStream
    }
    private var inEntry = false
    private var closed = false

    def close(): Unit = {
      endEntry()
      closed = true
      zip.close()
    }

    def remaining(): Int = bufferSize - builder.length

    def isEmpty(): Boolean = builder.isEmpty

    def nonEmpty(): Boolean = !isEmpty

    def startEntry(path: String): Unit =
      if (!closed) {
        endEntry()
        zip.putNextEntry(new ZipEntry(path))
        inEntry = true
      }

    def endEntry(): Unit =
      if (!closed && inEntry) {
        inEntry = false
        zip.closeEntry()
      }

    def write(byte: Int): Unit =
      if (!closed && inEntry) zip.write(byte)

    def write(bytes: Array[Byte], length: Int): Unit =
      if (!closed && inEntry) zip.write(bytes, 0, length)

    def toByteString(): ByteString = {
      zip.flush()
      val result = builder.result
      builder = new ByteStringBuilder()
      // set the underlying output for the zip stream to be the buffer
      // directly, so we don't have to copy the zip'd byte array.
      zip.setOut(builder.asOutputStream)
      result
    }
  }
}
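
The idea behind ZipBuffer (write into a ZipOutputStream, then repeatedly hand off and empty whatever has reached the underlying sink) can be tried without Akka. The following is only a minimal sketch under stated assumptions: DrainingZipSketch, zipInChunks and unzipFirst are hypothetical names, not part of the gist, and the sink is a plain ByteArrayOutputStream that is copied and reset, whereas the gist swaps in a fresh ByteStringBuilder to avoid that copy.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.util.zip.{ZipEntry, ZipInputStream, ZipOutputStream}

// Hypothetical helper object for illustration; none of these names come from the gist.
object DrainingZipSketch {

  /** Zips a single entry, draining the sink after every `chunkSize` input bytes
    * (chunkSize must be positive). Returns the non-empty chunks in emission order. */
  def zipInChunks(name: String, data: Array[Byte], chunkSize: Int): Vector[Array[Byte]] = {
    val sink = new ByteArrayOutputStream()
    val zip = new ZipOutputStream(sink)
    var chunks = Vector.empty[Array[Byte]]

    // Hand off whatever has reached the sink so far, then empty the sink.
    def drain(): Unit = {
      if (sink.size() > 0) chunks = chunks :+ sink.toByteArray
      sink.reset()
    }

    zip.putNextEntry(new ZipEntry(name))
    data.grouped(chunkSize).foreach { part =>
      zip.write(part, 0, part.length)
      zip.flush()
      drain()
    }
    zip.closeEntry()
    zip.close() // writes the central directory into the sink
    drain()
    chunks
  }

  /** Reads back the first entry of a zip archive held in memory. */
  def unzipFirst(bytes: Array[Byte]): (String, Array[Byte]) = {
    val in = new ZipInputStream(new ByteArrayInputStream(bytes))
    try {
      val entry = in.getNextEntry
      val content = Iterator.continually(in.read()).takeWhile(_ != -1).map(_.toByte).toArray
      (entry.getName, content)
    } finally in.close()
  }
}
```

Concatenating the returned chunks in order should give a valid archive, so memory use is bounded by what sits in the sink between drains rather than by the size of the whole archive.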
@mkurz

mkurz commented Jul 25, 2019

@kirked You define private var emptyStream = false? Is this correct? That means the stream is not empty from the beginning? (In my code I initialized it with true). I just want to make sure you or I did not make a mistake 😉

And about the buffer:

} else {
  buffer.close
  push(out, buffer.toByteString)
}

So in this else branch we close the buffer and push its remaining content out. Isn't the buffer empty as well after these two lines, so that emptyStream could be set to true too? (However, if onPull is called once more afterwards, then your current code is correct, because that subsequent invocation would end up in the if (buffer.isEmpty) { branch anyway.)

(I am not much into this code, so you are probably right anyway; I just want some kind of confirmation 😉)
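
To make the question above concrete, the branching in onPull can be modelled as a pure function of two flags. This is only an illustrative model, assuming the handler behaves exactly as in the posted code; OnPullModel and its Action cases are hypothetical names and do not appear in the gist.

```scala
// Hypothetical pure model of the onPull branches; not part of the gist.
object OnPullModel {
  sealed trait Action
  case object PullUpstream extends Action          // upstream still open: ask for the next ZipSource
  case object CloseAndPushRemainder extends Action // upstream closed, bytes left: close zip and push them
  case object CompleteStage extends Action         // upstream closed, nothing left: finish

  def onPull(inClosed: Boolean, bufferEmpty: Boolean): Action =
    if (!inClosed) PullUpstream
    else if (bufferEmpty) CompleteStage
    else CloseAndPushRemainder
}
```

In this model, a pull that arrives after upstream has closed with bytes still buffered yields CloseAndPushRemainder. That push empties the buffer, so the next pull yields CompleteStage, which is the second invocation described in the comment.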

@kirked
Author

kirked commented Jul 25, 2019

Yes, just fixed, thanks!

@mkurz

mkurz commented Jul 25, 2019

@kirked Just one last thing: In the above comment the author writes

In some really rare cases, there can be empty ByteString in the result.
This empty ByteString causes PlayFramework response to die because Http response chunks cant be of zero length.

and suggests to replace result.iterator... I guess he meant to replace it with something like result.iterator.filter(!_.isEmpty) to ignore empty elements? Because they are empty they shouldn't matter anyway?

Does this make sense? What do you think?
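
The suggested filter can be sketched on plain collections. This is a minimal sketch, assuming Vector[Byte] as a stand-in for ByteString; NonEmptyChunks and nonEmptyOnly are hypothetical names. One plausible source of empty chunks, stated here as an assumption rather than something the thread confirms, is that the deflater may hold small writes internally, so a flush can leave the buffer with nothing new to emit.

```scala
// Hypothetical stand-in for the stage's chunk iterator; not part of the gist.
object NonEmptyChunks {
  type Chunk = Vector[Byte] // stands in for akka.util.ByteString

  // Lazily drops zero-length chunks and keeps the others in their original order.
  def nonEmptyOnly(chunks: Iterator[Chunk]): Iterator[Chunk] =
    chunks.filter(_.nonEmpty)
}
```

Because an empty chunk carries no bytes, dropping it should not change the concatenated output; it only avoids handing a zero-length element to a consumer that rejects one.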

@mkurz

mkurz commented Jul 25, 2019

@michalbogacz Yes, please contribute your code to akka-stream-contrib as well. I am interested 😉 Why not open a pull request?

@kirked
Author

kirked commented Jul 26, 2019

I've added the filter(_.nonEmpty) to the stream iterator.

@mkurz

mkurz commented Jul 28, 2019

@kirked Thanks! Can you add it to your pull request as well?

@mkurz

mkurz commented Jul 29, 2019

@michalbogacz I saw you opened akka/akka-stream-contrib#171, thanks a lot!

@kirked
Author

kirked commented Aug 5, 2019

@mkurz, I added the filter to the PR.

Good news, @raboof has approved the PR, so this will be joining akka-stream-contrib.

@mkurz

mkurz commented Aug 5, 2019

@kirked Thanks! I am aware, I am following that pull request too 😉 Thanks for your great work!
