Skip to content

Instantly share code, notes, and snippets.

@rossabaker
Created October 21, 2018 01:57
Show Gist options
  • Save rossabaker/226d16354eb08ee7b6c05491939d162f to your computer and use it in GitHub Desktop.
Save rossabaker/226d16354eb08ee7b6c05491939d162f to your computer and use it in GitHub Desktop.
import cats.effect.{ConcurrentEffect, Effect, ExitCode, IO, IOApp}
import cats.syntax.flatMap.toFlatMapOps
import cats.syntax.functor.toFunctorOps
import fs2.{Pipe, Stream}
import io.circe.Json
import io.circe.jawn.CirceSupportParser
import jawn.RawFacade
import jawnfs2._
import org.http4s.client.Client
import org.http4s.client.asynchttpclient.AsyncHttpClient
import org.http4s.dsl.Http4sDsl
import org.http4s.server.blaze.BlazeBuilder
import org.http4s.{HttpRoutes, Method, Request, Uri}
/** Payload exchanged between the demo client and server.
  *
  * @param a numeric field; the client increments it by one for every element it produces
  * @param b textual field; the client derives it from `a`
  */
final case class Test(a: Long, b: String)
import scala.concurrent.duration._
/** Demo application: starts a Blaze server on port 8080 and, after a ten second
  * delay, streams an unbounded sequence of JSON-encoded [[Test]] values to it
  * through an http4s client created by `AsyncHttpClient.resource`.
  */
object SixteenElements extends IOApp {

  /** Serves [[Service]] at `/` on `0.0.0.0:8080` while concurrently running the
    * client-side upload once ten seconds have elapsed.
    *
    * @param args command-line arguments (unused)
    * @return the last exit code emitted by the server stream
    */
  def run(args: List[String]): IO[ExitCode] = {
    BlazeBuilder[IO]
      .bindHttp(8080, "0.0.0.0")
      .mountService(new Service[IO].service, "/")
      .serve
      .concurrently(Stream.sleep(10.seconds) ++ Stream.eval(runInfiniteStream[IO]))
      .compile
      .lastOrError
  }

  /** Builds an endless iterator of [[Test]] values and uploads it via
    * [[streamStatementsToServer]].
    *
    * Starting from `Test(1, "1")`, every following element increments `a` by one
    * and sets `b` to the new `a` rendered as a string and repeated ten times.
    * A line is printed to stdout each time the iterator yields an element.
    *
    * @return the effect returned by [[streamStatementsToServer]] for that stream
    */
  def runInfiniteStream[F[_]: ConcurrentEffect]: F[Unit] = {
    val start = Test(1, 1.toString)
    val it = Iterator
      .iterate((1, start)) {
        case (idx, prev) =>
          (idx + 1, prev.copy(a = prev.a + 1, b = (prev.a + 1).toString * 10))
      }
      .map { t =>
        // Trace how many elements the client has pulled so far.
        println(s"Produced ${t._1} element")
        t
      }
      .map(_._2)
    streamStatementsToServer(Stream.fromIterator(it))
  }

  /** POSTs `stream` as the request body to
    * `http://localhost:8080/statements/update` and prints the response body.
    *
    * Each element is serialised to compact JSON and encoded as UTF-8; the
    * encoded documents are concatenated with no separator between them.
    *
    * @param stream the values to upload
    * @return an effect that completes once the response body has been printed
    */
  def streamStatementsToServer[F[_]: ConcurrentEffect](stream: Stream[F, Test]): F[Unit] = {
    import java.nio.charset.StandardCharsets
    import io.circe.generic.auto._
    import io.circe.syntax._

    // JSON on the wire must be UTF-8; a bare `getBytes` would use the JVM's
    // platform-default charset instead.
    def encode: Pipe[F, Test, Byte] =
      _.map(_.asJson.noSpaces.getBytes(StandardCharsets.UTF_8)).flatMap(Stream.emits(_))

    def streamToServer(client: Client[F]): Stream[F, String] = {
      val serverUri = Uri.uri("http://localhost:8080/statements/update")
      // Pin the effect type explicitly rather than leaving it to inference.
      val req = Request[F](Method.POST, serverUri).withEntity(stream through encode)
      Stream eval client.expect[String](req)
    }

    AsyncHttpClient.resource[F]().use { client =>
      (for {
        resultAsString <- streamToServer(client)
        _ <- Stream.eval(Effect[F].delay(println(resultAsString)))
      } yield ()).compile.drain
    }
  }
}
/** HTTP routes that accept a streamed body of JSON-encoded [[Test]] values and
  * report how many of them were ingested.
  */
class Service[F[_]: Effect] extends Http4sDsl[F] {
  import io.circe.generic.auto._
  import org.http4s.circe._

  // Implicit jawn facade producing circe `Json` values; presumably the one
  // picked up by `parseJsonStream` in `processDiscard` — confirm against jawnfs2.
  implicit private val facade: RawFacade[Json] = CirceSupportParser.facade

  /** Routes `POST /statements/update` to `processDiscard` and answers with an
    * `Ok` response carrying the resulting JSON summary.
    */
  val service: HttpRoutes[F] =
    HttpRoutes.of[F] {
      case req @ POST -> Root / "statements" / "update" =>
        for {
          summary <- processDiscard(req)
          response <- Ok(summary)
        } yield response
    }

  /** Parses the request body as a stream of JSON values, decodes each one as a
    * [[Test]], prints it, and returns `{"ingestedCount": n}` where `n` is the
    * number of values that went through the pipeline (`0` for an empty body).
    *
    * The decoded values are not retained. A decoding failure is raised into the
    * stream and therefore fails the returned effect.
    *
    * @param req the incoming request whose body is consumed
    * @return a JSON object with a single `ingestedCount` field
    */
  private def processDiscard(req: Request[F]): F[Json] = {
    // Decode every parsed JSON value, turning a decoding failure into a stream error.
    val decoded = req.body.chunks.parseJsonStream.flatMap { json =>
      json.as[Test] match {
        case Right(statement) => Stream.emit(statement)
        case Left(failure) => Stream.raiseError(failure)
      }
    }

    // Print each statement, then number the elements starting from 1 so that
    // the last number equals the total count.
    val ordinals = decoded
      .evalMap(statement => Effect[F].delay(println(statement)))
      .take(Long.MaxValue) // cap of Long.MaxValue: effectively no limit
      .zipWithIndex
      .map { case (_, index) => index + 1 }

    for {
      lastOrdinal <- ordinals.compile.last
    } yield Json.obj("ingestedCount" -> Json.fromLong(lastOrdinal.getOrElse(0L)))
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment