Skip to content

Instantly share code, notes, and snippets.

@emrul
Forked from cyberdelia/Consumer.kt
Created January 31, 2022 23:25
Show Gist options
  • Save emrul/6e43ddd4db948dd943e792e207740df4 to your computer and use it in GitHub Desktop.
Save emrul/6e43ddd4db948dd943e792e207740df4 to your computer and use it in GitHub Desktop.
Kafka + Kotlin + Coroutines
package com.lapanthere.bohemia
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
import java.time.Duration
fun <K, V> KafkaConsumer<K, V>.asFlow(timeout: Duration = Duration.ofMillis(500)): Flow<ConsumerRecord<K, V>> =
flow {
poll(timeout).forEach { emit(it) }
}
package com.lapanthere.bohemia
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Deferred
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.RecordMetadata
fun <K, V> KafkaProducer<K, V>.sendAsync(record: ProducerRecord<K, V>): Deferred<RecordMetadata> =
CompletableDeferred<RecordMetadata>().apply {
send(record) { metadata, exception ->
if (exception != null) {
completeExceptionally(exception)
} else {
complete(metadata)
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment