Skip to content

Instantly share code, notes, and snippets.

@rossabaker
Forked from lbialy/SixteenElements.scala
Created October 21, 2018 01:56
Show Gist options
  • Save rossabaker/68310db1306a06495b646384bc5fa77e to your computer and use it in GitHub Desktop.
Save rossabaker/68310db1306a06495b646384bc5fa77e to your computer and use it in GitHub Desktop.
Problem with AHC-based client in http4s
// build.sbt excerpt: library versions used by the reproduction below.
val Http4sVersion = "0.18.19"
val CirceVersion = "0.10.0"
val JawnFs2Version = "0.12.2"
// Dependency entries; presumably elements of a `libraryDependencies ++= Seq(...)` whose
// opening and closing lines are not part of this excerpt.
// http4s client backed by async-http-client (AHC) — the client under investigation.
"org.http4s" %% "http4s-async-http-client" % Http4sVersion,
// Streaming JSON parsing on top of fs2 (provides `parseJsonStream`).
"org.http4s" %% "jawn-fs2" % JawnFs2Version,
"org.http4s" %% "http4s-blaze-server" % Http4sVersion,
"org.http4s" %% "http4s-circe" % Http4sVersion,
"org.http4s" %% "http4s-dsl" % Http4sVersion,
"io.circe" %% "circe-core" % CirceVersion,
"io.circe" %% "circe-generic" % CirceVersion,
"io.circe" %% "circe-java8" % CirceVersion,
import cats.effect.{Effect, IO}
import cats.syntax.flatMap.toFlatMapOps
import cats.syntax.functor.toFunctorOps
import fs2.StreamApp.ExitCode
import fs2.{Pipe, Scheduler, Stream, StreamApp}
import io.circe.Json
import io.circe.jawn.CirceSupportParser
import jawn.Facade
import jawnfs2._
import org.http4s.client.Client
import org.http4s.client.asynchttpclient.AsyncHttpClient
import org.http4s.dsl.Http4sDsl
import org.http4s.server.blaze.BlazeBuilder
import org.http4s.{HttpService, Method, Request, Uri}
/**
  * Payload element that the client streams to the server as JSON.
  *
  * @param a numeric field; the generator in `SixteenElements` increments it for every element
  * @param b string field; the generator sets it to the decimal rendering of `a`
  */
final case class Test(a: Long, b: String)
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
/**
  * Reproduction app for a problem with the async-http-client (AHC) based http4s client.
  *
  * It serves [[Service]] with blaze on port 8080 and, concurrently, POSTs an endless
  * stream of JSON-encoded [[Test]] values to that same server through an AHC client.
  */
object SixteenElements extends StreamApp[IO] {

  /**
    * Application entry point: runs the blaze server and, concurrently, the client
    * stream, whose start is delayed by 10 seconds via the scheduler.
    *
    * @param args            command line arguments (unused)
    * @param requestShutdown shutdown hook supplied by `StreamApp` (unused)
    * @return the server stream, with the delayed client stream running alongside it
    */
  def stream(args: List[String], requestShutdown: IO[Unit]): Stream[IO, ExitCode] = {
    BlazeBuilder[IO]
      .bindHttp(8080, "0.0.0.0")
      .mountService(new Service[IO].service, "/")
      .serve
      .concurrently(Scheduler[IO](4).flatMap(_.delay(Stream eval runInfiniteStream[IO], 10.seconds)))
  }

  /**
    * Builds an endless, throttled sequence of [[Test]] values and streams it to the server.
    *
    * Every produced element is announced on stdout with its 1-based index.
    *
    * @return an effect that completes only if the upload itself completes or fails
    */
  def runInfiniteStream[F[_]: Effect]: F[Unit] = {
    val start = Test(1, 1.toString)
    val it = Iterator
      .iterate((1, start)) {
        case (idx, prev) =>
          // Deliberate throttle of the producer (part of the reproduction); note that it
          // blocks whichever thread is pulling the next element from the iterator.
          Thread.sleep(100)
          (idx + 1, prev.copy(a = prev.a + 1, b = (prev.a + 1).toString))
      }
      .map(t => {
        println(s"Produced ${t._1} element")
        t
      })
      .map(_._2)
    streamStatementsToServer(Stream.fromIterator(it))
  }

  /**
    * POSTs `stream` as a single streamed request body to the local server and prints
    * the server's response body.
    *
    * The AHC client is acquired inside a bracket so that it is shut down when the
    * upload finishes or fails, instead of being leaked.
    *
    * @param stream the elements to upload; each one is sent as compact JSON
    * @return an effect that runs the upload and prints the response
    */
  def streamStatementsToServer[F[_]: Effect](stream: Stream[F, Test]): F[Unit] = {
    import java.nio.charset.StandardCharsets
    import io.circe.generic.auto._
    import io.circe.syntax._

    // JSON is encoded as UTF-8 explicitly; a bare `getBytes` would use the platform default charset.
    def encode: Pipe[F, Test, Byte] =
      _.map(_.asJson.noSpaces.getBytes(StandardCharsets.UTF_8)).flatMap(Stream.emits(_))

    def streamToServer(client: Client[F]): Stream[F, String] = {
      val serverUri = Uri.uri("http://localhost:8080/statements/update")
      val req = Request[F](Method.POST, serverUri).withBody(stream through encode)
      Stream eval client.expect[String](req)
    }

    Stream
      .bracket(Effect[F].delay(AsyncHttpClient[F]()))(
        client =>
          streamToServer(client)
            .evalMap(resultAsString => Effect[F].delay(println(resultAsString))),
        _.shutdown
      )
      .compile
      .drain
  }
}
/**
  * HTTP service that ingests a streamed JSON request body of [[Test]] values and
  * answers with the number of values it read.
  */
class Service[F[_]: Effect] extends Http4sDsl[F] {
  import io.circe.generic.auto._
  import org.http4s.circe._

  // jawn facade in implicit scope so that `parseJsonStream` yields circe `Json` values.
  implicit private val facade: Facade[Json] = CirceSupportParser.facade

  /** Routes `POST /statements/update` to [[processDiscard]] and replies `Ok` with its JSON result. */
  val service: HttpService[F] = {
    HttpService[F] {
      case req @ POST -> Root / "statements" / "update" => processDiscard(req).flatMap(Ok(_))
    }
  }

  /**
    * Parses the request body incrementally as a sequence of JSON values, decodes each one
    * as a [[Test]], prints it and discards it, counting the elements along the way.
    *
    * A value that fails to decode fails the whole effect with the decoding error.
    *
    * @param req the incoming request whose body is consumed
    * @return `{"ingestedCount": n}` where `n` is the number of decoded elements (0 for an empty body)
    */
  private def processDiscard(req: Request[F]): F[Json] = {
    req.body.chunks.parseJsonStream
      .map(_.as[Test])
      .flatMap {
        case Left(err) => Stream.raiseError(err)
        case Right(statement) => Stream.emit(statement)
      }
      .evalMap(s => Effect[F].delay(println(s)))
      .compile
      // Count elements directly instead of indexing them and taking the last index.
      .fold(0L)((count, _) => count + 1L)
      .map { ingestedCount =>
        Json.obj {
          "ingestedCount" -> (Json fromLong ingestedCount)
        }
      }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment