Skip to content

Instantly share code, notes, and snippets.

@cyberdelia
Created October 21, 2020 23:29
  • Star 6 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
Star You must be signed in to star a gist
Embed
What would you like to do?
Kafka + Kotlin + Coroutines
package com.lapanthere.bohemia
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
import java.time.Duration
/**
 * Exposes a single `poll` of this [KafkaConsumer] as a [Flow] of records.
 *
 * Every collection of the returned flow calls `poll` once with [timeout], emits each
 * record of the returned batch in iteration order, and then completes.
 *
 * NOTE(review): only one poll happens per collection, so callers presumably re-collect
 * in a loop to keep consuming — confirm against call sites.
 *
 * @param timeout duration handed to `poll`; defaults to 500 milliseconds.
 * @return a flow of the records produced by that one poll.
 */
fun <K, V> KafkaConsumer<K, V>.asFlow(timeout: Duration = Duration.ofMillis(500)): Flow<ConsumerRecord<K, V>> =
    flow {
        // Qualify the receiver so it is obvious the poll targets the consumer, not the collector.
        val batch = this@asFlow.poll(timeout)
        for (record in batch) {
            emit(record)
        }
    }
package com.lapanthere.bohemia
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Deferred
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.RecordMetadata
/**
 * Sends [record] through this [KafkaProducer] and exposes the outcome as a [Deferred].
 *
 * The returned deferred is settled from the producer's completion callback: it completes
 * with the reported [RecordMetadata] when the callback carries no exception, and completes
 * exceptionally with that exception otherwise.
 *
 * Anything thrown by `send` itself is not captured in the deferred; it propagates to the caller.
 *
 * @param record the record to send.
 * @return a deferred that yields the record's metadata or fails with the callback's exception.
 */
fun <K, V> KafkaProducer<K, V>.sendAsync(record: ProducerRecord<K, V>): Deferred<RecordMetadata> {
    val outcome = CompletableDeferred<RecordMetadata>()
    send(record) { metadata, exception ->
        // The exception is checked first: it alone decides whether the send failed.
        when (exception) {
            null -> outcome.complete(metadata)
            else -> outcome.completeExceptionally(exception)
        }
    }
    return outcome
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment