@kirked
Last active March 6, 2020 21:28
Akka streams implementation of constant-space (per stream) zip file creation.
/*------------------------------------------------------------------------------
* MIT License
*
* Copyright (c) 2017 Doug Kirk
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*----------------------------------------------------------------------------*/
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler, StageLogging}
import akka.util.{ByteString, ByteStringBuilder}
import java.io.{InputStream, OutputStream}
import scala.util.control.NonFatal
/**
 * Streamed zip-file creation Akka Streams implementation.
 * Typical usage, assuming an implicit ActorSystem:
 *
 *   import akka.stream._
 *   import akka.stream.scaladsl._
 *
 *   val filesToZip: List[ZipToStreamFlow.ZipSource] = ...
 *   Source(filesToZip)
 *     .via(ZipToStreamFlow(8192))
 *     .runWith(someSink)(ActorMaterializer())
 *
 * The ActorMaterializer can also be implicit.
 * (Source.apply requires an immutable.Iterable, hence List rather than Iterable.)
 */
final class ZipToStreamFlow(bufferSize: Int) extends GraphStage[FlowShape[ZipToStreamFlow.ZipSource, ByteString]] {
  import ZipToStreamFlow._

  val in: Inlet[ZipSource] = Inlet("ZipToStreamFlow.in")
  val out: Outlet[ByteString] = Outlet("ZipToStreamFlow.out")
  override val shape = FlowShape.of(in, out)
  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with StageLogging {
      private val buffer = new ZipBuffer(bufferSize)
      private var currentStream: Option[InputStream] = None
      private var emptyStream = true

      setHandler(
        out,
        new OutHandler {
          override def onPull(): Unit =
            if (isClosed(in)) {
              if (buffer.isEmpty) {
                completeStage()
              } else {
                buffer.close()
                push(out, buffer.toByteString())
              }
            } else pull(in)

          override def onDownstreamFinish(): Unit = {
            closeInput()
            buffer.close()
            super.onDownstreamFinish()
          }
        }
      )

      setHandler(
        in,
        new InHandler {
          override def onPush(): Unit = {
            val zipSource = grab(in)
            emptyStream = false
            buffer.startEntry(zipSource.filePath)
            val stream = zipSource.streamGenerator()
            currentStream = Some(stream)
            emitMultiple(out, fileChunks(stream, buffer), () => {
              buffer.endEntry()
              closeInput()
            })
          }

          override def onUpstreamFinish(): Unit =
            if (!buffer.isEmpty) {
              buffer.close()
              if (isAvailable(out)) {
                push(out, buffer.toByteString())
              }
            } else if (emptyStream) super.onUpstreamFinish()
        }
      )

      private def closeInput(): Unit = {
        currentStream.foreach(_.close())
        currentStream = None
      }

      private def fileChunks(stream: InputStream, buffer: ZipBuffer): Iterator[ByteString] = {
        // This seems like a good trade-off between single-byte
        // read I/O performance and doubling the ZipBuffer size.
        //
        // And it's still a decent defense against DDoS resource
        // limit attacks.
        val readBuffer = new Array[Byte](1024)
        var done = false

        def result: Stream[ByteString] =
          if (done) Stream.empty
          else {
            try {
              while (!done && buffer.remaining > 0) {
                val bytesToRead = Math.min(readBuffer.length, buffer.remaining)
                val count = stream.read(readBuffer, 0, bytesToRead)
                if (count == -1) {
                  stream.close()
                  done = true
                } else buffer.write(readBuffer, count)
              }
              buffer.toByteString() #:: result
            } catch {
              case NonFatal(e) =>
                closeInput()
                throw e
            }
          }

        // Drop zero-length chunks: HTTP chunked responses (e.g. in Play) reject them.
        result.iterator.filter(_.nonEmpty)
      }
    }
}
object ZipToStreamFlow {
  final case class ZipSource(filePath: String, streamGenerator: () => InputStream)

  def apply(bufferSize: Int = 64 * 1024): ZipToStreamFlow = new ZipToStreamFlow(bufferSize)

  private[this] trait SetZipStream {
    def setOut(out: OutputStream): Unit
  }

  private[ZipToStreamFlow] final class ZipBuffer(val bufferSize: Int) {
    import java.util.zip.{ZipEntry, ZipOutputStream}

    private var builder = new ByteStringBuilder()
    private val zip = new ZipOutputStream(builder.asOutputStream) with SetZipStream {
      // this MUST ONLY be used after flush()!
      override def setOut(newStream: OutputStream): Unit = out = newStream
    }
    private var inEntry = false
    private var closed = false

    def close(): Unit = {
      endEntry()
      closed = true
      zip.close()
    }

    def remaining: Int = bufferSize - builder.length
    def isEmpty: Boolean = builder.isEmpty
    def nonEmpty: Boolean = !isEmpty

    def startEntry(path: String): Unit =
      if (!closed) {
        endEntry()
        zip.putNextEntry(new ZipEntry(path))
        inEntry = true
      }

    def endEntry(): Unit =
      if (!closed && inEntry) {
        inEntry = false
        zip.closeEntry()
      }

    def write(byte: Int): Unit =
      if (!closed && inEntry) zip.write(byte)

    def write(bytes: Array[Byte], length: Int): Unit =
      if (!closed && inEntry) zip.write(bytes, 0, length)

    def toByteString(): ByteString = {
      zip.flush()
      val result = builder.result()
      builder = new ByteStringBuilder()
      // Point the zip stream's underlying output at the new builder
      // directly, so we don't have to copy the zipped byte array.
      zip.setOut(builder.asOutputStream)
      result
    }
  }
}
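The core trick in ZipBuffer (flush the ZipOutputStream, then redirect its protected `out` field to a fresh sink) can be exercised with the JDK alone. The sketch below is a minimal, hypothetical illustration, not the gist's code: the names SwappableZipOutputStream, zipInChunks and readBack are made up here, ByteArrayOutputStream stands in for Akka's ByteStringBuilder, and it drains after every 1024-byte write. Like fileChunks, it skips empty drains, and it then checks that the concatenated chunks still form a valid archive.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream}
import java.util.zip.{ZipEntry, ZipInputStream, ZipOutputStream}
import scala.collection.mutable.ListBuffer

object ChunkedZipSketch {
  // ZipOutputStream inherits FilterOutputStream's protected `out` field,
  // so a subclass can redirect all further output to another sink.
  final class SwappableZipOutputStream(initial: OutputStream) extends ZipOutputStream(initial) {
    // As in the gist's ZipBuffer: only call this right after flush().
    def setOut(newOut: OutputStream): Unit = out = newOut
  }

  // Zips the entries, returning the archive as a list of byte chunks.
  // Each chunk holds only the bytes written since the previous drain.
  def zipInChunks(entries: Seq[(String, Array[Byte])]): List[Array[Byte]] = {
    val chunks = ListBuffer.empty[Array[Byte]]
    var sink = new ByteArrayOutputStream()
    val zip = new SwappableZipOutputStream(sink)

    // Hand off the current sink's bytes (skipping empty ones) and start a fresh sink.
    def drain(): Unit = {
      zip.flush()
      val bytes = sink.toByteArray
      if (bytes.nonEmpty) chunks += bytes
      sink = new ByteArrayOutputStream()
      zip.setOut(sink)
    }

    for ((name, data) <- entries) {
      zip.putNextEntry(new ZipEntry(name))
      data.grouped(1024).foreach { piece =>
        zip.write(piece)
        drain()
      }
      zip.closeEntry() // finishes the deflater and writes the entry's data descriptor
      drain()
    }
    zip.close() // writes the central directory into the current sink
    val tail = sink.toByteArray
    if (tail.nonEmpty) chunks += tail
    chunks.toList
  }

  // Reads the whole content of the current zip entry.
  private def readEntry(in: InputStream): Array[Byte] = {
    val acc = new ByteArrayOutputStream()
    val buf = new Array[Byte](4096)
    var n = in.read(buf)
    while (n != -1) {
      acc.write(buf, 0, n)
      n = in.read(buf)
    }
    acc.toByteArray
  }

  // Parses an in-memory zip archive into entry name -> UTF-8 text.
  def readBack(archive: Array[Byte]): Map[String, String] = {
    val zin = new ZipInputStream(new ByteArrayInputStream(archive))
    try {
      var result = Map.empty[String, String]
      var entry = zin.getNextEntry
      while (entry != null) {
        result += entry.getName -> new String(readEntry(zin), "UTF-8")
        entry = zin.getNextEntry
      }
      result
    } finally zin.close()
  }
}
```

This works because the offsets recorded in the central directory come from ZipOutputStream's own byte counters, not from the sink. Redirecting `out` does not disturb them; only the order in which the chunks are concatenated matters.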
@unkarjedy

+1 to @hejfelix

line 75: completeStage()
should be removed.
If fileChunks returns many large chunks, the final buffer.endEntry in line 70:
emitMultiple(out, fileChunks(stream, buffer), () => buffer.endEntry())
can be called after override def onUpstreamFinish(), so the last zip entry will be empty.

Also, the closeInput() on line 74 should be moved into the andThen callback on line 70.

@unkarjedy

I would like to point out that this code
line124 result.iterator should be replaced by this result.iterator

In some really rare cases, there can be an empty ByteString in the result.
This empty ByteString causes the Play Framework response to die, because HTTP response chunks can't be of zero length.
After 2 days of debugging, I did not manage to find the exact root of the issue, or even its circumstances.
But the bug is more likely to be observed on large zip entries (~4 MB).
I suppose it is somehow related to the JVM bug https://bugs.java.com/bugdatabase/view_bug.do?bug_id=JDK-8133170.
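One way a zero-length chunk can appear is easy to reproduce without Akka or zip at all. The sketch below assumes a simplified, hypothetical reader (rawChunks and nonEmptyChunks are not from the gist) that, like fileChunks, emits whatever it collected on each step. When the input length is an exact multiple of the chunk size, the final step hits end-of-stream with nothing collected and yields an empty chunk. This illustrates the general hazard only; it is not a diagnosis of the issue reported above. A `filter(_.nonEmpty)` guard removes such chunks either way.

```scala
import java.io.InputStream

object EmptyChunkSketch {
  // Hypothetical reader loosely modeled on fileChunks: each next() fills up to
  // chunkSize bytes and returns what it got, even if EOF left it with nothing.
  def rawChunks(in: InputStream, chunkSize: Int): Iterator[Array[Byte]] =
    new Iterator[Array[Byte]] {
      private var done = false

      def hasNext: Boolean = !done

      def next(): Array[Byte] = {
        if (done) throw new NoSuchElementException("no more chunks")
        val buf = new Array[Byte](chunkSize)
        var filled = 0
        while (!done && filled < chunkSize) {
          val n = in.read(buf, filled, chunkSize - filled)
          if (n == -1) done = true else filled += n
        }
        buf.take(filled) // may be empty if EOF was hit immediately
      }
    }

  // The guard: drop zero-length chunks before they reach downstream.
  def nonEmptyChunks(in: InputStream, chunkSize: Int): Iterator[Array[Byte]] =
    rawChunks(in, chunkSize).filter(_.nonEmpty)
}
```

With 8 input bytes and a chunk size of 4, rawChunks yields chunks of length 4, 4 and 0; the filtered version yields only the two full chunks.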

@magnusart

Hi

Thanks for the example. I'd really like to reuse this code, but it turns out that /* License: MIT */ is "not good enough". There must be either a reference to a license file or the full license with your name as copyright holder (note: IANAL). I know this is an inconvenience, but would it be possible to add a full license clause with your name as copyright holder?

The license can be found here: https://simple.wikipedia.org/wiki/MIT_License

@kirked
Author

kirked commented Mar 30, 2018

Wow, how am I not getting notices for all this conversation!

@kirked
Author

kirked commented Mar 30, 2018

I've incorporated the changes noted here; unfortunately, this was an exploratory effort and I never had a chance to put it into something that would test it well. We were, however, using the Play Framework version in production. The code for this one was derived from the other, which uses Play's iteratees.

@kirked
Author

kirked commented Mar 30, 2018

@kirked
Author

kirked commented Mar 30, 2018

@cozyss also see the above link, which is the Play framework's iteratee-based version. It is being used in several production apps.

@kirked
Author

kirked commented Mar 30, 2018

@magnusart done

@s4s0l

s4s0l commented Oct 29, 2018

There is one more problem: when upstream finishes without producing any elements, the stage hangs forever. My fix:

override def onUpstreamFinish(): Unit = {
  if (!buffer.isEmpty) {
    buffer.close()
    if (isAvailable(out)) {
      push(out, buffer.toByteString)
    }
  } else {
    super.onUpstreamFinish()
  }
}

@kirked
Author

kirked commented Feb 27, 2019

@s4s0l thanks, incorporated!

@michalbogacz

Adding super.onUpstreamFinish() in onUpstreamFinish created a problem.
When a file is big, super.onUpstreamFinish() can be called even if downstream hasn't consumed all of the fileChunks.
As a result, the zip is never closed.
One workaround is to add a boolean emptyStream and call super.onUpstreamFinish() only when the stream was empty:
if (emptyStream) { super.onUpstreamFinish() }
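The completion rules discussed in this thread (the hang on an empty upstream, and the emptyStream guard) can be summarized as a small pure model. This is a hypothetical sketch, not Akka code: the Action values and the two functions only mirror the branches of the gist's onPull and onUpstreamFinish, so each case can be checked in isolation.

```scala
object CompletionModel {
  sealed trait Action
  case object PullUpstream extends Action // ask upstream for the next ZipSource
  case object CompleteNow extends Action  // completeStage(), directly or via super.onUpstreamFinish()
  case object CloseAndPush extends Action // buffer.close() writes the central directory, then push it
  case object DoNothing extends Action    // a pending emit plus a later onPull finish the zip

  // Mirrors the gist's OutHandler.onPull.
  def onPull(inClosed: Boolean, bufferEmpty: Boolean): Action =
    if (!inClosed) PullUpstream
    else if (bufferEmpty) CompleteNow
    else CloseAndPush

  // Mirrors the gist's InHandler.onUpstreamFinish (the push only happens if `out` is available).
  def onUpstreamFinish(bufferEmpty: Boolean, emptyStream: Boolean): Action =
    if (!bufferEmpty) CloseAndPush
    else if (emptyStream) CompleteNow
    else DoNothing
}
```

The DoNothing case is the one the emptyStream guard protects. Completing there would cut off chunks of a large file that are still being emitted, and the zip's central directory would never be written. The CompleteNow case for an empty upstream is the hang fix.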

@kirked It's a very useful stage; did you consider adding it to akka-stream-contrib?

@mkurz

mkurz commented Jul 23, 2019

@unkarjedy regarding your comment:

I would like to point out that this code
line124 result.iterator should be replaced by this result.iterator

Replace result.iterator with what? Can you maybe post your whole file? Thanks!

@mkurz

mkurz commented Jul 23, 2019

@michalbogacz Can you please post the changes you mentioned in your comment?

@michalbogacz

michalbogacz commented Jul 25, 2019

@mkurz Just apply changes I wrote about.

Or, if you need a solution where you don't have an InputStream in the input source, I created a gist where the input stream is a Source of Source of ByteString URL. It composes better with the Alpakka API.

@mkurz
Copy link

mkurz commented Jul 25, 2019

@michalbogacz I did here: mkurz/streamed-zip@9d3fa3c
Can you have a look, please? Is this correct? I was unsure where to set emptyStream = true: should it be in the if (buffer.isEmpty) { block or at the end of the outer if(isClosed(in)){...} block?
Thank you very much!

@michalbogacz

@mkurz it looks ok, although I didn't have these changes from lines 71-76. If you want to be sure of your implementation, add tests :)

@kirked
Author

kirked commented Jul 25, 2019

If anybody would like to submit this to akka/akka-stream-contrib, you may be able to get it done before I have a chance to, and I'm fine with doing that!

@mkurz

mkurz commented Jul 25, 2019

@michalbogacz I was just unsure whether this is correct:

if (isClosed(in)) {
  if (buffer.isEmpty) {
    emptyStream = true
    completeStage()
  } else {
    buffer.close
    push(out, buffer.toByteString)
  }
}

or this:

if (isClosed(in)) {
  emptyStream = true
  if (buffer.isEmpty) {
    completeStage()
  } else {
    buffer.close
    push(out, buffer.toByteString)
  }
  // or here: emptyStream = true?
}

Do you think it makes a difference?

@kirked
Author

kirked commented Jul 25, 2019

I don't think you'd want to set emptyStream to true unless the buffer is empty, no?

I've updated the gist with that change, and if there are further changes I'll be monitoring. GitHub has finally implemented notification emails for gists, so at least now I can see that there's activity.

I'll also be contacting @raboof about getting this, or a variant of it, added to akka-stream-contrib. Since they've already got a ZipInputStreamSource, I've renamed this ZipToStream (perhaps it should even be ZipToStreamFlow).

@michalbogacz

@mkurz I didn't use it in onPull. This flag was only for case where you had empty stream in the beginning.

Later I created my own solution URL that used Source[ByteString] instead of InputStream (I wanted to download a zip with many files from S3 using Alpakka, so using InputStream only complicated my code). I can also contribute my code to akka-stream-contrib if there is interest from @raboof or other project members.

@kirked
Copy link
Author

kirked commented Jul 25, 2019

There. PR at akka/akka-stream-contrib#170. I agree that using Alpakka today would probably be best; it wasn't around when this originated, and only the AWS Java SDK was available!

Perhaps something good will come of it, and it can be supported.

@mkurz

mkurz commented Jul 25, 2019

@kirked You define private var emptyStream = false? Is this correct? That means the stream is not empty from the beginning? (In my code I initialized it with true). I just want to make sure you or I did not make a mistake 😉

And about the buffer:

} else {
  buffer.close
  push(out, buffer.toByteString)
}

So in this else branch we close the buffer and push its remaining content out. After these two lines, isn't the buffer empty as well, so we could set emptyStream to true here too? (However, if onPull is called once more afterwards, then your current code is correct, because in that subsequent invocation we would end up in the if (buffer.isEmpty) { branch anyway.)

(I am not very familiar with this code, so you are probably right anyway; I just want some kind of confirmation 😉)

@kirked
Author

kirked commented Jul 25, 2019

Yes, just fixed, thanks!

@mkurz

mkurz commented Jul 25, 2019

@kirked Just one last thing: In the above comment the author writes

In some really rare cases, there can be empty ByteString in the result.
This empty ByteString causes PlayFramework response to die because Http response chunks cant be of zero length.

and suggests replacing result.iterator... I guess he meant replacing it with something like result.iterator.filter(!_.isEmpty) to ignore empty elements? Since they are empty, they shouldn't matter anyway?

Does this make sense? What do you think?

@mkurz

mkurz commented Jul 25, 2019

@michalbogacz Yes, please contribute your code to akka-stream-contrib as well. I am interested 😉 Why not open a pull request?

@kirked
Author

kirked commented Jul 26, 2019

I've added the filter(_.nonEmpty) to the stream iterator.

@mkurz

mkurz commented Jul 28, 2019

@kirked Thanks! Can you add it to your pull request as well?

@mkurz

mkurz commented Jul 29, 2019

@michalbogacz I saw you opened akka/akka-stream-contrib#171, thanks a lot!

@kirked
Author

kirked commented Aug 5, 2019

@mkurz, I added the filter to the PR.

Good news, @raboof has approved the PR, so this will be joining akka-stream-contrib.

@mkurz

mkurz commented Aug 5, 2019

@kirked Thanks! I am aware, I am following that pull request too 😉 Thanks for your great work!
