Skip to content

Instantly share code, notes, and snippets.

@kmtr
Last active September 9, 2021 23:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kmtr/f32245d8b26e5eaec11797aed0a4561f to your computer and use it in GitHub Desktop.
Save kmtr/f32245d8b26e5eaec11797aed0a4561f to your computer and use it in GitHub Desktop.

Kafka Consumerを実装する

この文書で達成されること

  • Kotlinを使って単純なテキストを受け取って出力するKafka Consumerを実装する

ビルド環境

Gradle

type of project: application implementation langugae: Kotlin

Kotlin

Kotlin 1.5.30で動作確認。

dependencies

Kafka Clientsを追加する。

// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
implementation("org.apache.kafka:kafka-clients:2.8.0")

実装

Consumerインスタンスの生成

java.util.Properties に各種値を設定する。
設定できる値の詳細は Consumer Configurations を参照。

fun createConsumer(): Consumer<String, String> {
    val props = Properties()
    props["bootstrap.servers"] = "localhost:9092"
    props["group.id"] = "test"
    props["enable.auto.commit"] = "true"
    props["auto.commit.interval.ms"] = "1000"
    props["key.deserializer"] = "org.apache.kafka.common.serialization.StringDeserializer"
    props["value.deserializer"] = "org.apache.kafka.common.serialization.StringDeserializer"
    return KafkaConsumer(props)
}

メッセージの受信(consume)

受信対象となるtopicを引数で取るようにする。

Consumer.subscribe(Collection)Collection を引数とするため、複数のtopicを受信することができるが、ここでは引数として渡ってきた一つだけを受信するため、listOf で要素が一つだけのリストをつくって、このメソッドに渡している。

今回は while による無限ループの中で出力処理を行なっている。同一スレッドで処理を行っているので、仮に println が非常に重い処理であった場合、subscribe頻度が遅くなるため、別スレッドにする工夫が必要だろう。

fun consumeRecords(consumer: Consumer<String, String>, topic: String) {
    consumer.subscribe(listOf(topic))
    while (true) {
        val records: ConsumerRecords<String, String> = consumer.poll(Duration.ofSeconds(5))
        for (record: ConsumerRecord<String, String> in records) {
            println("Consumer reading record: ${record.value()}")
        }
        consumer.commitAsync()
    }
}

main

consumeRecords は別のスレッドで動かす。 また安全に consumer.close() を呼び出すために、use を使う。

fun main() {
    val consumer = createConsumer()
    thread {
        consumer.use { consumer ->
            consumeRecords(consumer, "quickstart-events")
        }
    }
}

ここで使ったKotlinの機能

thread

簡単にスレッドオブジェクトを作り、実行する。 threadはあくまで関数なので、戻り値として Thread オブジェクトを返す。

val t = thread { /* snip */ }

https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.concurrent/thread.html

use

Javaの try-with-resources を実現する。

https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.io/use.html

コード全景

package kmtr.simple.consumer

import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import java.time.Duration
import java.util.*
import kotlin.concurrent.thread

fun main() {
    val consumer = createConsumer()
    thread {
        consumer.use { consumer ->
            consumeRecords(consumer, "quickstart-events")
        }
    }
}

fun createConsumer(): Consumer<String, String> {
    val props = Properties()
    props["bootstrap.servers"] = "localhost:9092"
    props["group.id"] = "test"
    props["enable.auto.commit"] = "true"
    props["auto.commit.interval.ms"] = "1000"
    props["key.deserializer"] = "org.apache.kafka.common.serialization.StringDeserializer"
    props["value.deserializer"] = "org.apache.kafka.common.serialization.StringDeserializer"
    return KafkaConsumer(props)
}

fun consumeRecords(consumer: Consumer<String, String>, topic: String) {
    consumer.subscribe(listOf(topic))
    while (true) {
        val records: ConsumerRecords<String, String> = consumer.poll(Duration.ofSeconds(5))
        for (record: ConsumerRecord<String, String> in records) {
            println("Consumer reading record: ${record.value()}")
        }
        consumer.commitAsync()
    }
}

参考文献

コードについてはこのサイトを元にしている。

https://lankydan.dev/intro-to-kafka-consumers

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment