Skip to content

Instantly share code, notes, and snippets.

@cyberdelia
Created October 21, 2020 23:29
Show Gist options
  • Save cyberdelia/1b9dfd259a64c0c3c50455587f79d41c to your computer and use it in GitHub Desktop.
Save cyberdelia/1b9dfd259a64c0c3c50455587f79d41c to your computer and use it in GitHub Desktop.
Kafka + Kotlin + Coroutines
package com.lapanthere.bohemia
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
import java.time.Duration
fun <K, V> KafkaConsumer<K, V>.asFlow(timeout: Duration = Duration.ofMillis(500)): Flow<ConsumerRecord<K, V>> =
flow {
poll(timeout).forEach { emit(it) }
}
package com.lapanthere.bohemia
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Deferred
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.RecordMetadata
fun <K, V> KafkaProducer<K, V>.sendAsync(record: ProducerRecord<K, V>): Deferred<RecordMetadata> =
CompletableDeferred<RecordMetadata>().apply {
send(record) { metadata, exception ->
if (exception != null) {
completeExceptionally(exception)
} else {
complete(metadata)
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment