Skip to content

Instantly share code, notes, and snippets.

@joshm1
Created July 27, 2018 21:56
Show Gist options
  • Save joshm1/b6024f9c2715ff5a95e09bb393ee0c4b to your computer and use it in GitHub Desktop.
Save joshm1/b6024f9c2715ff5a95e09bb393ee0c4b to your computer and use it in GitHub Desktop.
Playing with fs2, cats-effect, fs2-kafka
package com.spaceiq.spaceplanning
import java.nio.channels.AsynchronousChannelGroup
import java.nio.charset.Charset
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter
import java.util.Formatter.DateTime
import java.util.concurrent.{Executors, TimeUnit}
import cats.effect.{Effect, IO, Sync}
import fs2.StreamApp.ExitCode
import fs2.{Scheduler, Stream, StreamApp}
import org.http4s.server.blaze.BlazeBuilder
import scodec.bits.ByteVector
import scala.concurrent.ExecutionContext.Implicits.global
import spinoco.fs2.kafka
import spinoco.protocol.kafka._
import scala.concurrent.duration._
import spinoco.fs2.kafka
import spinoco.fs2.kafka.{Logger, _}
import spinoco.protocol.kafka._
import scala.concurrent.duration._
/**
 * Console-backed implementation of the fs2-kafka `Logger`.
 *
 * Each entry is printed to standard output as `LOGGER: <level>: <msg>`; when a
 * throwable accompanies the entry its stack trace is printed as well. All printing
 * is suspended in `F`, so nothing is written until the returned effect is run.
 *
 * @tparam F effect type; an `Effect[F]` instance must be available implicitly
 */
class KafkaLogger[F[_]](implicit F: Effect[F]) extends Logger[F] {

  override def log(level: Logger.Level.Value, msg: => String, throwable: Throwable): F[Unit] =
    F.delay {
      println(s"LOGGER: $level: $msg")
      // `throwable` may be null when the entry carries no exception; Option(...) absorbs that.
      Option(throwable).foreach(_.printStackTrace())
    }
}
// The only place where the effect type is fixed: `HttpServer` is instantiated here with
// `cats.effect.IO`. Any other type that has an `Effect` instance (for example
// `monix.eval.Task`) could be substituted without touching `HttpServer` itself.
object Server extends HttpServer[IO]
/**
 * Stream application that serves an HTTP endpoint with Blaze and, alongside it, publishes
 * to and subscribes from a Kafka topic through fs2-kafka.
 *
 * The class is abstract in the effect type; the concrete effect is chosen by whoever
 * instantiates it.
 *
 * @tparam F effect type; an `Effect[F]` instance must be available implicitly
 */
class HttpServer[F[_]](implicit F: Effect[F]) extends StreamApp[F]() {

  // Every String <-> bytes conversion below uses this charset. Relying on the JVM default
  // charset would make the bytes on the wire depend on the host configuration, so a
  // producer and a consumer on differently configured machines could disagree.
  private val Utf8: Charset = java.nio.charset.StandardCharsets.UTF_8

  // Topic and partition shared by the publisher and the subscriber.
  private val demoTopic = topic("topic-A")
  private val demoPartition = partition(0)

  // implicits required by fs2-kafka
  // `executor` is consumed by `group` on the next line, so laziness would buy nothing.
  // NOTE(review): nothing in this class shuts the executor down.
  val executor: java.util.concurrent.ExecutorService = Executors.newSingleThreadExecutor()
  implicit val group: AsynchronousChannelGroup = AsynchronousChannelGroup.withThreadPool(executor)
  implicit val kafkaLogger: Logger[F] = new KafkaLogger[F]()

  private val ctx = new Module[F]

  /**
   * Builds the Kafka client stream for the single broker `kafka1:9092`,
   * using the Kafka 0.10.2 protocol and the client name `my-client-name`.
   *
   * @param S scheduler required by fs2-kafka
   */
  def kafkaClient()(implicit S: fs2.Scheduler): Stream[F, KafkaClient[F]] =
    kafka.client(
      ensemble = Set(broker("kafka1", port = 9092)),
      protocol = ProtocolVersion.Kafka_0_10_2,
      clientName = "my-client-name"
    )

  /**
   * Runs `cb` with the client produced by [[kafkaClient]]; equivalent to
   * `kafkaClient().flatMap(cb)`.
   *
   * @param cb stream to run once a client is available
   * @param S  scheduler required by fs2-kafka
   */
  def withKafkaClient(cb: KafkaClient[F] => Stream[F, ExitCode])(implicit S: fs2.Scheduler): Stream[F, ExitCode] =
    kafkaClient().flatMap(cb)

  /**
   * Application entry stream: creates a scheduler and a Kafka client, then runs the HTTP
   * server with the Kafka subscriber and publisher attached via `concurrently`.
   *
   * @param args            command-line arguments (unused)
   * @param requestShutdown shutdown hook supplied by `StreamApp` (unused)
   */
  override def stream(args: List[String], requestShutdown: F[Unit]): Stream[F, ExitCode] =
    Scheduler(corePoolSize = 2).flatMap { implicit scheduler =>
      withKafkaClient { implicit kafkaClient: KafkaClient[F] =>
        blazeStream()
          .concurrently(kafkaSubscribe())
          .concurrently(kafkaPublish())
      }
    }

  /** HTTP server bound to `0.0.0.0:8080`, serving `ctx.userHttpEndpoint` under `/users`. */
  def blazeStream(): Stream[F, ExitCode] =
    BlazeBuilder[F]
      .bindHttp(8080, "0.0.0.0")
      .mountService(ctx.userHttpEndpoint, "/users") // You can mount as many services as you want
      .serve

  /**
   * Every five seconds publishes the current UTC timestamp (ISO date-time format) to the
   * demo topic under the key `testing`, and prints the value returned by the publish call.
   *
   * @param S  scheduler driving the five-second tick
   * @param kc Kafka client used for publishing
   * @return a stream emitting `ExitCode.Success` once per published message
   */
  def kafkaPublish()(implicit S: fs2.Scheduler, kc: KafkaClient[F]): Stream[F, ExitCode] = {
    val key = ByteVector("testing".getBytes(Utf8))

    // Reading the clock is a side effect, so it is suspended in F rather than
    // performed inside a pure `map`.
    val currentTimestamp: F[String] = F.delay {
      DateTimeFormatter.ISO_DATE_TIME.withZone(ZoneOffset.UTC).format(Instant.now())
    }

    // Publishes one message and reports the Long returned by `publish1`.
    def publishAndReport(msg: String): F[Unit] = {
      val payload = ByteVector(msg.getBytes(Utf8))
      val published: F[Long] =
        kc.publish1(demoTopic, demoPartition, key, payload, requireQuorum = true, 2.seconds)
      F.flatMap(published) { offset =>
        F.delay(printf("published %s => %d\n", msg, offset))
      }
    }

    Stream
      .eval(F.delay(println("publish every 5 second")))
      .flatMap[String](_ => S.awakeEvery[F](5.seconds).evalMap[String](_ => currentTimestamp))
      .evalMap[Unit](publishAndReport)
      .map(_ => ExitCode.Success)
  }

  /**
   * Subscribes to the demo topic starting at `HeadOffset` and prints the key and payload
   * of every received message, both decoded as UTF-8.
   *
   * @param kc Kafka client used for subscribing
   * @return a stream emitting `ExitCode.Success` once per received message
   */
  def kafkaSubscribe()(implicit kc: KafkaClient[F]): Stream[F, ExitCode] =
    Stream.eval(F.delay(println("subscribing..."))) >> (
      kc.subscribe(demoTopic, demoPartition, HeadOffset)
        .evalMap[Unit] { topicMessage =>
          F.delay {
            printf(
              "subscribed %s => %s\n",
              new String(topicMessage.key.toArray, Utf8),
              new String(topicMessage.message.toArray, Utf8)
            )
          }
        }
        .map(_ => ExitCode.Success)
    )
}
@gvolpe
Copy link

gvolpe commented Jul 29, 2018

  • My point WRT withKafkaClient is that all it does is kafkaClient().flatMap(cb), and you could do just that in the only place it's used. So it would be (also including the def-to-val changes):
// Suggested rewrite: acquire the Kafka client with a plain flatMap at the call site
// instead of going through the withKafkaClient helper. The parameterless references
// (kafkaClient, blazeStream, ...) assume the def-to-val changes mentioned above.
override def stream(args: List[String], requestShutdown: F[Unit]): Stream[F, ExitCode] =
  Scheduler(corePoolSize = 2).flatMap { implicit scheduler =>
    kafkaClient.flatMap { implicit client =>
      blazeStream
        .concurrently(kafkaSubscribe)
        .concurrently(kafkaPublish)
    }
  }
  • In order to use *> (right shark operator) you need import cats.syntax.apply._ in scope.

Another thing I didn't notice before is the use of Instant.now() on line 76. This method is not referentially transparent (RT), so I'd wrap it in F. E.g.:

S.awakeEvery(5.second).evalMap { _ =>
  F.delay { DateTimeFormatter.ISO_DATE_TIME.withZone(ZoneOffset.UTC).format(Instant.now()) }
}

EDIT: I'm just seeing that you're using def for these methods (e.g. kafkaPublish) because they need to receive implicit parameters. Okay, that's fine. I'd just remove the unnecessary parentheses :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment