Skip to content

Instantly share code, notes, and snippets.

@kmtr
Created September 9, 2021 23:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kmtr/3577cb404e1daec235725ef69b131cb4 to your computer and use it in GitHub Desktop.
Save kmtr/3577cb404e1daec235725ef69b131cb4 to your computer and use it in GitHub Desktop.

Kafka Producerを実装する

この文書で達成されること

  • Kotlinを使って単純なテキストを受け取って送信するKafka Producerを実装する

ビルド環境

Gradle

type of project: application implementation langugae: Kotlin

Kotlin

Kotlin 1.5.30で動作確認。

dependencies

Kafka Clientsを追加する。

// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
implementation("org.apache.kafka:kafka-clients:2.8.0")

実装

Producerインスタンスの作成

java.util.Properties に各種値を設定する。
設定できる値の詳細は Producer Configurations を参照。

fun createProducer(): Producer<String, String> {
    val props = Properties()
    props["bootstrap.servers"] = "localhost:9092"
    props["acks"] = "all"
    props["retries"] = 0
    props["linger.ms"] = 1
    props["key.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
    props["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
    return KafkaProducer(props)
}

メッセージの送信

ProducerRecord オブジェクトを作成して KafkaProducer.send(ProducerRecord) に渡すだけでよい。

fun sendRecord(producer: Producer<String, String>, topic: String) {
    val time = LocalDateTime.now()
    // Create a record
    val record: ProducerRecord<String, String> = ProducerRecord(
        topic, // Topic
        time.toString(), // Key
        "Record sent at $time" // Value
    )
    producer.send(record)
}

main

今回は単純に各関数を呼び出しているだけだが、メッセージを送る前に終了することを防ぐためにスリープを入れている。

fun main() {
    val producer = createProducer()
    sendRecord(producer, "quickstart-events")
    Thread.sleep(100)
}

コード全景

package kmtr.simple.consumer

import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord
import java.time.LocalDateTime
import java.util.*

fun main() {
    val producer = createProducer()
    sendRecord(producer, "quickstart-events")
    Thread.sleep(100)
}

fun createProducer(): Producer<String, String> {
    val props = Properties()
    props["bootstrap.servers"] = "localhost:9092"
    props["acks"] = "all"
    props["retries"] = 0
    props["linger.ms"] = 1
    props["key.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
    props["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"

    return KafkaProducer(props)
}

fun sendRecord(producer: Producer<String, String>, topic: String) {
    val time = LocalDateTime.now()
    // Create a record
    val record: ProducerRecord<String, String> = ProducerRecord(
        topic, // Topic
        time.toString(), // Key
        "Record sent at $time" // Value
    )
    producer.send(record)
}

参考文献

コードについてはこのサイトを元にしている。

https://lankydan.dev/intro-to-kafka-producers

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment