Skip to content

Instantly share code, notes, and snippets.

@osi
Created May 2, 2019 19:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save osi/5d54c930db5eb51359bcd67147677578 to your computer and use it in GitHub Desktop.
Save osi/5d54c930db5eb51359bcd67147677578 to your computer and use it in GitHub Desktop.
SQS receiver using Project Reactor
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.AmazonSQSException;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Schedulers;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ReactiveSqsListener {
private static final Logger logger = LoggerFactory.getLogger(ReactiveSqsListener.class);
public static Flux<Message> listen(AmazonSQS sqs, String responseQueueUrl) {
return Flux.<Message>create(sink -> {
Lock lock = new ReentrantLock();
Condition demand = lock.newCondition();
Runnable signal = () -> {
if (lock.tryLock()) {
demand.signal();
lock.unlock();
}
};
sink.onRequest(l -> signal.run())
.onDispose(signal::run);
ReceiveMessageRequest request = new ReceiveMessageRequest(responseQueueUrl)
.withWaitTimeSeconds(20);
lock.lock();
try {
while (!sink.isCancelled()) {
if (sink.requestedFromDownstream() == 0) {
try {
demand.await();
} catch (InterruptedException ignored) {
// we woke up! let's keep doing the things.
}
}
int maxNumberOfMessages = (int) Long.min(10, sink.requestedFromDownstream());
if (maxNumberOfMessages < 1) {
continue;
}
//metrics - responseDemand.set(maxNumberOfMessages);
request.withMaxNumberOfMessages(maxNumberOfMessages);
List<Message> messages;
try {
messages = sqs.receiveMessage(request).getMessages();
} catch (AmazonSQSException e) {
logger.warn("failed to execute request {}", request);
sink.error(e);
return;
}
//metrics - received.increment(messages.size());
messages.forEach(sink::next);
}
} finally {
lock.unlock();
}
}, FluxSink.OverflowStrategy.BUFFER)
.subscribeOn(Schedulers.elastic(), false)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment