- Kotlinを使って単純なテキストを受け取って送信するKafka Producerを実装する
type of project: application
implementation langugae: Kotlin
Kotlin 1.5.30で動作確認。
Kafka Clientsを追加する。
// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
implementation("org.apache.kafka:kafka-clients:2.8.0")
java.util.Properties
に各種値を設定する。
設定できる値の詳細は Producer Configurations を参照。
fun createProducer(): Producer<String, String> {
val props = Properties()
props["bootstrap.servers"] = "localhost:9092"
props["acks"] = "all"
props["retries"] = 0
props["linger.ms"] = 1
props["key.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
props["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
return KafkaProducer(props)
}
ProducerRecord
オブジェクトを作成して KafkaProducer.send(ProducerRecord)
に渡すだけでよい。
fun sendRecord(producer: Producer<String, String>, topic: String) {
val time = LocalDateTime.now()
// Create a record
val record: ProducerRecord<String, String> = ProducerRecord(
topic, // Topic
time.toString(), // Key
"Record sent at $time" // Value
)
producer.send(record)
}
今回は単純に各関数を呼び出しているだけだが、メッセージを送る前に終了することを防ぐためにスリープを入れている。
fun main() {
val producer = createProducer()
sendRecord(producer, "quickstart-events")
Thread.sleep(100)
}
package kmtr.simple.consumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord
import java.time.LocalDateTime
import java.util.*
fun main() {
val producer = createProducer()
sendRecord(producer, "quickstart-events")
Thread.sleep(100)
}
fun createProducer(): Producer<String, String> {
val props = Properties()
props["bootstrap.servers"] = "localhost:9092"
props["acks"] = "all"
props["retries"] = 0
props["linger.ms"] = 1
props["key.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
props["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
return KafkaProducer(props)
}
fun sendRecord(producer: Producer<String, String>, topic: String) {
val time = LocalDateTime.now()
// Create a record
val record: ProducerRecord<String, String> = ProducerRecord(
topic, // Topic
time.toString(), // Key
"Record sent at $time" // Value
)
producer.send(record)
}
コードについてはこのサイトを元にしている。