Skip to content

Instantly share code, notes, and snippets.

View slowconsumerflatmap.kt
producer.produce(producerRate, count)
    .subscribeOn(subscribeOnScheduler)
    .publishOn(publishOnScheduler, 10)
    .flatMap({ value: Long ->
        Mono.fromSupplier {
            sleep(delayBetweenConsumes)
            logger.info("Consumed {}", value)
            null
        }.subscribeOn(flatMapScheduler)
    }, concurrency)
View slowconsumerflatmap.kt
producer.produce(producerRate, count)
    .subscribeOn(subscribeOnScheduler)
    .publishOn(publishOnScheduler, 10)
    .flatMap({ value: Long ->
        Mono.fromSupplier {
            sleep(delayBetweenConsumes)
            logger.info("Consumed {}", value)
            null
        }.subscribeOn(flatMapScheduler)
    }, concurrency)
View slowconsumerbuffered.kt
producer.produce(producerRate, count)
    .subscribeOn(subscribeOnScheduler)
    .publishOn(publishOnScheduler, 10)
    .subscribe { value: Long ->
        sleep(delayBetweenConsumes)
        logger.info("Consumed {}", value)
    }
View slowconsumerthreading.kt
producer.produce(producerRate, count)
    .subscribeOn(subscribeOnScheduler)
    .publishOn(publishOnScheduler)
    .subscribe { value: Long ->
        sleep(delayBetweenConsumes)
        logger.info("Consumed {}", value)
    }
View simpleconsumer.kt
val delayBetweenConsumes: Long = 1000L / consumerRate
producer.produce(producerRate, count)
    .subscribe { value: Long ->
        sleep(delayBetweenConsumes)
        logger.info("Consumed {}", value)
    }
View producer.kt
fun produce(targetRate: Int, upto: Long): Flux<Long> {
    val delayBetweenEmits: Long = 1000L / targetRate
 
    return Flux.generate(
        { 1L },
        { state: Long, sink: SynchronousSink<Long> ->
            sleep(delayBetweenEmits)
            val nextState: Long = state + 1
            if (state > upto) {
                sink.complete()
View hotel.kt
http :9080/hotels id=4 name=name address=address zip=zip state=OR
View HandlingDownStreamErrors.kt
.flatMap({ (message: String, deleteHandle: () -> Unit) ->
task(message)
.then(Mono.fromSupplier { Try.of { deleteHandle() } })
.doOnNext { t ->
t.onFailure { e -> LOGGER.error(e.message, e) }
}
.then()
.subscribeOn(taskScheduler)
}, concurrency)
View ProcessInParallelWithFlatMap.kt
Flux.generate { sink: SynchronousSink<List<Message>> ->
    val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder()
        .queueUrl(queueUrl)
        .maxNumberOfMessages(5)
        .waitTimeSeconds(10)
        .build()
 
    val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages()
    sink.next(messages)
}
View ProcessInParallel.kt
Flux.generate { sink: SynchronousSink<List<Message>> ->
    val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder()
        .queueUrl(queueUrl)
        .maxNumberOfMessages(5)
        .waitTimeSeconds(10)
        .build()
 
    val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages()
    sink.next(messages)
}
You can’t perform that action at this time.