Skip to content

Instantly share code, notes, and snippets.

@valdo404
Last active November 2, 2021 11:34
Show Gist options
  • Save valdo404/f68889251b38c41d20cd0b824f91435a to your computer and use it in GitHub Desktop.
Save valdo404/f68889251b38c41d20cd0b824f91435a to your computer and use it in GitHub Desktop.
import cats.effect.{ExitCode, IO, IOApp, Resource}
import com.sksamuel.pulsar4s.{Consumer, ConsumerConfig, ConsumerMessage, Producer, ProducerConfig, PulsarClient, PulsarClientConfig, Subscription, Topic}
import com.sksamuel.pulsar4s.fs2.{CommittableMessage, PulsarStreams}
/**
 * Demo application that publishes `Toto` messages to a Pulsar topic in an endless loop
 * while concurrently consuming, printing and acknowledging messages from that same topic.
 */
object PulsarTest extends IOApp {
  import com.sksamuel.pulsar4s.circe._
  import io.circe.generic.auto._
  import com.sksamuel.pulsar4s.cats.CatsAsyncHandler._
  import cats.implicits._

  /** Topic that is both written to by the producer and read by the consumer. */
  val intopic: Topic = Topic("persistent://sample/standalone/ns1/in")

  /** Declared for completeness; not referenced anywhere in this object. */
  val outtopic: Topic = Topic("persistent://sample/standalone/ns1/out")

  /** Connection settings used to build the Pulsar client. */
  val config: PulsarClientConfig = PulsarClientConfig("pulsar://localhost:6650")

  /** Payload type exchanged over the topic. */
  final case class Toto(id: String)

  /**
   * Pair of deferred constructors for a consumer and a producer.
   *
   * Both are wrapped in `IO`, so nothing is created until the effects are run.
   */
  final case class ConsumerProducer[T](consumer: IO[Consumer[T]], producer: IO[Producer[T]])

  /** Entry point: runs [[publishToPulsar]] and maps its completion to a successful exit code. */
  override def run(args: List[String]): IO[ExitCode] =
    publishToPulsar().as(ExitCode.Success)

  /**
   * Runs the send loop and the receive loop in parallel.
   *
   * The producer is acquired as a `Resource` so that `closeAsync` is invoked on it when
   * the loops finish, fail or are cancelled, before the enclosing client is released.
   *
   * @return an effect that completes only when both loops complete
   */
  def publishToPulsar(): IO[Unit] =
    pulsarClient.use { consumerProducer =>
      Resource.make(consumerProducer.producer)(_.closeAsync).use { producer =>
        // Endlessly sends the same sample payload, one send per iteration.
        val sendOp = fs2.Stream.repeatEval(producer.sendAsync(Toto("pizza"))).compile.drain

        // Prints each received message, then acknowledges it.
        val streamOp = subscribeToPulsar(consumerProducer.consumer)
          .evalTap { message =>
            // Explicit tuple: same output as the argument list that used to be auto-tupled.
            IO(println((message.data.value, message.data.publishTime, message.data.eventTime)))
          }
          .evalTap(_.ack)
          .compile
          .drain

        (sendOp, streamOp).parMapN((_, _) => ())
      }
    }

  /**
   * Builds the Pulsar client as a managed resource and exposes deferred constructors for
   * a producer and a consumer bound to [[intopic]].
   *
   * Only the client is released by this resource; the returned `IO`s create a new
   * producer/consumer each time they are run, and releasing those is up to the caller.
   */
  def pulsarClient: Resource[IO, ConsumerProducer[Toto]] =
    Resource.make(IO(PulsarClient(config)))(client => client.closeAsync).map { client =>
      val producer = IO(client.producer[Toto](ProducerConfig(intopic)))
      val consumer = IO(
        client.consumer[Toto](ConsumerConfig(Subscription("my-pulsar-subscription"), Seq(intopic)))
      )
      ConsumerProducer(consumer, producer)
    }

  /**
   * Turns a deferred consumer into a stream of committable messages by delegating to
   * `PulsarStreams.batch`.
   *
   * NOTE(review): consumer lifecycle (creation/closing) is presumably managed inside
   * `PulsarStreams.batch` - confirm against the pulsar4s fs2 module.
   *
   * @param consumer effect that creates the consumer to read from
   */
  def subscribeToPulsar(
    consumer: IO[Consumer[Toto]]
  ): fs2.Stream[IO, CommittableMessage[IO, ConsumerMessage[Toto]]] =
    PulsarStreams.batch[IO, Toto](consumer)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment