Skip to content

Instantly share code, notes, and snippets.

@KadekM
Last active May 10, 2021 20:12
Show Gist options
  • Save KadekM/304a07f78bd977bfa828dd966e74701a to your computer and use it in GitHub Desktop.
Save KadekM/304a07f78bd977bfa828dd966e74701a to your computer and use it in GitHub Desktop.
/** Renders a partition as `<topic>-<partition>`.
  *
  * @param topic the partition to render; supplies both the topic name and the partition number
  * @return the topic name and partition number joined by a single `-`
  */
def partitionToString(topic: TopicPartition): String = {
  val topicName = topic.topic()
  val partitionNumber = topic.partition()
  s"$topicName-$partitionNumber"
}
/** Application entry point (work in progress).
  *
  * Obtains a `SessionTask` from the environment, builds a manual offset-retrieval strategy that
  * looks offsets up through that session, and creates a consumer from the resulting settings.
  * Both the inner effect and the method result still end in `???`, so this is an unfinished sketch.
  *
  * @param args command-line arguments; not used by the code visible here
  */
override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] = {
  val program = for {
    session <- ZIO.service[SessionTask]
    _ <- ZIO.runtime[Any].flatMap { implicit rt: Runtime[Any] =>
      // The implicit runtime is presumably what `toManagedZIO` needs — TODO confirm.
      val storedOffsets = Consumer.OffsetRetrieval.Manual { partitions =>
        // The same list of rendered partition keys feeds both the SQL builder and the statement arguments.
        val keys = partitions.map(partitionToString).toList
        val lookup = Sql.selectPartitionSql(keys)
        val prepared = session.prepare(lookup).toManagedZIO
        prepared
          // 64 is presumably the stream chunk size — TODO confirm against the session library.
          .use(_.stream(keys, 64).compile.toVector)
          .map { rows =>
            // Rows that come back as None are dropped; the rest become topic -> offset pairs.
            // NOTE(review): confirm `row.topic` has the key type Manual expects.
            rows.collect { case Some(row) => row.topic -> row.offset }.toMap
          }
      }
      val settings = ConsumerSettings(List("localhost:9092"))
        .withGroupId("my.group")
        .withOffsetRetrieval(storedOffsets)
      val consumerM = Consumer.make(settings)
      ??? // to be continued
    }
  } yield ()
  ??? // to be continued
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment