Skip to content

Instantly share code, notes, and snippets.

View bijukunjummen's full-sized avatar

Biju Kunjummen bijukunjummen

View GitHub Profile
// Request for a single receive call against `queueUrl`: asks for at most 5 messages
// and sets the wait time to 10 seconds.
val receiveMessageRequest: ReceiveMessageRequest =
    ReceiveMessageRequest.builder()
        .queueUrl(queueUrl)
        .maxNumberOfMessages(5)
        .waitTimeSeconds(10)
        .build()

// Asynchronous receive: the returned future is mapped from the raw response
// to just the list of messages it carries.
val messages: CompletableFuture<List<Message>> =
    sqsAsyncClient
        .receiveMessage(receiveMessageRequest)
        .thenApply { response -> response.messages() }
fun listen(): Flux<Pair<String, () -> Unit>> {
    return Flux.create { sink: FluxSink<List<Message>> ->
            while (running) {
                try {
                    val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder()
                        .queueUrl(queueUrl)
                        .maxNumberOfMessages(5)
                        .waitTimeSeconds(10)
                        .build()
 
fun listen(): Flux<Pair<String, () -> Unit>> {
return Flux.generate { sink: SynchronousSink<List<Message>> ->
val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder()
.queueUrl(queueUrl)
.maxNumberOfMessages(5)
.waitTimeSeconds(10)
.build()
val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages()
LOGGER.info("Received: $messages")
// Generator-backed Flux: each generator invocation performs one synchronous SQS
// receive and emits the resulting batch of messages as a single element.
Flux.generate { emitter: SynchronousSink<List<Message>> ->
    // At most 5 messages per call, with the wait time set to 10 seconds.
    val pollRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder()
        .queueUrl(queueUrl)
        .maxNumberOfMessages(5)
        .waitTimeSeconds(10)
        .build()

    // Synchronous client call; the batch is emitted exactly as returned.
    val batch: List<Message> = sqsClient.receiveMessage(pollRequest).messages()
    emitter.next(batch)
}
// Generator-backed Flux: each generator invocation performs one synchronous SQS
// receive and emits the resulting batch of messages as a single element.
Flux.generate { emitter: SynchronousSink<List<Message>> ->
    // At most 5 messages per call, with the wait time set to 10 seconds.
    val pollRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder()
        .queueUrl(queueUrl)
        .maxNumberOfMessages(5)
        .waitTimeSeconds(10)
        .build()

    // Synchronous client call; the batch is emitted exactly as returned.
    val batch: List<Message> = sqsClient.receiveMessage(pollRequest).messages()
    emitter.next(batch)
}
// Generator-backed Flux: each generator invocation performs one synchronous SQS
// receive and emits the resulting batch of messages as a single element.
Flux.generate { emitter: SynchronousSink<List<Message>> ->
    // At most 5 messages per call, with the wait time set to 10 seconds.
    val pollRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder()
        .queueUrl(queueUrl)
        .maxNumberOfMessages(5)
        .waitTimeSeconds(10)
        .build()

    // Synchronous client call; the batch is emitted exactly as returned.
    val batch: List<Message> = sqsClient.receiveMessage(pollRequest).messages()
    emitter.next(batch)
}
// Generator-backed Flux: each generator invocation performs one synchronous SQS
// receive and emits the resulting batch of messages as a single element.
Flux.generate { emitter: SynchronousSink<List<Message>> ->
    // At most 5 messages per call, with the wait time set to 10 seconds.
    val pollRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder()
        .queueUrl(queueUrl)
        .maxNumberOfMessages(5)
        .waitTimeSeconds(10)
        .build()

    // Synchronous client call; the batch is emitted exactly as returned.
    val batch: List<Message> = sqsClient.receiveMessage(pollRequest).messages()
    emitter.next(batch)
}
// Processes each (message, deleteHandle) pair; `concurrency` is passed as flatMap's
// second argument. NOTE(review): the upstream publisher this is chained onto is not
// visible in this snippet — confirm it emits Pair<String, () -> Unit>.
.flatMap({ (message: String, deleteHandle: () -> Unit) ->
// Run the task for this message first.
task(message)
// Then invoke deleteHandle, wrapped in Try.of so the outcome is carried as a value
// (presumably Vavr's Try — TODO confirm).
.then(Mono.fromSupplier { Try.of { deleteHandle() } })
.doOnNext { t ->
// A failed delete is logged here.
t.onFailure { e -> LOGGER.error(e.message, e) }
}
// Discard the Try value; only the completion signal is propagated.
.then()
// Subscribe this per-message pipeline on taskScheduler.
.subscribeOn(taskScheduler)
}, concurrency)
http :9080/hotels id=4 name=name address=address zip=zip state=OR
fun produce(targetRate: Int, upto: Long): Flux<Long> {
    val delayBetweenEmits: Long = 1000L / targetRate
 
    return Flux.generate(
        { 1L },
        { state: Long, sink: SynchronousSink<Long> ->
            sleep(delayBetweenEmits)
            val nextState: Long = state + 1
            if (state > upto) {
                sink.complete()