-
-
Save marcodippy/eddf5905662936b92cb73b3646c1880f to your computer and use it in GitHub Desktop.
jetty-reactive-client backend for http4s
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.http4s | |
package client | |
package jetty | |
import cats.data.Kleisli | |
import cats.effect._ | |
import cats.effect.implicits._ | |
import cats.implicits.{catsSyntaxEither => _, _} | |
import fs2._ | |
import fs2.Stream._ | |
import fs2.interop.reactivestreams._ | |
import java.util.function.BiFunction | |
import org.eclipse.jetty.client.HttpClient | |
import org.eclipse.jetty.http.HttpFields | |
import org.eclipse.jetty.reactive.client.{ContentChunk, ReactiveRequest, ReactiveResponse} | |
import org.reactivestreams.{Publisher, Subscriber} | |
import scala.concurrent.ExecutionContext | |
import scala.collection.JavaConverters._ | |
object JettyReactiveClient { | |
def apply[F[_]](client: HttpClient = new HttpClient())( | |
implicit F: ConcurrentEffect[F], | |
ec: ExecutionContext): F[Client[F]] = | |
F.pure(client) | |
.flatTap(client => F.delay { client.start() }) | |
.map(client => | |
Client( | |
Kleisli { req => | |
F.async[DisposableResponse[F]] { cb => | |
implicit val timer: Timer[F] = Timer.derive[F](F, IO.timer(ec)) | |
StreamSubscriber[F, Unit] | |
.map(subscriber => { | |
subscriber.stream.compile.drain.runAsync(_ => IO.unit).unsafeRunSync() | |
toRequest(client, req).response(responseHandler(cb)).subscribe(subscriber) | |
}) | |
.runAsync(_ => IO.unit) | |
.unsafeRunSync | |
} | |
}, | |
F.delay(client.stop()) | |
)) | |
private def toRequest[F[_]](client: HttpClient, request: Request[F])( | |
implicit F: ConcurrentEffect[F], | |
ec: ExecutionContext): ReactiveRequest = { | |
implicit val timer: Timer[F] = Timer.derive[F](F, IO.timer(ec)) | |
val jReq = client.newRequest(request.uri.toString).method(request.method.name) | |
for (h <- request.headers) | |
jReq.header(h.name.toString, h.value) | |
ReactiveRequest | |
.newBuilder(jReq) | |
.content( | |
ReactiveRequest.Content | |
.fromPublisher( | |
StreamUnicastPublisher( | |
request.body.chunks.map(chunk => new ContentChunk(chunk.toByteBuffer))), | |
request.contentType.map(_.value).getOrElse("") | |
) | |
) | |
.build() | |
} | |
private def responseHandler[F[_]]( | |
cb: Callback[DisposableResponse[F]])(implicit F: ConcurrentEffect[F], ec: ExecutionContext) = | |
new BiFunction[ReactiveResponse, Publisher[ContentChunk], Publisher[Unit]] { | |
override def apply( | |
rRes: ReactiveResponse, | |
content: Publisher[ContentChunk]): Publisher[Unit] = | |
(s: Subscriber[_ >: Unit]) => { | |
implicit val timer: Timer[F] = Timer.derive[F](F, IO.timer(ec)) | |
val body = content | |
.toStream() | |
.flatMap(part => { | |
val bytes = chunk(Chunk.byteBuffer(part.buffer)) | |
part.callback.succeeded() | |
bytes | |
}) | |
val dr = DisposableResponse[F]( | |
Response( | |
status = Status.fromInt(rRes.getResponse.getStatus).valueOr(throw _), | |
headers = getHeaders(rRes.getResponse.getHeaders), | |
body = body | |
), | |
F.unit) | |
ec.execute(() => cb(Right(dr))) | |
} | |
} | |
private def getHeaders(headers: HttpFields): Headers = | |
Headers(headers.asScala.map { header => Header(header.getName, header.getValue) | |
}.toList) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment