Skip to content

Instantly share code, notes, and snippets.

@marcodippy
Created August 26, 2018 10:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save marcodippy/eddf5905662936b92cb73b3646c1880f to your computer and use it in GitHub Desktop.
Save marcodippy/eddf5905662936b92cb73b3646c1880f to your computer and use it in GitHub Desktop.
jetty-reactive-client backend for http4s
package org.http4s
package client
package jetty
import cats.data.Kleisli
import cats.effect._
import cats.effect.implicits._
import cats.implicits.{catsSyntaxEither => _, _}
import fs2._
import fs2.Stream._
import fs2.interop.reactivestreams._
import java.util.function.BiFunction
import org.eclipse.jetty.client.HttpClient
import org.eclipse.jetty.http.HttpFields
import org.eclipse.jetty.reactive.client.{ContentChunk, ReactiveRequest, ReactiveResponse}
import org.reactivestreams.{Publisher, Subscriber}
import scala.concurrent.ExecutionContext
import scala.collection.JavaConverters._
/** http4s [[Client]] backend implemented on top of Jetty's reactive HTTP client.
  *
  * Requests are converted to Jetty `ReactiveRequest`s whose bodies are published from the
  * http4s entity stream; responses are surfaced as [[DisposableResponse]]s whose bodies are
  * read from the `ContentChunk` publisher supplied by Jetty.
  */
object JettyReactiveClient {

  /** Starts `client` and wraps it in an http4s [[Client]].
    *
    * @param client the Jetty client to use; it is started here and stopped by the returned
    *               client's shutdown action
    * @param F      effect type class used to run and suspend side effects
    * @param ec     execution context used to derive timers and to complete the response callback
    * @return the http4s client, inside `F` because starting Jetty is a side effect
    */
  def apply[F[_]](client: HttpClient = new HttpClient())(
      implicit F: ConcurrentEffect[F],
      ec: ExecutionContext): F[Client[F]] =
    F.pure(client)
      .flatTap(client => F.delay { client.start() })
      .map(client =>
        Client(
          Kleisli { req =>
            F.async[DisposableResponse[F]] { cb =>
              implicit val timer: Timer[F] = Timer.derive[F](F, IO.timer(ec))

              // The callback can be reached from several places (response handler, failure of
              // the request set-up, failure of the response publisher); make sure that only the
              // first outcome is ever delivered.
              val completed = new java.util.concurrent.atomic.AtomicBoolean(false)
              val completeOnce: Callback[DisposableResponse[F]] =
                result => if (completed.compareAndSet(false, true)) cb(result)

              // Previously failures were silently dropped, leaving the caller waiting forever
              // when the request could not be built or failed before a response arrived.
              def reportFailure(result: Either[Throwable, Unit]): IO[Unit] =
                result match {
                  case Left(t) => IO(completeOnce(Left(t)))
                  case Right(_) => IO.unit
                }

              StreamSubscriber[F, Unit]
                .map { subscriber =>
                  // Drain the subscriber's stream in the background so that demand is signalled
                  // upstream; a failure of that stream is reported to the caller.
                  subscriber.stream.compile.drain.runAsync(reportFailure).unsafeRunSync()
                  toRequest(client, req)
                    .response(responseHandler(completeOnce))
                    .subscribe(subscriber)
                }
                .runAsync(reportFailure)
                .unsafeRunSync()
            }
          },
          F.delay(client.stop())
        ))

  /** Builds the Jetty `ReactiveRequest` equivalent of an http4s request.
    *
    * Method, URI and headers are copied onto a new Jetty request; the body is exposed as a
    * publisher of `ContentChunk`s, one per chunk of the http4s entity stream. When the request
    * has no `Content-Type`, an empty string is passed as the content type.
    */
  private def toRequest[F[_]](client: HttpClient, request: Request[F])(
      implicit F: ConcurrentEffect[F],
      ec: ExecutionContext): ReactiveRequest = {
    implicit val timer: Timer[F] = Timer.derive[F](F, IO.timer(ec))

    val jReq = client.newRequest(request.uri.toString).method(request.method.name)
    request.headers.foreach(h => jReq.header(h.name.toString, h.value))

    val entity =
      StreamUnicastPublisher(request.body.chunks.map(chunk => new ContentChunk(chunk.toByteBuffer)))
    val contentType = request.contentType.map(_.value).getOrElse("")

    ReactiveRequest
      .newBuilder(jReq)
      .content(ReactiveRequest.Content.fromPublisher(entity, contentType))
      .build()
  }

  /** Creates the function Jetty invokes once the response headers are available.
    *
    * The returned publisher, when subscribed, assembles a [[DisposableResponse]] from the Jetty
    * response and its content publisher and hands it (or the failure to build it) to `cb` on `ec`.
    *
    * NOTE(review): the subscriber passed to the returned publisher is never signalled
    * (`onSubscribe`/`onComplete` are not called) -- this mirrors the original behaviour; confirm
    * whether Jetty requires completion for resource clean-up.
    *
    * @param cb callback completing the pending request with either the response or an error
    */
  private def responseHandler[F[_]](
      cb: Callback[DisposableResponse[F]])(implicit F: ConcurrentEffect[F], ec: ExecutionContext) =
    new BiFunction[ReactiveResponse, Publisher[ContentChunk], Publisher[Unit]] {
      override def apply(
          rRes: ReactiveResponse,
          content: Publisher[ContentChunk]): Publisher[Unit] =
        (s: Subscriber[_ >: Unit]) => {
          implicit val timer: Timer[F] = Timer.derive[F](F, IO.timer(ec))

          val body = content
            .toStream()
            .flatMap { part =>
              // Copy the bytes out BEFORE completing the callback: completing it tells Jetty
              // the buffer has been consumed, so a chunk that merely wraps the buffer could
              // observe it being reused.
              val copied = new Array[Byte](part.buffer.remaining())
              part.buffer.get(copied)
              part.callback.succeeded()
              chunk(Chunk.bytes(copied))
            }

          // An unparseable status code is reported through the callback instead of being thrown
          // into Jetty's thread, where it would leave the caller waiting forever.
          val result: Either[Throwable, DisposableResponse[F]] =
            Status.fromInt(rRes.getResponse.getStatus).map { status =>
              DisposableResponse[F](
                Response(
                  status = status,
                  headers = getHeaders(rRes.getResponse.getHeaders),
                  body = body
                ),
                F.unit)
            }

          ec.execute(() => cb(result))
        }
    }

  /** Converts Jetty's `HttpFields` into http4s [[Headers]], one header per field. */
  private def getHeaders(headers: HttpFields): Headers =
    Headers(headers.asScala.map(field => Header(field.getName, field.getValue)).toList)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment