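// Poll SQS continuously: each generate callback long-polls (up to 10 s) for at most 5 messages.
Flux.generate { sink: SynchronousSink<List<Message>> ->
    val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder()
        .queueUrl(queueUrl)
        .maxNumberOfMessages(5)
        .waitTimeSeconds(10)
        .build()

    val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages()
    sink.next(messages)
}
    // Flatten each batch into individual messages.
    .flatMapIterable(Function.identity())
    // Resubscribe, and so resume polling, if a receive call fails.
    .retry()
    // Assumed step, absent from the original snippet: pair each message body with a handle that deletes it.
    .map { m: Message ->
        val deleteRequest = DeleteMessageRequest.builder()
            .queueUrl(queueUrl).receiptHandle(m.receiptHandle()).build()
        val deleteHandle: () -> Unit = { sqsClient.deleteMessage(deleteRequest) }
        m.body() to deleteHandle
    }
    // Run up to `concurrency` tasks in parallel; delete a message only after its task completes.
    .flatMap({ (message: String, deleteHandle: () -> Unit) ->
        task(message)
            .then(Mono.fromSupplier { Try.of { deleteHandle() } })
            .then()
            .subscribeOn(taskScheduler)
    }, concurrency)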