Skip to content

Instantly share code, notes, and snippets.

@Daenyth
Created May 18, 2020 18:29
Show Gist options
  • Save Daenyth/4c5aa3bafe1d85909437eb93b0cea3e1 to your computer and use it in GitHub Desktop.
Save Daenyth/4c5aa3bafe1d85909437eb93b0cea3e1 to your computer and use it in GitHub Desktop.
POC akka-http to http4s layer
package akkahttp4s
import akka.http.scaladsl.model.{
ContentType,
ContentTypes,
HttpEntity,
HttpHeader,
HttpMethod,
HttpMethods,
HttpProtocol,
HttpProtocols,
HttpResponse,
ParsingException,
ResponseEntity,
Uri => AkkaUri
}
import akka.http.scaladsl.server.{
Rejection,
RequestContext,
RouteResult,
Route => AkkaRoute
}
import akka.stream.Materializer
import akka.stream.scaladsl.Source
import cats.data.{NonEmptyChain, OptionT, ValidatedNec}
import cats.effect.implicits._
import cats.effect.{ConcurrentEffect, ContextShift}
import cats.implicits._
import fs2.Stream
import org.http4s.headers.`Content-Type`
import org.http4s.syntax.all._
import org.http4s.{
Header,
Headers,
HttpRoutes,
HttpVersion,
Method,
Request,
Response,
Uri
}
import streamz.converter._
import Fs2AkkaCompat
import scala.collection.immutable
import scala.util.control.NoStackTrace
/** Factory for turning http4s routes into an akka-http route. */
object AkkaHttp4s {

  /** Wraps `http4sRoutes` in an [[AkkaHttp4s]] adapter and returns the akka-http route it exposes.
    *
    * @param http4sRoutes the http4s routes to serve from akka-http
    * @param mat          materializer handed to the adapter
    */
  def apply[F[_]: ConcurrentEffect: ContextShift](
      http4sRoutes: HttpRoutes[F]
  )(implicit mat: Materializer): AkkaRoute = {
    val adapter = new AkkaHttp4s[F](http4sRoutes)
    adapter.akkaRoute
  }
}
/** Adapts http4s `HttpRoutes[F]` so they can be served as an akka-http route.
  *
  * Each akka-http request is converted to an http4s `Request[F]`, run through `http4sRoutes`,
  * and the resulting http4s `Response[F]` is converted back to an akka-http `HttpResponse`.
  * Every conversion step is validated; failures are accumulated and raised in `F` as
  * `AkkaToHttp4sFailed` (request side) or `Http4sToAkkaFailed` (response side).
  *
  * @param http4sRoutes the http4s routes to expose
  * @param mat          materializer used when bridging akka streams and fs2 streams
  */
class AkkaHttp4s[F[_]: ConcurrentEffect: ContextShift](
    http4sRoutes: HttpRoutes[F]
)(
    implicit mat: Materializer
) {

  /** Accumulating result of a single conversion step. */
  private type OrErr[A] = ValidatedNec[Throwable, A]

  // akka-http derives these headers from the response entity; when they are also passed as
  // explicit response headers it ignores them and logs a warning, so they are not copied over.
  private val entityManagedHeaders = Set("Content-Type".ci, "Content-Length".ci)

  /** The akka-http route backed by `http4sRoutes`. */
  def akkaRoute: AkkaRoute = { req =>
    // TODO convert http4s exceptions to Rejections?
    handleRequest(req)
      // An empty rejection list is akka-http's signal for "nothing matched"; a sealed route
      // turns it into a 404.
      .getOrElse(RouteResult.Rejected(immutable.Seq.empty[Rejection]))
      .toIO
      .unsafeToFuture()
  }

  /** Converts the request, runs the http4s routes and converts the response (if any route matched). */
  private def handleRequest(req: RequestContext): OptionT[F, RouteResult] =
    OptionT
      .liftF(convertRequest(req))
      .flatMap(http4sRoutes.run)
      .semiflatMap(convertResponse)

  /** Builds the http4s request; raises `AkkaToHttp4sFailed` in `F` if any part cannot be converted. */
  private def convertRequest(req: RequestContext): F[Request[F]] =
    // TODO does request.uri work if the akka route nests path sections? Should use `req.unmatchedPath`?
    (
      methodFrom(req.request.method),
      convertUri(req.request.uri),
      httpVersionFrom(req.request.protocol)
    ).mapN { (method, uri, httpVersion) =>
        // The body stays a lazy stream; nothing is read from the akka entity here.
        val entity: Stream[F, Byte] =
          req.request.entity.dataBytes
            .toStream()
            .through(Fs2AkkaCompat.fromByteString)
        Request(method, uri, httpVersion, headersFrom(req.request.headers), entity)
      }
      .leftMap(AkkaToHttp4sFailed(_, req): Throwable)
      .liftTo[F]

  /** Maps the well-known akka-http methods directly and parses anything else by name. */
  private def methodFrom(method: HttpMethod): OrErr[Method] = method match {
    case HttpMethods.CONNECT => Method.CONNECT.validNec
    case HttpMethods.DELETE  => Method.DELETE.validNec
    case HttpMethods.GET     => Method.GET.validNec
    case HttpMethods.HEAD    => Method.HEAD.validNec
    case HttpMethods.OPTIONS => Method.OPTIONS.validNec
    case HttpMethods.PATCH   => Method.PATCH.validNec
    case HttpMethods.POST    => Method.POST.validNec
    case HttpMethods.PUT     => Method.PUT.validNec
    case HttpMethods.TRACE   => Method.TRACE.validNec
    case other =>
      // Locale.ROOT keeps the upper-casing independent of the JVM's default locale.
      Method.fromString(other.value.toUpperCase(java.util.Locale.ROOT)).toValidatedNec
  }

  /** Re-parses the rendered akka-http URI as an http4s URI. */
  private def convertUri(akkaUri: AkkaUri): OrErr[Uri] =
    Uri.fromString(akkaUri.toString()).toValidatedNec

  /** Conversion failure for a protocol version that has no counterpart on the other side. */
  private def unsupportedVersion[A](version: Any): OrErr[A] =
    (new IllegalArgumentException(s"unsupported http version: $version"): Throwable).invalidNec[A]

  /** akka-http protocol => http4s version. */
  private def httpVersionFrom(protocol: HttpProtocol): OrErr[HttpVersion] =
    protocol match {
      case HttpProtocols.`HTTP/1.0` => HttpVersion.`HTTP/1.0`.validNec
      case HttpProtocols.`HTTP/1.1` => HttpVersion.`HTTP/1.1`.validNec
      case HttpProtocols.`HTTP/2.0` => HttpVersion.`HTTP/2.0`.validNec
      // Not expected in practice, but reported as a conversion failure instead of throwing.
      case other => unsupportedVersion(other)
    }

  /** http4s version => akka-http protocol. */
  private def httpVersionTo(ver: HttpVersion): OrErr[HttpProtocol] = ver match {
    case HttpVersion.`HTTP/1.0` => HttpProtocols.`HTTP/1.0`.validNec
    case HttpVersion.`HTTP/1.1` => HttpProtocols.`HTTP/1.1`.validNec
    case HttpVersion.`HTTP/2.0` => HttpProtocols.`HTTP/2.0`.validNec
    // Reported as a conversion failure instead of throwing.
    case other => unsupportedVersion(other)
  }

  /** Copies akka-http headers to http4s as raw name/value pairs. */
  private def headersFrom(
      headers: immutable.Seq[HttpHeader]
  ): Headers = {
    def hdr(h: HttpHeader): Header = Header.Raw(h.name.ci, h.value)
    Headers(headers.map(hdr).toList)
  }

  /** Parses http4s headers into akka-http headers, accumulating every parse failure.
    * Headers listed in `entityManagedHeaders` are skipped.
    */
  private def headersTo(headers: Headers): OrErr[immutable.Seq[HttpHeader]] = {
    def hdr(h: Header): OrErr[HttpHeader] =
      HttpHeader.parse(h.name.value, h.value) match {
        case HttpHeader.ParsingResult.Ok(result, _) => result.validNec
        case HttpHeader.ParsingResult.Error(info) =>
          new Exception(info.format(withDetail = true)).invalidNec
      }
    headers.toList
      .filterNot(h => entityManagedHeaders.contains(h.name))
      .traverse(hdr)
      .map(_.toSeq)
  }

  /** Builds the akka-http response; raises `Http4sToAkkaFailed` in `F` if any part cannot be converted. */
  private def convertResponse(resp: Response[F]): F[RouteResult] =
    (
      httpEntity(resp),
      headersTo(resp.headers),
      httpVersionTo(resp.httpVersion)
    ).mapN { (entity, headers, protocol) =>
        // NOTE(review): the Int status code is converted to an akka-http status implicitly;
        // confirm how that conversion behaves for non-standard codes.
        val akkaResp = HttpResponse(
          status = resp.status.code,
          headers = headers,
          entity = entity,
          protocol = protocol
        )
        RouteResult.Complete(akkaResp): RouteResult
      }
      .leftMap(Http4sToAkkaFailed(_, resp))
      .liftTo[F]

  /** Wraps the http4s body stream in an akka-http entity.
    * Falls back to `application/octet-stream` when the response has no content type.
    */
  private def httpEntity(resp: Response[F]): OrErr[ResponseEntity] = {
    val getContentType =
      resp.contentType
        .map(contentTypeTo)
        .getOrElse(ContentTypes.`application/octet-stream`.validNec)
    getContentType.map { contentType =>
      val body = Source.fromGraph(
        resp.body.through(Fs2AkkaCompat.byteToByteString).toSource)
      // NOTE(review): every response uses a close-delimited entity; presumably a chunked or
      // fixed-length entity would be preferable where possible - TODO confirm.
      HttpEntity.CloseDelimited(contentType, body)
    }
  }

  /** Parses the http4s Content-Type header value into an akka-http `ContentType`. */
  private def contentTypeTo(ct: `Content-Type`): OrErr[ContentType] =
    ContentType
      .parse(ct.value)
      .leftMap { errs =>
        // Join the individual parse errors so the message reads as text, not as a collection.
        val msg = errs.map(_.format(withDetail = true)).mkString("; ")
        ParsingException(
          s"Failed to convert http4s Content-Type to akka-http: $msg"): Throwable
      }
      .toValidatedNec
}
/** Raised when an akka-http request cannot be converted into an http4s request.
  *
  * @param causes         every conversion failure that was collected
  * @param requestContext the akka-http request context that failed to convert
  */
case class AkkaToHttp4sFailed(
    causes: NonEmptyChain[Throwable],
    requestContext: RequestContext
) extends Exception(
      causes
        .map(_.toString)
        .mkString_("Failed to convert Akka-Http request to Http4s: [", ", ", "]"),
      // Only one cause can be attached; the first is arbitrary but better than none.
      causes.head
    )
    with NoStackTrace
/** Raised when an http4s response cannot be converted into an akka-http response.
  *
  * @param causes   every conversion failure that was collected
  * @param response the http4s response that failed to convert
  */
case class Http4sToAkkaFailed[F[_]](
    causes: NonEmptyChain[Throwable],
    response: Response[F]
) extends Exception(
      causes
        .map(_.toString)
        .mkString_("Failed to convert Http4s response to Akka-Http: [", ", ", "]"),
      // Only one cause can be attached; the first one is used.
      causes.head
    )
    with NoStackTrace
package akkahttp4s
import akka.http.scaladsl.model.{
ContentTypes,
HttpRequest,
HttpResponse,
ResponseEntity,
StatusCodes
}
import akka.http.scaladsl.server.{Route => AkkaRoute}
import akka.http.scaladsl.testkit.ScalatestRouteTest
import akka.http.scaladsl.unmarshalling.{Unmarshal, Unmarshaller}
import cats.effect.{Async, IO}
import cats.effect.concurrent.Semaphore
import cats.implicits._
import fs2.Stream
import fs2.concurrent.SignallingRef
import io.circe.literal._
import org.http4s.HttpRoutes
import org.http4s.dsl.Http4sDsl
import streamz.converter._
import IOSpec // https://gist.github.com/Daenyth/67575575b5c1acc1d6ea100aae05b3a9
import scala.concurrent.duration._
/** Exercises the `AkkaHttp4s` adapter by running http4s routes through an akka-http
  * async handler and inspecting the akka-http responses.
  */
class AkkaHttp4sSpec extends IOSpec with Http4sDsl[IO] with ScalatestRouteTest {
  describe("simple requests") {
    it("transfers simple responses") {
      val msg = "hello world"
      val route = AkkaHttp4s(HttpRoutes.of[IO] {
        case GET -> Root => Ok(msg)
      })
      routeServer(route)(Get()).flatMap { resp =>
        resp.status shouldBe StatusCodes.OK
        unmarshall[String](resp.entity).shouldResultIn(msg)
      }
    }
  }

  describe("streaming") {
    it("returns stream responses") {
      import org.http4s.circe.CirceEntityEncoder._
      val route = AkkaHttp4s(HttpRoutes.of[IO] {
        case GET -> Root =>
          Ok(Stream.emit(json"""{"key":"value"}""").covary[IO])
      })
      routeServer(route)(Get()).flatMap { resp =>
        resp.status shouldBe StatusCodes.OK
        resp.entity.contentType shouldBe ContentTypes.`application/json`
        unmarshall[String](resp.entity).shouldResultIn("""{"key":"value"}""")
      }
    }

    it("streams lazily") {
      val test = for {
        stopResponse <- SignallingRef[IO, Boolean](false)
        lock <- Semaphore[IO](1)
        route: AkkaRoute = AkkaHttp4s(HttpRoutes.of[IO] {
          case GET -> Root =>
            Ok(
              Stream
                .eval(lock.acquire.as("locked"))
                .repeat // Ensure the stream won't terminate on its own - it must be interrupted
                .interruptWhen(stopResponse))
        })
        response <- routeServer(route)(Get())
        respEntity <- response.entity.dataBytes
          .toStream()
          .evalTap { _ =>
            // Once we get some response bytes, we can terminate the stream producing the server-side
            // response, allowing the request to complete.
            stopResponse.set(true)
          }
          .map(_.utf8String)
          .compile
          .lastOrError
      } yield {
        // If the response handling tries to fully produce the stream before returning a response, then we never get here
        respEntity shouldBe "locked"
      }
      test.timeoutTo(
        2.seconds,
        IO(fail("Response did not complete - stream was not lazily evaluated")))
    }
  }

  describe("errors") {
    it("404s for no match") {
      val route = AkkaHttp4s(HttpRoutes.of[IO](PartialFunction.empty))
      routeServer(route)(Get()).map { resp =>
        resp.status shouldBe StatusCodes.NotFound
      }
    }

    it("translates errors in route") {
      val route = AkkaHttp4s(HttpRoutes.of[IO] {
        case GET -> Root =>
          IO.raiseError[String](new Exception("boom")).flatMap(Ok(_))
      })
      val server = routeServer(route)
      server(Get()).map { response =>
        response.status shouldBe StatusCodes.InternalServerError
      }
    }

    it("translates errors from EntityEncoder") {
      val route = AkkaHttp4s(HttpRoutes.of[IO] {
        case GET -> Root =>
          // NB this differs from the "errors in route" case because the error here is in the effect
          // producing the entity body rather than the effect producing the response itself
          Ok(IO.raiseError[String](new Exception("boom")))
      })
      val server = routeServer(route)
      // NOTE: known to fail at present. The effect producing the body is only evaluated after the
      // response head has been produced, so the route answers 200 rather than 500.
      server(Get()).flatMap { response =>
        response.status shouldBe StatusCodes.InternalServerError
        unmarshall[String](response.entity).shouldFail
      }
    }
  }

  /** Turns `route` into a request handler whose result is suspended in `IO`. */
  private def routeServer(route: AkkaRoute): HttpRequest => IO[HttpResponse] = {
    val server = AkkaRoute.asyncHandler(route)
    req => Async.fromFuture(IO(server(req)))
  }

  /** Unmarshals a response entity to `A`, suspended in `IO`. */
  private def unmarshall[A: Unmarshaller[ResponseEntity, *]](
      entity: ResponseEntity
  ): IO[A] =
    Async.fromFuture(IO(Unmarshal(entity).to[A]))
}
package akkahttp4s
import akka.util.ByteString
import fs2._
/** Pipes for moving bytes between fs2 byte streams and akka `ByteString` streams. */
object Fs2AkkaCompat {

  /** Packs each chunk of bytes into a single `ByteString`. */
  def byteToByteString[F[_]]: Pipe[F, Byte, ByteString] = { bytes =>
    bytes.mapChunks { chunk =>
      val packed = ByteString.fromArray(chunk.toBytes.toArray)
      Chunk.singleton(packed)
    }
  }

  /** Unpacks each `ByteString` into a chunk of its bytes. */
  def fromByteString[F[_]]: Pipe[F, ByteString, Byte] = { byteStrings =>
    byteStrings.flatMap { bs =>
      val unpacked = Chunk.bytes(bs.toArray)
      Stream.chunk(unpacked)
    }
  }
}
@Daenyth
Copy link
Author

Daenyth commented May 18, 2020

Note: the "translates errors from EntityEncoder" test fails.

The effect producing the entity body isn't evaluated until after the http response header is sent, so this route gives a 200 response.

http://gitter.im/http4s/http4s?at=5e664988145f4d69562d65a7

note to self: use a middleware that prefetches N bytes and translates error to 500

nigredo-tori @nigredo-tori 10:09
If you control your Server implementation to that degree, you should be able to write your middleware as HttpApp[F] => Http[Resource[F, *], F] or something to that effect.

http://stackoverflow.com/questions/50174849/akka-streams-best-practice-to-initialise-and-dispose-resources-of-a-sink

take the K[Resource[F, *], Request[F], Response[F]] and .allocated, then use those with watchTermination

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment