-
-
Save kirked/03c7f111de0e9a1f74377bf95d3f0f60 to your computer and use it in GitHub Desktop.
/*------------------------------------------------------------------------------ | |
* MIT License | |
* | |
* Copyright (c) 2017 Doug Kirk | |
* | |
* Permission is hereby granted, free of charge, to any person obtaining a copy | |
* of this software and associated documentation files (the "Software"), to deal | |
* in the Software without restriction, including without limitation the rights | |
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
* copies of the Software, and to permit persons to whom the Software is | |
* furnished to do so, subject to the following conditions: | |
* | |
* The above copyright notice and this permission notice shall be included in all | |
* copies or substantial portions of the Software. | |
* | |
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |
* SOFTWARE. | |
*----------------------------------------------------------------------------*/ | |
import akka.stream._ | |
import akka.stream.scaladsl._ | |
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler, StageLogging} | |
import akka.util.{ByteString, ByteStringBuilder} | |
import java.io.{InputStream, OutputStream} | |
import scala.util.control.NonFatal | |
/**
 * Streamed zip-file creation Akka streams implementation.
 * Typical usage, assuming an implicit ActorSystem:
 *
 *   import akka.stream._
 *   import akka.stream.scaladsl._
 *
 *   val filesToZip: Iterable[ZipToStreamFlow.ZipSource] = ...
 *   Source(filesToZip)
 *     .via(ZipToStreamFlow(8192))
 *     .runWith(someSink)(ActorMaterializer())
 *
 * The ActorMaterializer can also be implicit.
 *
 * Every incoming [[ZipToStreamFlow.ZipSource]] becomes one entry of a single zip archive.
 * The entry's input stream is opened when the element arrives, read lazily as downstream
 * demands data, and closed once the entry is complete or when the stage stops for any
 * reason (completion, cancellation or failure). Once upstream completes, the archive is
 * closed and its remaining bytes are emitted before the stage completes. Emitted
 * ByteStrings are never empty.
 *
 * If upstream completes without emitting any element, the stage completes without emitting
 * anything at all (not even an empty archive).
 *
 * @param bufferSize number of compressed bytes to accumulate before a chunk is emitted; a chunk
 *                   is cut as soon as at least this many bytes are buffered (so it may be somewhat
 *                   larger) or when an entry's input is exhausted. Must be positive.
 */
final class ZipToStreamFlow(bufferSize: Int) extends GraphStage[FlowShape[ZipToStreamFlow.ZipSource, ByteString]] {
  import ZipToStreamFlow._

  // A non-positive size would never let fileChunks read any input, spinning forever.
  require(bufferSize > 0, s"bufferSize must be positive, was $bufferSize")

  val in: Inlet[ZipSource] = Inlet("ZipToStreamFlow.in")
  val out: Outlet[ByteString] = Outlet("ZipToStreamFlow.out")
  override val shape: FlowShape[ZipSource, ByteString] = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with StageLogging {
      private val buffer = new ZipBuffer(bufferSize)

      // Input stream of the entry currently being written, if any.
      private var currentStream: Option[InputStream] = None

      // True from the moment an entry is started until its emitMultiple callback has run.
      // Meanwhile `out` belongs to the emitting handler and the archive must stay open;
      // closing it early would silently drop the rest of the entry.
      private var entryInProgress = false

      setHandler(out, new OutHandler {
        override def onPull(): Unit = pullOrFinish()
      })

      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val zipSource = grab(in)
          entryInProgress = true
          buffer.startEntry(zipSource.filePath)
          val stream = zipSource.streamGenerator()
          currentStream = Some(stream)
          emitMultiple(out, fileChunks(stream), () => {
            buffer.endEntry()
            closeInput()
            entryInProgress = false
            // Normally the last chunk was just pushed and `out` is unavailable. If there was
            // nothing to emit, the callback runs without a push and the demand is served here.
            if (isAvailable(out)) pullOrFinish()
          })
        }

        override def onUpstreamFinish(): Unit =
          // While an entry is still being emitted, do nothing: the first onPull after the
          // entry's callback finishes the archive via pullOrFinish().
          if (!entryInProgress) {
            if (buffer.isEmpty()) completeStage() // only expected when no element was ever received
            else {
              buffer.close()
              if (isAvailable(out)) push(out, buffer.toByteString())
            }
          }
      })

      /**
       * Serves downstream demand between entries: requests the next source while upstream is
       * open; afterwards emits the closed archive's remaining bytes, then completes the stage.
       * Must only be called while `out` is available.
       */
      private def pullOrFinish(): Unit =
        if (!isClosed(in)) {
          if (!hasBeenPulled(in)) pull(in)
        } else if (buffer.isEmpty()) completeStage()
        else {
          buffer.close() // harmless if onUpstreamFinish already closed it
          push(out, buffer.toByteString())
        }

      /** Closes the current entry's input stream, if one is open. */
      private def closeInput(): Unit = {
        val toClose = currentStream
        currentStream = None // cleared first so a failing close() is never retried
        toClose.foreach(_.close())
      }

      /**
       * Lazily reads `stream` into the current zip entry and yields the compressed bytes in
       * chunks of roughly `bufferSize` bytes. Empty chunks are skipped because some consumers
       * (e.g. chunked HTTP responses) reject zero-length elements. The input stream is closed
       * as soon as it reports end-of-stream. Read failures propagate and fail the stage;
       * postStop then closes the stream.
       */
      private def fileChunks(stream: InputStream): Iterator[ByteString] = {
        // Reads are at most 1 KiB each, further capped by the room left in `buffer`.
        val readBuffer = new Array[Byte](1024)
        var eof = false

        // Feeds input into the zip until at least `bufferSize` compressed bytes are buffered or
        // the input is exhausted, then drains whatever compressed output is available.
        def readChunk(): ByteString = {
          while (!eof && buffer.remaining() > 0) {
            val count = stream.read(readBuffer, 0, math.min(readBuffer.length, buffer.remaining()))
            if (count == -1) {
              eof = true
              closeInput()
            } else buffer.write(readBuffer, count)
          }
          buffer.toByteString()
        }

        val chunks = new Iterator[ByteString] {
          override def hasNext: Boolean = !eof
          override def next(): ByteString =
            if (eof) throw new NoSuchElementException("entry input exhausted") else readChunk()
        }
        chunks.filter(_.nonEmpty)
      }

      /** Releases the input stream and the zip's Deflater however the stage terminates. */
      override def postStop(): Unit = {
        try closeInput()
        catch { case NonFatal(e) => log.warning("Failed to close zip entry input stream: {}", e.getMessage) }
        // Closing is harmless if the archive was already closed on the normal completion path.
        try buffer.close()
        catch { case NonFatal(e) => log.warning("Failed to close zip buffer: {}", e.getMessage) }
      }
    }
}
object ZipToStreamFlow {

  /**
   * One entry of the zip archive.
   *
   * @param filePath        name of the entry inside the archive
   * @param streamGenerator opens the entry's content; invoked when the entry is started,
   *                        and the stream it returns is closed by the stage
   */
  final case class ZipSource(filePath: String, streamGenerator: () => InputStream)

  /**
   * Creates the zip stage.
   *
   * @param bufferSize number of compressed bytes to buffer before a chunk is emitted (default 64 KiB)
   */
  def apply(bufferSize: Int = 64 * 1024): ZipToStreamFlow = new ZipToStreamFlow(bufferSize)

  /** Lets the anonymous ZipOutputStream subclass below retarget its protected `out` field. */
  private[this] trait SetZipStream {
    def setOut(out: OutputStream): Unit
  }

  /**
   * A ZipOutputStream whose compressed output accumulates in memory and is handed out
   * piecewise by `toByteString()`. It has no synchronization, so confine it to a single owner.
   *
   * @param bufferSize target number of buffered bytes, used only by `remaining()`
   */
  private[ZipToStreamFlow] final class ZipBuffer(val bufferSize: Int) {
    import java.util.zip.{ZipEntry, ZipOutputStream}

    private var builder = new ByteStringBuilder()
    private val zip = new ZipOutputStream(builder.asOutputStream) with SetZipStream {
      // this MUST ONLY be used after flush()!
      override def setOut(newStream: OutputStream): Unit = out = newStream
    }
    private var inEntry = false
    private var closed = false

    /**
     * Ends any open entry and closes the zip stream, which writes the archive's trailing bytes
     * into the buffer. Calls after the first are no-ops, and all later writes are ignored.
     * The zip stream is closed even if ending the entry fails, so its Deflater is always released.
     */
    def close(): Unit =
      try endEntry()
      finally {
        closed = true
        zip.close()
      }

    /** Bytes that may still be buffered before reaching `bufferSize`; negative once exceeded. */
    def remaining(): Int = bufferSize - builder.length

    /** True when nothing is buffered. */
    def isEmpty(): Boolean = builder.isEmpty

    /** True when something is buffered. */
    def nonEmpty(): Boolean = !isEmpty()

    /** Ends any open entry and starts a new one named `path`; ignored once closed. */
    def startEntry(path: String): Unit =
      if (!closed) {
        endEntry()
        zip.putNextEntry(new ZipEntry(path))
        inEntry = true
      }

    /** Finishes the open entry, if any; ignored once closed. */
    def endEntry(): Unit =
      if (!closed && inEntry) {
        inEntry = false
        zip.closeEntry()
      }

    /** Writes one byte to the open entry; ignored when no entry is open or once closed. */
    def write(byte: Int): Unit =
      if (!closed && inEntry) zip.write(byte)

    /** Writes the first `length` bytes of `bytes` to the open entry; ignored when no entry is open or once closed. */
    def write(bytes: Array[Byte], length: Int): Unit =
      if (!closed && inEntry) zip.write(bytes, 0, length)

    /**
     * Removes and returns everything buffered so far. Data still held inside the zip stream's
     * compressor is not forced out, so the result can be empty even after writes.
     */
    def toByteString(): ByteString = {
      zip.flush()
      val result = builder.result()
      builder = new ByteStringBuilder()
      // set the underlying output for the zip stream to be the buffer
      // directly, so we don't have to copy the zip'd byte array.
      zip.setOut(builder.asOutputStream)
      result
    }
  }
}
I would like to point out that this code
line124 result.iterator
should be replaced by this result.iterator
In some really rare cases, there can be an empty ByteString in the result.
This empty ByteString causes the Play Framework response to die, because HTTP response chunks can't be of zero length.
After 2 days of debugging, I did not manage to find the exact root cause of the issue, or even the circumstances in which it occurs.
But the bug is more likely to be observed on large zip entries (~4 MB).
I suppose it is somehow related to the JVM bug https://bugs.java.com/bugdatabase/view_bug.do?bug_id=JDK-8133170.
Hi
Thanks for the example, I'd really like to reuse this code, but it turns out that /* License: MIT */
is "not good enough". There must be either a reference to a license file or the full license with your name as copyright holder (note: IANAL). I know this is an inconvenience, but would it be possible to add a full license clause with your name as copyright holder?
The license can be found here: https://simple.wikipedia.org/wiki/MIT_License
Wow, how am I not getting notices for all this conversation!
I've incorporated the changes noted here; unfortunately this was an exploratory effort and I never had a chance to put it into something that would test it well. We were, however, using the Play framework version in production. The code for this one was derived from the other, which uses Play's iteratees.
@cozyss also see the above link, which is the Play framework's iteratee-based version. It is being used in several production apps.
@magnusart done
There is one more problem: when upstream finishes without producing any elements, the stage hangs forever. My fix:
override def onUpstreamFinish(): Unit = {
if (!buffer.isEmpty) {
buffer.close()
if (isAvailable(out)) {
push(out, buffer.toByteString)
}
}else {
super.onUpstreamFinish()
}
}
@s4s0l thanks, incorporated!
Adding super.onUpstreamFinish()
in onUpstreamFinish
created a problem.
When a file is big, super.onUpstreamFinish()
can be called even if downstream didn't consume all fileChunks
.
This results in the zip never being closed.
One of the workarounds is creating boolean emptyStream
and running super.onUpstreamFinish()
only when the stream was empty.
if(emptyStream) { super.onUpstreamFinish() }
@kirked It's a very useful stage, did you consider adding it to akka-stream-contrib ?
@unkarjedy referring your comment:
I would like to point out that this code
line124 result.iterator
should be replaced by this result.iterator
Replace result.iterator
with what? Can you maybe post your whole file? Thanks!
@michalbogacz Can you please post the changes you mentioned in your comment?
@michalbogacz I did here: mkurz/streamed-zip@9d3fa3c
Can you have a look please? Is this correct? I was unsure where to set emptyStream = true
, should it be in the if (buffer.isEmpty) {
block or at the end of the outer if(isClosed(in)){...}
block?
Thank you very much!
@mkurz it looks ok, although I didn't have these changes from lines 71-76. If you want to be sure of your implementation, add tests :)
If anybody would like to submit this to akka/akka-stream-contrib, you may be able to get it done before I have a chance to, and I'm fine with doing that!
@michalbogacz I just was unsure if this is correct:
if (isClosed(in)) {
if (buffer.isEmpty) {
emptyStream = true
completeStage()
} else {
buffer.close
push(out, buffer.toByteString)
}
}
or this:
if (isClosed(in)) {
emptyStream = true
if (buffer.isEmpty) {
completeStage()
} else {
buffer.close
push(out, buffer.toByteString)
}
// or here: emptyStream = true?
}
Do you think it makes a difference?
I don't think you'd want to set emptyStream to true unless the buffer is empty, no?
I've updated the gist with that change, and if there are further changes I'll be monitoring. Github finally has implemented notification emails for gists, so at least now I can see that there's activity.
I'll also be contacting @raboof about getting this, or a variant of it, added to akka-stream-contrib. Since they've already got a ZipInputStreamSource
, I've renamed this ZipToStream
(perhaps it should even be ZipToStreamFlow
).
@mkurz I didn't use it in onPull
. This flag was only for the case where the stream was empty from the beginning.
Later I created my own solution URL that was using Source[ByteString]
instead of InputStream
(I wanted to download zip with many files from S3 using Alpakka, so using InputStream only complicated my code). I can too contribute my code to akka-stream-contrib if there will be interest from @raboof or other project members.
There. PR at akka/akka-stream-contrib#170 . I agree that using Alpakka today would probably be best; it wasn't around when this originated, only the AWS Java SDK was available!
Perhaps something good will come of it, and it can be supported.
@kirked You define private var emptyStream = false
? Is this correct? That means the stream is not empty from the beginning? (In my code I initialized it with true
). I just want to make sure you or I did not make a mistake 😉
And about the buffer:
} else {
buffer.close
push(out, buffer.toByteString)
}
So in this else we are closing the buffer and push its remaining content out - so isn't, after these two lines, the buffer empty as well? And therefore could set emptyStream
to true
as well? (However, if onPull
is called once more afterwards, then your current code is correct, because we would then end up in the if (buffer.isEmpty) {
branch in that subsequent invocation anyway.)
(I am not very familiar with this code, so you are probably right anyway; I just want some kind of confirmation 😉)
Yes, just fixed, thanks!
@kirked Just one last thing: In the above comment the author writes
In some really rare cases, there can be empty ByteString in the result.
This empty ByteString causes PlayFramework response to die because Http response chunks cant be of zero length.
and suggests to replace result.iterator
... I guess he meant to replace it with something like result.iterator.filter(!_.isEmpty)
to ignore empty elements? Because they are empty they shouldn't matter anyway?
Does this make sense? What do you think?
@michalbogacz Yes, please contribute your code to akka-stream-contrib as well. I am interested 😉 Why not open a pull request?
I've added the filter(_.nonEmpty)
to the stream iterator.
@kirked Thanks! Can you add it to your pull request as well?
@michalbogacz I saw you opened akka/akka-stream-contrib#171, thanks a lot!
@kirked Thanks! I am aware, I am following that pull request too 😉 Thanks for your great work!
+1 to @hejfelix
line 75: completeStage()
Should be removed.
If
fileChunks
returns many large chunks, the final buffer.endEntry
in
line70: emitMultiple(out, fileChunks(stream, buffer), () => buffer.endEntry())
can be called after
override def onUpstreamFinish()
so the last zip entry will be empty. Also,
line74 closeInput()
should be moved to the line70 callback andThen
.