Skip to content

Instantly share code, notes, and snippets.

@bphenriques
Last active January 13, 2023 12:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bphenriques/a8c175a6b9882e817123f28fbbd41d39 to your computer and use it in GitHub Desktop.
Save bphenriques/a8c175a6b9882e817123f28fbbd41d39 to your computer and use it in GitHub Desktop.
Create Kafka consumer properties file
#!/usr/bin/env -S scala-cli shebang --scala-version 2
//> using repository "https://packages.confluent.io/maven/"
//> using repository "jitpack"
//> using repository "https://kaluza.jfrog.io/artifactory/maven/"
//> using lib "is.cir::ciris::3.0.0"
//> using lib "com.ovoenergy::ciris-aiven-kafka::3.0.0"
//> using lib "org.typelevel::cats-core:2.9.0"
//> using lib "org.typelevel::cats-effect:3.4.4"
//> using lib "co.fs2::fs2-io:3.4.0"
import cats.effect.{IO, IOApp}
import ciris.aiven.kafka.aivenKafkaSetup
import ciris.file
import fs2.io.file.{Files, Path}
import java.nio.file.{Path => JPath}
// Requirements: scala-cli and the following files in the current directory: service.key, service.cert and ca.pem
// Usage: scala-cli --scala 2 kafka-config.scala > consumer.properties
// or chmod +x kafka-config.scala && ./kafka-config.scala > consumer.properties
object KafkaUtil extends IOApp.Simple {
def moveToTmp(src: Path): IO[Path] = for {
location <- Files[IO].createTempFile.flatTap(Files[IO].delete)
_ <- Files[IO].copy(src, location)
} yield location
override def run: IO[Unit] =
aivenKafkaSetup(
clientPrivateKey = file(JPath.of("service.key")).redacted,
clientCertificate = file(JPath.of("service.cert")),
serviceCertificate = file(JPath.of("ca.pem")),
).evalMap { c =>
for {
keyStoreLocation <- moveToTmp(Path(c.keyStoreFile.path.toString))
trustStoreLocation <- moveToTmp(Path(c.trustStoreFile.path.toString))
// aivenKafkaSetup uses temporary locations and we need the files to be available after running the application
updatedProperties =
c.properties ++ Map(
"ssl.truststore.location" -> trustStoreLocation.absolute.toString,
"ssl.keystore.location" -> keyStoreLocation.absolute.toString
)
_ <- IO.println(updatedProperties.map { case (key, value) => s"$key=$value" }.mkString("\n"))
} yield ()
}.load[IO]
.void
}
@bphenriques
Copy link
Author

bphenriques commented Aug 3, 2022

Usage:

$ wget https://dlcdn.apache.org/kafka/3.2.1/kafka_2.13-3.2.1.tgz
$ tar -xzf kafka_2.13-3.2.1.tgz
$ tree -L 1
.
├── ca.pem # downloaded from Aiven Console
├── kafka_2.13-3.2.1
├── kafka_2.13-3.2.1.tgz
├── service.cert # downloaded from Aiven Console
└── service.key # exported from the previous command

$ scala-cli https://gist.github.com/bphenriques/a8c175a6b9882e817123f28fbbd41d39 --scala 2 > consumer.properties
$ kafka_2.13-3.2.1/bin/kafka-topics.sh --list --bootstrap-server <bootstrap-server-address> --command-config consumer.properties
topic1
topic2
...
$ kafka_2.13-3.2.1/bin/kafka-consumer-groups.sh \
  --bootstrap-server <bootstrap-server-address> \
  --command-config consumer.properties \
  --group <consumer-group-id> \
  --topic <topic-name> \
  --reset-offsets \
  --to-earliest \
  --dry-run

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment