Skip to content

Instantly share code, notes, and snippets.

@carlosedp
Last active April 13, 2023 13:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save carlosedp/b863ed932cf2be5917b6a34ccc7989cd to your computer and use it in GitHub Desktop.
Save carlosedp/b863ed932cf2be5917b6a34ccc7989cd to your computer and use it in GitHub Desktop.
ZIO Kafka sample app
# Run with docker-compose up -d
version: "2"
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 22181:2181
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- 29092:29092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
kafka-ui:
container_name: kafka-ui
image: provectuslabs/kafka-ui:latest
ports:
- 9021:8080
depends_on:
- kafka
environment:
DYNAMIC_CONFIG_ENABLED: true
KAFKA_CLUSTERS_0_NAME: ComposeCluster
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
// Run with scala-cli producerconsumer.scala
//> using scala "3.3.0-RC3"
//> using lib "dev.zio::zio:2.0.12"
//> using lib "dev.zio::zio-kafka:2.2"
//> using lib "dev.zio::zio-logging:2.1.12"
//> using lib "dev.zio::zio-logging-slf4j2-bridge:2.1.12"
//> using option "-source:future"
import zio.*
import zio.kafka.consumer.*
import zio.kafka.producer.{Producer, ProducerSettings}
import zio.kafka.serde.*
import zio.logging.*
import zio.logging.slf4j.bridge.Slf4jBridge
import zio.stream.ZStream
object MainApp extends ZIOAppDefault:
val producer: ZStream[Producer, Throwable, Nothing] =
ZStream
.repeatZIO(Random.nextIntBetween(0, Int.MaxValue))
.schedule(Schedule.fixed(2.seconds))
.mapZIO: random =>
Producer.produce[Any, Long, String](
topic = "random",
key = random % 4,
value = random.toString,
keySerializer = Serde.long,
valueSerializer = Serde.string,
).tap(r => Console.printLine(s"Produced value \"$random\" with offset ${r.offset}"))
.drain
val consumer: ZStream[Consumer, Throwable, Nothing] =
Consumer
.plainStream(Subscription.topics("random"), Serde.long, Serde.string)
.tap(r => Console.printLine(s"Consumed key: ${r.key}, value: ${r.value}"))
.map(_.offset)
.aggregateAsync(Consumer.offsetBatches)
.mapZIO(_.commit)
.drain
def producerLayer =
ZLayer.scoped(
Producer.make(
settings = ProducerSettings(List("localhost:29092")),
),
)
def consumerLayer =
ZLayer.scoped(
Consumer.make(
ConsumerSettings(List("localhost:29092")).withGroupId("group"),
),
)
val logFilter: LogFilter[String] = LogFilter.logLevelByName(
LogLevel.Debug,
"SLF4J-LOGGER" -> LogLevel.Warning,
"org.apache.kafka" -> LogLevel.Warning,
)
override val bootstrap: ZLayer[ZIOAppArgs, Any, Any] =
Runtime.removeDefaultLoggers >>> consoleLogger(
ConsoleLoggerConfig(LogFormat.colored, logFilter),
)
// Both producer and consumer (doesn't interrupt with ctrl-c because of the consumer)
// override def run =
// producer
// .merge(consumer)
// .runDrain
// .provide(
// producerLayer,
// consumerLayer,
// Slf4jBridge.initialize,
// )
// Consumer only (which doesn't interrupt with ctrl-c)
override def run =
consumer
.runDrain
.provide(
consumerLayer,
Slf4jBridge.initialize,
)
// // Producer only (interrupts fine with ctrl-c)
// override def run =
// producer
// .runDrain
// .provide(
// producerLayer,
// Slf4jBridge.initialize,
// )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment