@joshm1
Created July 27, 2018 21:56
Playing with fs2, cats-effect, fs2-kafka
package com.spaceiq.spaceplanning
import java.nio.channels.AsynchronousChannelGroup
import java.nio.charset.Charset
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter
import java.util.concurrent.{Executors, TimeUnit}

import cats.effect.{Effect, IO, Sync}
import fs2.StreamApp.ExitCode
import fs2.{Scheduler, Stream, StreamApp}
import org.http4s.server.blaze.BlazeBuilder
import scodec.bits.ByteVector
import spinoco.fs2.kafka
import spinoco.fs2.kafka.{Logger, _}
import spinoco.protocol.kafka._

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

class KafkaLogger[F[_]](implicit F: Effect[F]) extends Logger[F] {
  override def log(level: Logger.Level.Value, msg: => String, throwable: Throwable): F[Unit] =
    F.delay {
      println(s"LOGGER: $level: $msg")
      if (throwable != null) throwable.printStackTrace()
    }
}

// The only place where the Effect is defined. You could change it for `monix.eval.Task` for example.
object Server extends HttpServer[IO]

class HttpServer[F[_]](implicit F: Effect[F]) extends StreamApp[F]() {
  // implicits required by fs2-kafka
  lazy val executor = Executors.newSingleThreadExecutor()
  implicit val group: AsynchronousChannelGroup = AsynchronousChannelGroup.withThreadPool(executor)
  implicit val kafkaLogger: Logger[F] = new KafkaLogger[F]()

  def kafkaClient()(implicit S: fs2.Scheduler): Stream[F, KafkaClient[F]] = kafka.client(
    ensemble = Set(broker("kafka1", port = 9092)),
    protocol = ProtocolVersion.Kafka_0_10_2,
    clientName = "my-client-name"
  )

  def withKafkaClient(cb: KafkaClient[F] => Stream[F, ExitCode])(implicit S: fs2.Scheduler): Stream[F, ExitCode] =
    kafkaClient().flatMap(cb)

  private val ctx = new Module[F]

  override def stream(args: List[String], requestShutdown: F[Unit]): Stream[F, ExitCode] =
    Scheduler(corePoolSize = 2).flatMap { implicit scheduler =>
      withKafkaClient { implicit kafkaClient: KafkaClient[F] =>
        blazeStream()
          .concurrently(kafkaSubscribe())
          .concurrently(kafkaPublish())
      }
    }

  def blazeStream(): Stream[F, ExitCode] =
    BlazeBuilder[F]
      .bindHttp(8080, "0.0.0.0")
      .mountService(ctx.userHttpEndpoint, "/users") // You can mount as many services as you want
      .serve

  def kafkaPublish()(implicit S: fs2.Scheduler, kc: KafkaClient[F]): Stream[F, ExitCode] =
    Stream
      .eval(Sync[F].delay(println("publish every 5 second")))
      .flatMap[String] { _ =>
        S.awakeEvery(5.second).map[String] { _ =>
          DateTimeFormatter.ISO_DATE_TIME.withZone(ZoneOffset.UTC).format(Instant.now())
        }
      }
      .flatMap[Unit] { msg =>
        Stream
          .eval {
            val msgVector = ByteVector(msg.getBytes(Charset.defaultCharset()))
            val key = ByteVector("testing".getBytes)
            kc.publish1(topic("topic-A"), partition(0), key, msgVector, requireQuorum = true, 2.second)
          }
          .flatMap { l: Long =>
            Stream.eval(Sync[F].delay(printf("published %s => %d\n", msg, l)))
          }
      }
      .map(_ => ExitCode.Success)

  def kafkaSubscribe()(implicit kc: KafkaClient[F]): Stream[F, ExitCode] =
    Stream.eval(Sync[F].delay(println("subscribing..."))) >>
      kc.subscribe(topic("topic-A"), partition(0), HeadOffset)
        .evalMap[Unit] { topicMessage =>
          Sync[F].delay {
            printf(
              "subscribed %s => %s\n",
              new String(topicMessage.key.toArray),
              new String(topicMessage.message.toArray)
            )
          }
        }
        .map(_ => ExitCode.Success)
}
@gvolpe

gvolpe commented Jul 28, 2018

Hi @joshm1,

I'm not really familiar with fs2-kafka but the rest of the code looks okay to me. A few nitpicks / recommendations:

  • Prefer F.delay over Sync[F].delay wherever there's already an implicit F: Effect[F]; remember that Effect also extends Sync. For example on line 96.
  • I would define blazeStream, kafkaSubscribe, kafkaPublish and kafkaClient as val. They are all referentially transparent.
  • Be aware that by using concurrently as you're doing now, both the Kafka subscriber and publisher will be terminated upon termination of the web server. You could use join for a different behavior.
  • Regarding the Logger functionality (line 30), many people would separate those two actions for clarity, but since we are in Scala land, that's not really a problem. E.g. F.delay { println(s"LOGGER: $level: $msg") } *> F.delay(throwable.printStackTrace()).attempt.void.
  • Not sure whether the method withKafkaClient brings in any benefit.

FP FTW!

Cheers :)

@joshm1
Author

joshm1 commented Jul 28, 2018

Thanks @gvolpe, I appreciate the nitpicks!

  • withKafkaClient exists so that the KafkaClient is available as an implicit in scope. For this example it'd be easy to pass the object to the kafkaPublish/kafkaSubscribe methods, but future/real code will probably have publish/subscribe in more places.

  • I should be able to figure this one out, but no luck so far. Anywhere I use F.delay I can't call methods on the return object.

      value *> is not a member of type parameter F[Unit]
    

@gvolpe

gvolpe commented Jul 29, 2018

  • My point WRT withKafkaClient is that all it does is kafkaClient().flatMap(cb), and you could do just that in the only place it's being used. So it would be (also including the def-to-val changes):
override def stream(args: List[String], requestShutdown: F[Unit]): Stream[F, ExitCode] =
  Scheduler(corePoolSize = 2).flatMap { implicit scheduler =>
    kafkaClient.flatMap { implicit client =>
      blazeStream
        .concurrently(kafkaSubscribe)
        .concurrently(kafkaPublish)
    }
  }
  • In order to use *> (right shark operator) you need import cats.syntax.apply._ in scope.
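
To illustrate why the compiler reports that *> "is not a member", here is a minimal stdlib-only sketch. It assumes a toy `Delay` type and a hand-written `syntax` object; both are hypothetical stand-ins for `F[A]` and for cats' syntax import, not the real cats or cats-effect API. The point it tries to show is only that the operator is an extension method, so it resolves once the syntax is imported.

```scala
object SyntaxDemo {
  // Toy effect type: a hypothetical stand-in for F[A], not cats-effect.
  final class Delay[A](thunk: () => A) { def run(): A = thunk() }
  object Delay { def apply[A](a: => A): Delay[A] = new Delay(() => a) }

  // The operator is defined outside Delay, in a separate syntax object,
  // loosely mirroring how cats keeps *> behind cats.syntax.apply._
  object syntax {
    implicit final class DelayOps[A](fa: Delay[A]) {
      // Run fa, discard its result, then run fb.
      def *>[B](fb: Delay[B]): Delay[B] = Delay { fa.run(); fb.run() }
    }
  }

  val log = scala.collection.mutable.ListBuffer.empty[String]

  // Without this import, the definition below fails to compile with an
  // error of the form "value *> is not a member of ...".
  import syntax._

  // Only a description is built here; nothing is logged until run() is called.
  val program: Delay[Unit] =
    Delay { log += "first"; () } *> Delay { log += "second"; () }
}
```

In the gist itself the equivalent of `import syntax._` is the cats import mentioned above; nothing else about the logger needs to change.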

Another thing I didn't see before is on line 76, where Instant.now() is called. This method is not referentially transparent (RT), so I'd wrap it in F. E.g.:

S.awakeEvery(5.second).evalMap { _ =>
  F.delay { DateTimeFormatter.ISO_DATE_TIME.withZone(ZoneOffset.UTC).format(Instant.now()) }
}
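
As a rough illustration of what "not RT" means here, the sketch below uses a deterministic counter, `now()`, as a hypothetical stand-in for Instant.now(), and a toy `Delay` type as a stand-in for F.delay (neither is part of cats-effect). With the raw call, giving the result a name changes what the program computes; with the delayed version, naming the description makes no difference.

```scala
object RtDemo {
  // Toy stand-in for F.delay: stores the computation, runs it on demand.
  final class Delay[A](thunk: () => A) { def run(): A = thunk() }
  object Delay { def apply[A](a: => A): Delay[A] = new Delay(() => a) }

  // Hypothetical clock: returns a new value on every call, like Instant.now().
  private var ticks = 0L
  def now(): Long = { ticks += 1; ticks }

  // Raw side effect: these two should be interchangeable, but are not.
  def shared(): (Long, Long) = { val t = now(); (t, t) } // one reading, used twice
  def inlined(): (Long, Long) = (now(), now())           // two separate readings

  // Delayed effect: naming the description or inlining it behaves the same,
  // because the clock is only read when run() is called.
  def sharedDelayed(): (Long, Long) = { val t = Delay(now()); (t.run(), t.run()) }
  def inlinedDelayed(): (Long, Long) = (Delay(now()).run(), Delay(now()).run())
}
```
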

EDIT: I'm just seeing you're using def on these methods (e.g. kafkaPublish) because they need to receive implicit parameters. Okay, that's fine. I'd just remove the unnecessary parentheses :)
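
A small stdlib-only sketch of that last point, assuming a hypothetical `Client` case class in place of KafkaClient[F] and a String result in place of a Stream: a val cannot take implicit parameters, so the method stays a def, but the empty parameter list can be dropped.

```scala
object ImplicitDemo {
  // Hypothetical stand-in for KafkaClient[F].
  final case class Client(name: String)

  // Only an implicit parameter list; no empty () in front of it.
  def kafkaSubscribe(implicit kc: Client): String =
    s"subscribing via ${kc.name}"

  def run(): String = {
    implicit val client: Client = Client("demo")
    kafkaSubscribe // called without parentheses; the client is resolved implicitly
  }
}
```
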
