Created
July 27, 2018 21:56
-
-
Save joshm1/b6024f9c2715ff5a95e09bb393ee0c4b to your computer and use it in GitHub Desktop.
Playing with fs2, cats-effect, fs2-kafka
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.spaceiq.spaceplanning | |
import java.nio.channels.AsynchronousChannelGroup | |
import java.nio.charset.Charset | |
import java.time.{Instant, ZoneOffset} | |
import java.time.format.DateTimeFormatter | |
import java.util.Formatter.DateTime | |
import java.util.concurrent.{Executors, TimeUnit} | |
import cats.effect.{Effect, IO, Sync} | |
import fs2.StreamApp.ExitCode | |
import fs2.{Scheduler, Stream, StreamApp} | |
import org.http4s.server.blaze.BlazeBuilder | |
import scodec.bits.ByteVector | |
import scala.concurrent.ExecutionContext.Implicits.global | |
import spinoco.fs2.kafka | |
import spinoco.protocol.kafka._ | |
import scala.concurrent.duration._ | |
import spinoco.fs2.kafka | |
import spinoco.fs2.kafka.{Logger, _} | |
import spinoco.protocol.kafka._ | |
import scala.concurrent.duration._ | |
/**
 * Console-backed implementation of the fs2-kafka `Logger`.
 *
 * Each call prints a single `LOGGER: <level>: <message>` line and, when a
 * throwable is supplied, also prints its stack trace. All printing is
 * suspended in `F`, so nothing is written until the returned effect runs.
 *
 * @tparam F effect type; an `Effect[F]` instance must be available
 */
class KafkaLogger[F[_]](implicit F: Effect[F]) extends Logger[F] {

  /**
   * @param level     severity reported by the Kafka client
   * @param msg       by-name message, evaluated only when the effect runs
   * @param throwable optional cause; `null` means there is none
   * @return an effect that performs the printing
   */
  override def log(level: Logger.Level.Value, msg: => String, throwable: Throwable): F[Unit] =
    F.delay {
      val line = s"LOGGER: $level: $msg"
      println(line)
      // `Option(...)` turns a null throwable into `None`, so nothing is printed for it.
      Option(throwable).foreach(_.printStackTrace())
    }
}
// Concrete `HttpServer` with the effect type parameter fixed to cats-effect `IO`.
// This is the only place where the effect is chosen; it could be swapped for
// another type, for example `monix.eval.Task`.
object Server extends HttpServer[IO] | |
/**
 * Stream application that serves HTTP on port 8080 while, concurrently,
 * publishing a timestamp to Kafka topic `topic-A` every five seconds and
 * printing every message it receives from that same topic.
 *
 * The Kafka subscriber and publisher are attached with `concurrently`, so they
 * run in the background of the HTTP server stream.
 *
 * @tparam F effect type; an `Effect[F]` instance must be available
 */
class HttpServer[F[_]](implicit F: Effect[F]) extends StreamApp[F]() {

  // implicits required by fs2-kafka
  // NOTE(review): the executor and channel group are never shut down; they are
  // kept as class-level members so existing callers can still reference them.
  lazy val executor = Executors.newSingleThreadExecutor()
  implicit val group: AsynchronousChannelGroup = AsynchronousChannelGroup.withThreadPool(executor)
  implicit val kafkaLogger: Logger[F] = new KafkaLogger[F]()

  // Messages are encoded and decoded with one explicit charset so that the bytes
  // on the wire do not depend on the JVM's platform default encoding.
  private val utf8: Charset = Charset.forName("UTF-8")

  private val ctx = new Module[F]

  /** Stream that emits a Kafka client connected to the `kafka1:9092` broker. */
  def kafkaClient()(implicit S: fs2.Scheduler): Stream[F, KafkaClient[F]] = kafka.client(
    ensemble = Set(broker("kafka1", port = 9092)),
    protocol = ProtocolVersion.Kafka_0_10_2,
    clientName = "my-client-name"
  )

  /**
   * Runs `cb` with the client produced by [[kafkaClient]].
   *
   * @param cb function building the stream that uses the client
   */
  def withKafkaClient(cb: KafkaClient[F] => Stream[F, ExitCode])(implicit S: fs2.Scheduler): Stream[F, ExitCode] =
    kafkaClient().flatMap(cb)

  /**
   * Main stream: creates a scheduler and a Kafka client, then runs the HTTP
   * server with the Kafka subscriber and publisher running concurrently.
   */
  override def stream(args: List[String], requestShutdown: F[Unit]): Stream[F, ExitCode] =
    Scheduler(corePoolSize = 2).flatMap { implicit scheduler =>
      withKafkaClient { implicit kafkaClient: KafkaClient[F] =>
        blazeStream()
          .concurrently(kafkaSubscribe())
          .concurrently(kafkaPublish())
      }
    }

  /** Blaze HTTP server bound to `0.0.0.0:8080` with the user endpoint under `/users`. */
  def blazeStream(): Stream[F, ExitCode] =
    BlazeBuilder[F]
      .bindHttp(8080, "0.0.0.0")
      .mountService(ctx.userHttpEndpoint, "/users") // You can mount as many services as you want
      .serve

  /**
   * Publishes the current UTC timestamp (ISO date-time format) to partition 0
   * of `topic-A` every five seconds and prints the value returned by the publish call.
   */
  def kafkaPublish()(implicit S: fs2.Scheduler, kc: KafkaClient[F]): Stream[F, ExitCode] =
    Stream
      .eval(F.delay(println("publish every 5 second")))
      .flatMap[String] { _ =>
        S.awakeEvery(5.second).evalMap[String] { _ =>
          // Reading the clock is a side effect, so it is suspended in F instead
          // of being performed inside a pure `map`.
          F.delay(DateTimeFormatter.ISO_DATE_TIME.withZone(ZoneOffset.UTC).format(Instant.now()))
        }
      }
      .flatMap[Unit] { msg =>
        Stream
          .eval {
            val msgVector = ByteVector(msg.getBytes(utf8))
            val key = ByteVector("testing".getBytes(utf8))
            kc.publish1(topic("topic-A"), partition(0), key, msgVector, requireQuorum = true, 2.second)
          }
          .flatMap { l: Long =>
            Stream.eval(F.delay(printf("published %s => %d\n", msg, l)))
          }
      }
      .map(_ => ExitCode.Success)

  /**
   * Subscribes to partition 0 of `topic-A` starting at `HeadOffset` and prints
   * the key and payload of every received message, decoded as UTF-8.
   */
  def kafkaSubscribe()(implicit kc: KafkaClient[F]): Stream[F, ExitCode] =
    Stream.eval(F.delay(println("subscribing..."))) >>
      kc.subscribe(topic("topic-A"), partition(0), HeadOffset)
        .evalMap[Unit] { topicMessage =>
          F.delay {
            printf(
              "subscribed %s => %s\n",
              new String(topicMessage.key.toArray, utf8),
              new String(topicMessage.message.toArray, utf8)
            )
          }
        }
        .map(_ => ExitCode.Success)
}
Thanks @gvolpe, I Appreciate the nitpicks!
-
`withKafkaClient` exists so that I can declare the `KafkaClient` as an implicit and have it in scope. For this example it'd be easy to pass the object to the `kafkaPublish`/`kafkaSubscribe` methods, but future/real code will probably have publish/subscribe in more places.
-
I should be able to figure this one out, but no luck so far. Anywhere I use `F.delay` I can't call methods on the returned object: `value *> is not a member of type parameter F[Unit]`
- My point WRT `withKafkaClient` is that all it is doing is just `kafkaClient().flatMap(cb)` and you could do just that in the only place it's being used. So it would be (also including the `def` for `val` changes):
override def stream(args: List[String], requestShutdown: F[Unit]): Stream[F, ExitCode] =
Scheduler(corePoolSize = 2).flatMap { implicit scheduler =>
kafkaClient.flatMap { implicit client =>
blazeStream
.concurrently(kafkaSubscribe)
.concurrently(kafkaPublish)
}
}
- In order to use `*>` (right shark operator) you need `import cats.syntax.apply._` in scope.
Another thing I didn't see before is on line 76 when using `Instant.now()`. This method is not RT (referentially transparent); I'd wrap that in `F`. E.g.:
S.awakeEvery(5.second).evalMap { _ =>
F.delay { DateTimeFormatter.ISO_DATE_TIME.withZone(ZoneOffset.UTC).format(Instant.now()) }
}
EDIT: I'm just seeing you're using `def` on these methods (e.g. `kafkaPublish`) because they need to receive implicit parameters. Okay, that's fine. I'd just remove the unnecessary parentheses :)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi @joshm1,
I'm not really familiar with `fs2-kafka` but the rest of the code looks okay to me. A few nitpicks / recommendations:
- Use `F.delay` instead of `Sync[F].delay` in all the cases where there's already an `implicit F: Effect[F]`; remember that `Effect` also implements `Sync`. For example on line 96.
- Define `blazeStream`, `kafkaSubscribe`, `kafkaPublish` and `kafkaClient` as `val`. They are all referentially transparent.
- By using `concurrently` as you're doing now, both the `Kafka` subscriber and publisher will be terminated upon termination of the web server. You could use `join` for a different behavior.
- Regarding the `Logger` functionality (line 30), many people would separate those two actions for clarity but since we are in Scala land, that's not really a problem. E.g. `F.delay { println(s"LOGGER: $level: $msg") } *> F.delay(throwable.printStackTrace()).attempt.void`.
- I'm not sure `withKafkaClient` brings in any benefit.

FP FTW!
Cheers :)