Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Kafka + Kotlin + Coroutines
package com.lapanthere.bohemia
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
import java.time.Duration
/**
 * Exposes this consumer as a cold [Flow] of records.
 *
 * Every collection of the returned flow repeatedly calls [KafkaConsumer.poll] with
 * [timeout] and emits each record of the returned batch in order. The flow never
 * completes on its own; it ends when the collecting coroutine is cancelled or when
 * `poll` throws.
 *
 * `poll` is invoked directly on the collector's thread, so that thread is occupied
 * for the duration of each call. NOTE(review): callers probably want to collect on a
 * dispatcher suited to blocking work, and must not collect the same consumer
 * concurrently — confirm against the Kafka client's threading rules.
 *
 * @param timeout maximum time handed to each individual `poll` call; defaults to 500 ms.
 * @return a flow emitting every [ConsumerRecord] obtained from successive polls.
 */
fun <K, V> KafkaConsumer<K, V>.asFlow(timeout: Duration = Duration.ofMillis(500)): Flow<ConsumerRecord<K, V>> =
    flow {
        while (true) {
            // An empty batch emits nothing, so `emit` alone would never observe
            // cancellation; check explicitly before every poll.
            currentCoroutineContext().ensureActive()
            poll(timeout).forEach { record -> emit(record) }
        }
    }
package com.lapanthere.bohemia
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Deferred
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.RecordMetadata
/**
 * Sends [record] through this producer and exposes the outcome as a [Deferred].
 *
 * The producer's completion callback settles the returned deferred: with the
 * [RecordMetadata] when the callback reports no exception, or exceptionally with
 * the reported exception otherwise. Anything thrown synchronously by `send` itself
 * propagates to the caller of this function rather than through the deferred.
 *
 * @param record the record to send.
 * @return a deferred settled by the producer's completion callback.
 */
fun <K, V> KafkaProducer<K, V>.sendAsync(record: ProducerRecord<K, V>): Deferred<RecordMetadata> {
    val outcome = CompletableDeferred<RecordMetadata>()
    send(record) { metadata, exception ->
        // The exception is inspected first; metadata is only used on the success path.
        when (exception) {
            null -> outcome.complete(metadata)
            else -> outcome.completeExceptionally(exception)
        }
    }
    return outcome
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment