Last active
February 9, 2019 01:05
-
-
Save Leonti/31976a00157db308642ed77678804d78 to your computer and use it in GitHub Desktop.
Attempt to write Stream[F, (String, Stream[F, Byte])] to zipped Stream[F, Byte]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package backup | |
import java.io.OutputStream | |
import cats.effect._ | |
import cats.effect.implicits._ | |
import cats.implicits._ | |
import fs2.{Chunk, Pipe, Stream, io} | |
import java.util.zip.{ZipEntry, ZipOutputStream} | |
import fs2.concurrent.Queue | |
import scala.concurrent.{ExecutionContext, SyncVar} | |
// https://github.com/slamdata/fs2-gzip/blob/master/core/src/main/scala/fs2/gzip/package.scala | |
// https://github.com/scalavision/fs2-helper/blob/master/src/main/scala/fs2helper/zip.scala | |
// https://github.com/eikek/sharry/blob/2f1dbfeae3c73bf2623f65c3591d0b3e0691d4e5/modules/common/src/main/scala/sharry/common/zip.scala | |
object Fs2Zip { | |
private def writeEntry[F[_]](zos: ZipOutputStream)(implicit F: Concurrent[F], | |
blockingEc: ExecutionContext, | |
contextShift: ContextShift[F]): Pipe[F, (String, Stream[F, Byte]), Unit] = | |
_.flatMap { | |
case (name, data) => | |
val createEntry = Stream.eval(F.delay { | |
zos.putNextEntry(new ZipEntry(name)) | |
}) | |
val writeEntry = data.through(io.writeOutputStream(F.delay(zos.asInstanceOf[OutputStream]), blockingEc, closeAfterUse = false)) | |
val closeEntry = Stream.eval(F.delay(zos.closeEntry())) | |
createEntry ++ writeEntry ++ closeEntry | |
} | |
private def zipP1[F[_]](implicit F: ConcurrentEffect[F], | |
blockingEc: ExecutionContext, | |
contextShift: ContextShift[F]): Pipe[F, (String, Stream[F, Byte]), Byte] = entries => { | |
Stream.eval(Queue.unbounded[F, Option[Chunk[Byte]]]).flatMap { q => | |
Stream.suspend { | |
val os = new java.io.OutputStream { | |
private def enqueueChunkSync(a: Option[Chunk[Byte]]) = { | |
println(s"enqueueChunkSync $a") | |
val done = new SyncVar[Either[Throwable, Unit]] | |
q.enqueue1(a).start.flatMap(_.join).runAsync(e => IO(done.put(e))).unsafeRunSync | |
done.get.fold(throw _, identity) | |
println(s"enqueueChunkSync done $a") | |
} | |
@scala.annotation.tailrec | |
private def addChunk(c: Chunk[Byte]): Unit = { | |
val free = 1024 - bufferedChunk.size | |
if (c.size > free) { | |
enqueueChunkSync(Some(Chunk.vector(bufferedChunk.toVector ++ c.take(free).toVector))) | |
bufferedChunk = Chunk.empty | |
addChunk(c.drop(free)) | |
} else { | |
bufferedChunk = Chunk.vector(bufferedChunk.toVector ++ c.toVector) | |
} | |
} | |
private var bufferedChunk: Chunk[Byte] = Chunk.empty | |
override def close(): Unit = { | |
// flush remaining chunk | |
enqueueChunkSync(Some(bufferedChunk)) | |
bufferedChunk = Chunk.empty | |
// terminate the queue | |
enqueueChunkSync(None) | |
} | |
override def write(bytes: Array[Byte]): Unit = | |
Chunk.bytes(bytes) | |
override def write(bytes: Array[Byte], off: Int, len: Int): Unit = | |
addChunk(Chunk.bytes(bytes, off, len)) | |
override def write(b: Int): Unit = | |
addChunk(Chunk.singleton(b.toByte)) | |
} | |
val write: Stream[F, Unit] = Stream | |
.bracket(F.delay(new ZipOutputStream(os)))((zos: ZipOutputStream) => F.delay(zos.close())) | |
.flatMap((zos: ZipOutputStream) => entries.through(writeEntry(zos))) | |
val read = q.dequeue | |
.unNoneTerminate | |
.flatMap(Stream.chunk(_)) | |
read.concurrently(write) | |
} | |
} | |
} | |
def zip[F[_]: ConcurrentEffect: ContextShift](entries: Stream[F, (String, Stream[F, Byte])])( | |
implicit ec: ExecutionContext): Stream[F, Byte] = | |
entries.through(zipP1) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment