- Kotlinを使って単純なテキストを受け取って出力するKafka Consumerを実装する
type of project: application
implementation langugae: Kotlin
Kotlin 1.5.30で動作確認。
Kafka Clientsを追加する。
// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
implementation("org.apache.kafka:kafka-clients:2.8.0")
java.util.Properties
に各種値を設定する。
設定できる値の詳細は Consumer Configurations を参照。
fun createConsumer(): Consumer<String, String> {
val props = Properties()
props["bootstrap.servers"] = "localhost:9092"
props["group.id"] = "test"
props["enable.auto.commit"] = "true"
props["auto.commit.interval.ms"] = "1000"
props["key.deserializer"] = "org.apache.kafka.common.serialization.StringDeserializer"
props["value.deserializer"] = "org.apache.kafka.common.serialization.StringDeserializer"
return KafkaConsumer(props)
}
受信対象となるtopicを引数で取るようにする。
Consumer.subscribe(Collection)
は Collection
を引数とするため、複数のtopicを受信することができるが、ここでは引数として渡ってきた一つだけを受信するため、listOf
で要素が一つだけのリストをつくって、このメソッドに渡している。
今回は while
による無限ループの中で出力処理を行なっている。同一スレッドで処理を行っているので、仮に println
が非常に重い処理であった場合、subscribe頻度が遅くなるため、別スレッドにする工夫が必要だろう。
fun consumeRecords(consumer: Consumer<String, String>, topic: String) {
consumer.subscribe(listOf(topic))
while (true) {
val records: ConsumerRecords<String, String> = consumer.poll(Duration.ofSeconds(5))
for (record: ConsumerRecord<String, String> in records) {
println("Consumer reading record: ${record.value()}")
}
consumer.commitAsync()
}
}
consumeRecords
は別のスレッドで動かす。
また安全に consumer.close()
を呼び出すために、use
を使う。
fun main() {
val consumer = createConsumer()
thread {
consumer.use { consumer ->
consumeRecords(consumer, "quickstart-events")
}
}
}
簡単にスレッドオブジェクトを作り、実行する。
threadはあくまで関数なので、戻り値として Thread
オブジェクトを返す。
val t = thread { /* snip */ }
https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.concurrent/thread.html
Javaの try-with-resources
を実現する。
https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.io/use.html
package kmtr.simple.consumer
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import java.time.Duration
import java.util.*
import kotlin.concurrent.thread
fun main() {
val consumer = createConsumer()
thread {
consumer.use { consumer ->
consumeRecords(consumer, "quickstart-events")
}
}
}
fun createConsumer(): Consumer<String, String> {
val props = Properties()
props["bootstrap.servers"] = "localhost:9092"
props["group.id"] = "test"
props["enable.auto.commit"] = "true"
props["auto.commit.interval.ms"] = "1000"
props["key.deserializer"] = "org.apache.kafka.common.serialization.StringDeserializer"
props["value.deserializer"] = "org.apache.kafka.common.serialization.StringDeserializer"
return KafkaConsumer(props)
}
fun consumeRecords(consumer: Consumer<String, String>, topic: String) {
consumer.subscribe(listOf(topic))
while (true) {
val records: ConsumerRecords<String, String> = consumer.poll(Duration.ofSeconds(5))
for (record: ConsumerRecord<String, String> in records) {
println("Consumer reading record: ${record.value()}")
}
consumer.commitAsync()
}
}
コードについてはこのサイトを元にしている。