Skip to content

Instantly share code, notes, and snippets.

@ChristopherDavenport
Created December 27, 2017 21:23
Show Gist options
  • Save ChristopherDavenport/edfcb63ff24a94378c572e9d9ac12656 to your computer and use it in GitHub Desktop.
Save ChristopherDavenport/edfcb63ff24a94378c572e9d9ac12656 to your computer and use it in GitHub Desktop.
BodyStreamingExample
package example.playground
import java.io.InputStream
import cats.effect.IO
import fs2._
import cats.implicits._
import java.util.concurrent.ScheduledExecutorService
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.ExecutionContext.Implicits.global
object StreamInteruptTest {
val stream : Stream[IO, Byte] = ???
val inputStreamStream : Stream[IO, InputStream] = stream.through(fs2.io.toInputStream[IO])
val inputStreamOpt : IO[Option[InputStream]] = inputStreamStream.runLast
def withInputStreamString(i: InputStream): IO[String] = ??? // This is the effectful java function
val stringOpt: IO[Option[String]] = inputStreamOpt.flatMap(_.map(withInputStreamString).sequence) // also .traverse
val streamOpt : Stream[IO, String] = inputStreamStream.evalMap(withInputStreamString) // Recommend Streaming All the Way
//Then at then at the end
import org.http4s._
import org.http4s.multipart._
import org.http4s.dsl.io._
val response: Stream[IO, String] => IO[Response[IO]] = s => s.runLast.flatMap{
case Some(s) => Ok(s)
case None => NotFound()
}
val asMultipart : Request[IO] => IO[Multipart[IO]] = req => req.as[Multipart[IO]]
val getImportantMultipartSection : Multipart[IO] => Option[Stream[IO, Byte]] = m =>
m.parts.find(part => part.name == "file".some).map(_.body)
val f : Request[IO] => IO[Vector[Byte]] = req => Stream.eval(asMultipart(req))
.map(m => getImportantMultipartSection(m))
.flatMap{
case Some(s) => s
case None => Stream.raiseError[Byte](new Throwable("Didn't have right part!")).covary[IO] // Stream[Pure, A] => Stream[F, A]
}.runLog
val toArray : Vector[Byte] => Array[Byte] = v => v.toArray
val arrayUseFunction : Array[Byte] => IO[String] = ???
def multipartHandlerRecovery(t: Throwable): IO[Response[IO]] = t match {
case badPart if badPart.getMessage.contains("Didn't have right part!") => BadRequest()
case _ => InternalServerError()
}
val responder : Request[IO] => IO[Response[IO]] =
f(_)
.map(toArray)
.flatMap(arrayUseFunction)
.flatMap(Ok(_))
.attempt
.flatMap(_.fold(multipartHandlerRecovery, _.pure[IO]))
def main(args: Array[String]): Unit = {
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment