Created
July 27, 2018 21:56
-
-
Save joshm1/b6024f9c2715ff5a95e09bb393ee0c4b to your computer and use it in GitHub Desktop.
Playing with fs2, cats-effect, fs2-kafka
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.spaceiq.spaceplanning | |
import java.nio.channels.AsynchronousChannelGroup | |
import java.nio.charset.Charset | |
import java.time.{Instant, ZoneOffset} | |
import java.time.format.DateTimeFormatter | |
import java.util.Formatter.DateTime | |
import java.util.concurrent.{Executors, TimeUnit} | |
import cats.effect.{Effect, IO, Sync} | |
import fs2.StreamApp.ExitCode | |
import fs2.{Scheduler, Stream, StreamApp} | |
import org.http4s.server.blaze.BlazeBuilder | |
import scodec.bits.ByteVector | |
import scala.concurrent.ExecutionContext.Implicits.global | |
import spinoco.fs2.kafka | |
import spinoco.protocol.kafka._ | |
import scala.concurrent.duration._ | |
import spinoco.fs2.kafka | |
import spinoco.fs2.kafka.{Logger, _} | |
import spinoco.protocol.kafka._ | |
import scala.concurrent.duration._ | |
/**
 * Console-backed [[Logger]] implementation handed to fs2-kafka.
 *
 * Each call prints one `LOGGER: <level>: <message>` line to standard output and, when a
 * throwable is supplied, prints its stack trace. All printing is suspended in `F`, so nothing
 * is written until the returned effect is run.
 *
 * @tparam F effect type in which the printing is suspended
 */
class KafkaLogger[F[_]](implicit F: Effect[F]) extends Logger[F] {

  /**
   * Prints a single log entry.
   *
   * @param level     severity of the entry, rendered through its `toString`
   * @param msg       by-name message; only evaluated when the returned effect runs
   * @param throwable optional cause; `null` is tolerated and simply skipped
   * @return an effect that performs the printing
   */
  override def log(level: Logger.Level.Value, msg: => String, throwable: Throwable): F[Unit] =
    F.delay {
      println(s"LOGGER: $level: $msg")
      // The signature takes a bare Throwable that may be null; Option(...) maps null to None.
      Option(throwable).foreach(_.printStackTrace())
    }
}
/**
 * Application entry point.
 *
 * This is the only place where the concrete effect type is chosen: `HttpServer` is written
 * against an abstract `F[_]`, and here it is fixed to cats-effect `IO`. Another effect with an
 * `Effect` instance (for example `monix.eval.Task`) could be substituted on this line.
 */
object Server extends HttpServer[IO]
/**
 * Demo application combining an http4s Blaze server with an fs2-kafka publisher and subscriber,
 * written against an abstract effect type `F`.
 *
 * Running [[stream]] serves HTTP on port 8080 and, alongside it, a subscriber that prints every
 * message read from `topic-A` partition 0 and a publisher that writes a UTC timestamp to the
 * same topic and partition every five seconds.
 *
 * @tparam F effect type; must have an `Effect` instance
 */
class HttpServer[F[_]](implicit F: Effect[F]) extends StreamApp[F]() {

  // implicits required by fs2-kafka
  // NOTE(review): nothing in this class shuts down `executor` or `group`. Threads created by
  // `Executors.newSingleThreadExecutor()` are non-daemon by default, so this may keep the JVM
  // alive after `stream` terminates — confirm and release them at shutdown if so.
  lazy val executor: java.util.concurrent.ExecutorService = Executors.newSingleThreadExecutor()
  implicit val group: AsynchronousChannelGroup = AsynchronousChannelGroup.withThreadPool(executor)
  implicit val kafkaLogger: Logger[F] = new KafkaLogger[F]()

  // Keys and payloads are encoded and decoded with one explicit charset so that the bytes on the
  // topic do not depend on the platform default of whichever JVM produced or consumed them.
  private val utf8: Charset = java.nio.charset.StandardCharsets.UTF_8

  // Formatter for the published timestamps; built once because it does not change between ticks.
  private val timestampFormat: DateTimeFormatter =
    DateTimeFormatter.ISO_DATE_TIME.withZone(ZoneOffset.UTC)

  /**
   * Builds a Kafka client for the single broker `kafka1:9092`.
   *
   * @param S scheduler required by the fs2-kafka client
   * @return a stream emitting the connected client
   */
  def kafkaClient()(implicit S: fs2.Scheduler): Stream[F, KafkaClient[F]] = kafka.client(
    ensemble = Set(broker("kafka1", port = 9092)),
    protocol = ProtocolVersion.Kafka_0_10_2,
    clientName = "my-client-name"
  )

  /**
   * Obtains a Kafka client and hands it to `cb`.
   *
   * @param cb function building the stream that uses the client
   * @param S  scheduler required by the fs2-kafka client
   * @return the stream produced by `cb` for the client emitted by [[kafkaClient]]
   */
  def withKafkaClient(cb: KafkaClient[F] => Stream[F, ExitCode])(implicit S: fs2.Scheduler): Stream[F, ExitCode] =
    kafkaClient().flatMap(cb)

  private val ctx = new Module[F]

  /**
   * Main program: allocates a two-thread scheduler, connects to Kafka, then runs the HTTP server
   * with the subscriber and the publisher attached through `concurrently`.
   *
   * @param args            command-line arguments (unused)
   * @param requestShutdown shutdown hook supplied by `StreamApp` (unused)
   * @return the exit-code stream of the HTTP server
   */
  override def stream(args: List[String], requestShutdown: F[Unit]): Stream[F, ExitCode] =
    Scheduler(corePoolSize = 2).flatMap { implicit scheduler =>
      withKafkaClient { implicit kafkaClient: KafkaClient[F] =>
        blazeStream()
          .concurrently(kafkaSubscribe())
          .concurrently(kafkaPublish())
      }
    }

  /**
   * HTTP server bound to `0.0.0.0:8080` with the user endpoint mounted under `/users`.
   *
   * @return the stream produced by `BlazeBuilder#serve`
   */
  def blazeStream(): Stream[F, ExitCode] =
    BlazeBuilder[F]
      .bindHttp(8080, "0.0.0.0")
      .mountService(ctx.userHttpEndpoint, "/users") // You can mount as many services as you want
      .serve

  /**
   * Publishes the current UTC timestamp to `topic-A` partition 0 every five seconds and prints
   * the value returned by each publish call.
   *
   * @param S  scheduler driving the five-second tick
   * @param kc Kafka client used for publishing
   * @return a stream emitting `ExitCode.Success` once per published message
   */
  def kafkaPublish()(implicit S: fs2.Scheduler, kc: KafkaClient[F]): Stream[F, ExitCode] = {
    val key = ByteVector("testing".getBytes(utf8))

    // Reading the wall clock is a side effect, so it is suspended in F instead of being
    // performed inside a pure `map`.
    val currentTimestamp: F[String] = F.delay(timestampFormat.format(Instant.now()))

    // Publishes one message, then reports the Long returned by `publish1`.
    def publish(msg: String): F[Unit] =
      F.flatMap(
        kc.publish1(
          topic("topic-A"),
          partition(0),
          key,
          ByteVector(msg.getBytes(utf8)),
          requireQuorum = true,
          2.second
        )
      ) { offset: Long =>
        F.delay(printf("published %s => %d\n", msg, offset))
      }

    Stream
      .eval(F.delay(println("publish every 5 second")))
      .flatMap(_ => S.awakeEvery[F](5.second))
      .evalMap(_ => currentTimestamp)
      .evalMap(publish)
      .map(_ => ExitCode.Success)
  }

  /**
   * Subscribes to `topic-A` partition 0 starting at `HeadOffset` and prints the key and payload
   * of every message received, decoded as UTF-8.
   *
   * @param kc Kafka client used for subscribing
   * @return a stream emitting `ExitCode.Success` once per received message
   */
  def kafkaSubscribe()(implicit kc: KafkaClient[F]): Stream[F, ExitCode] =
    Stream.eval(F.delay(println("subscribing..."))) >>
      kc.subscribe(topic("topic-A"), partition(0), HeadOffset)
        .evalMap[Unit] { topicMessage =>
          F.delay {
            printf(
              "subscribed %s => %s\n",
              new String(topicMessage.key.toArray, utf8),
              new String(topicMessage.message.toArray, utf8)
            )
          }
        }
        .map(_ => ExitCode.Success)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
withKafkaClient is that all it is doing is just `kafkaClient().flatMap(cb)`, and you could do just that in the only place it's being used. So it would be (also including the `def` for `val` changes): `*>` (right shark operator) you need `import cats.syntax.apply._` in scope.
Another thing I didn't see before is on line 76 when using `Instant.now()`. This method is not referentially transparent (RT); I'd wrap that in `F`. E.g.:
EDIT: I'm just seeing you're using `def` on these methods (e.g. `kafkaPublish`) because they need to receive implicit parameters. Okay, that's fine. I'd just remove the unnecessary parentheses :)