Skip to content

Instantly share code, notes, and snippets.

@fiadliel
Created August 30, 2015 23:23
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fiadliel/92c1c37880f916a89df9 to your computer and use it in GitHub Desktop.
Save fiadliel/92c1c37880f916a89df9 to your computer and use it in GitHub Desktop.
Reading Zip files with scalaz-stream
import java.io.{File, FileNotFoundException, IOException, InputStream}
import java.util.zip.{ZipEntry, ZipException, ZipFile}
import org.http4s.DateTime
import scodec.bits.ByteVector
import scalaz._
import scalaz.Scalaz._
import scalaz.concurrent.Task
import scalaz.stream._
/**
 * Provides utilities for reading Zip files, and safely providing streams
 * of data from those Zip files to a user.
 *
 * Every step is expressed as a [[ZipStream.ZipReader]]: a
 * [[scalaz.concurrent.Task]] that yields either a `ReadError` describing an
 * expected failure, or the requested value.
 */
object ZipStream {

  /** A [[scalaz.concurrent.Task]] producing either a `ReadError` or an `A`. */
  type ZipReader[A] = EitherT[Task, ReadError, A]

  /** Shorthand for the stream type handed back to callers. */
  private type ByteStream = Process[Task, ByteVector]

  /** Constructors for [[ZipReader]] values. */
  object ZipReader {

    /** Wraps a task that already yields a disjunction. */
    def apply[A](t: Task[ReadError \/ A]): ZipReader[A] = EitherT(t)

    /** Lifts an already computed disjunction. */
    def apply[A](t: ReadError \/ A): ZipReader[A] = EitherT(Task.delay(t))

    /** Lifts an already computed successful value. */
    def apply[A](t: A): ZipReader[A] = EitherT(Task.delay(t.right))

    /**
     * Runs `t`, converting the exceptions covered by `pf` into `ReadError`
     * values. Exceptions for which `pf` is not defined are left as failures
     * of the underlying task.
     */
    def handle[A](t: Task[A])(pf: PartialFunction[Throwable, ReadError]): ZipReader[A] = {
      ZipReader(t.map(_.right[ReadError]).handle(pf.andThen(_.left[A])))
    }

    /** Runs `t`, yielding `none` as the error when the task produces `None`. */
    def getOrError[A](t: Task[Option[A]], none: ReadError): ZipReader[A] = {
      ZipReader(t.map(_.fold(none.left[A])(_.right[ReadError])))
    }
  }

  /**
   * Opens `file` as a Zip file.
   *
   * A missing file yields `FileNotFound`, a malformed archive `FormatError`
   * and any other I/O failure `IOError`, mirroring [[getInputStream]].
   */
  def getZipFile(file: File): ZipReader[ZipFile] = {
    ZipReader.handle(Task.delay(new ZipFile(file))) {
      case _: FileNotFoundException => FileNotFound
      // Some JDK versions report a missing file through the NIO exception type.
      case _: java.nio.file.NoSuchFileException => FileNotFound
      case _: ZipException => FormatError
      // Must stay last: the exception types above are all IOExceptions.
      case _: IOException => IOError
    }
  }

  /**
   * Opens `file` as a Zip file unless it is unchanged since `ifModifiedSince`.
   *
   * @param file The Zip file to open.
   * @param ifModifiedSince When defined, the file is only opened if its
   *                        modification time is later than this instant;
   *                        otherwise the result is `ContentUnchanged`.
   */
  def getZipFileOrNone(file: File, ifModifiedSince: Option[DateTime]): ZipReader[ZipFile] = {
    ifModifiedSince match {
      case None => getZipFile(file)
      case Some(since) =>
        // Read the timestamp inside the Task so it is observed when the reader runs.
        val lastModified: ZipReader[Long] =
          EitherT(Task.delay(file.lastModified.right[ReadError]))
        lastModified.flatMap { millis =>
          // File.lastModified returns 0L when the file is missing or unreadable;
          // fall through to getZipFile so that case is reported as an error
          // instead of being mistaken for unchanged content.
          if (millis == 0L || since < DateTime(millis))
            getZipFile(file)
          else
            ZipReader[ZipFile](ContentUnchanged.left)
        }
    }
  }

  /** Looks up the entry called `name`, yielding `EntryNotFound` when absent. */
  def getZipEntry(zipFile: ZipFile, name: String): ZipReader[ZipEntry] = {
    // ZipFile.getEntry signals a missing entry with null; Option(...) turns that into None.
    ZipReader.getOrError(Task.delay(Option(zipFile.getEntry(name))), EntryNotFound)
  }

  /**
   * Opens a stream over the content of `entry`.
   *
   * A `ZipException` yields `FormatError`, any other `IOException` `IOError`.
   */
  def getInputStream(zipFile: ZipFile, entry: ZipEntry): ZipReader[InputStream] = {
    ZipReader.handle(Task.delay(zipFile.getInputStream(entry))) {
      case _: ZipException => FormatError
      case _: IOException => IOError
    }
  }

  /** Builds a stream that reads `is` in requests of `chunkSize` bytes. */
  def getChunkStream(is: InputStream, chunkSize: Int): ZipReader[Process[Task, ByteVector]] = {
    ZipReader(Process.constant(chunkSize).through(io.chunkR(is)))
  }

  /**
   * Opens the entry `name` of an already opened Zip file as a chunked stream,
   * taking responsibility for closing `zipFile`.
   *
   * The file is closed right away when the stream cannot be set up, and
   * otherwise by a finalizer attached to the returned stream. Closing the
   * Zip file also closes the input streams obtained from it.
   */
  private def openEntryStream(zipFile: ZipFile, name: String, chunkSize: Int): Task[ReadError \/ ByteStream] = {
    val closeZip: Task[Unit] = Task.delay(zipFile.close())
    // On the failure paths the original error is what matters, so a secondary
    // failure while closing is deliberately discarded.
    val closeQuietly: Task[Unit] = closeZip.attempt.map(_ => ())

    val opened: ZipReader[ByteStream] = for {
      entry <- getZipEntry(zipFile, name)
      is <- getInputStream(zipFile, entry)
      stream <- getChunkStream(is, chunkSize)
    } yield stream.onComplete(Process.eval_(closeZip))

    opened.run.attempt.flatMap {
      case \/-(\/-(stream)) => Task.now(stream.right[ReadError])
      case \/-(-\/(error)) => closeQuietly.map(_ => error.left[ByteStream])
      case -\/(thrown) => closeQuietly.flatMap(_ => Task.fail(thrown))
    }
  }

  /**
   * Streams the content of a single entry of a Zip file.
   *
   * The Zip file is closed when the returned stream terminates, or
   * immediately if the entry cannot be opened. The returned stream reads
   * from an open file, so it is meant to be run once.
   *
   * @param file The [[java.io.File]] representing the Zip file to read from.
   * @param name The name of the entry in the Zip file to retrieve.
   * @param ifModifiedSince Only return data if modified since this time.
   * @param chunkSize Data is returned in batches of this size.
   * @return A [[scalaz.concurrent.Task]] which will give either an error, or
   * a [[scalaz.stream.Process]] providing a stream of [[scodec.bits.ByteVector]]
   * which contains the content of the Zip entry.
   */
  def fromZip(file: File, name: String, ifModifiedSince: Option[DateTime], chunkSize: Int = 10240): Task[ReadError \/ Process[Task, ByteVector]] = {
    getZipFileOrNone(file, ifModifiedSince).run.flatMap {
      case -\/(error) => Task.now(error.left[ByteStream])
      case \/-(zipFile) => openEntryStream(zipFile, name, chunkSize)
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment