Skip to content

Instantly share code, notes, and snippets.

@sskrla
Last active October 8, 2016 07:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sskrla/ae7bf1c15465bdd27852159b6071de47 to your computer and use it in GitHub Desktop.
Save sskrla/ae7bf1c15465bdd27852159b6071de47 to your computer and use it in GitHub Desktop.
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.WaitStrategies;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import lombok.Data;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.Timer;
import java.util.concurrent.Semaphore;
import static com.google.common.collect.Lists.transform;
import static java.lang.Math.max;
import static java.lang.Math.min;
import static java.util.concurrent.TimeUnit.MINUTES;
import static reactor.util.concurrent.QueueSupplier.XS_BUFFER_SIZE;
/**
* This publisher will poll an SQS queue when it is subscribed too. Additionally, it will prevent more than
* {@link ProducerConfig#maxInflightMessages} messages from being acquired at any given time. In flight messages will
* have their visibility refreshed automatically periodically.
*
* It is the responsibility of downstream consumers to {@link SqsMessage#ack()} or {@link SqsMessage#reject()} messages
* or message production will stall indefinitely.
*
* Created by sskrla on 8/10/16.
*/
@Slf4j
public class SqsPublisher implements Publisher<SqsMessage> {
final AmazonSQSClient client;
final ProducerConfig config;
final String queueUrl;
final Retryer retryLoop = RetryerBuilder.newBuilder()
.retryIfException(e -> {
log.warn("SQS communication problem", e);
return true;
})
.withWaitStrategy(WaitStrategies.exponentialWait())
.build();
@Inject
public SqsPublisher(
AmazonSQSClient client,
@Assisted String queueUrl,
@Assisted ProducerConfig config) {
this.queueUrl = queueUrl;
this.client = client;
this.config = config;
}
@Override
public void subscribe(Subscriber<? super SqsMessage> s) {
SqsVisibilityRefreshTask refresher = new SqsVisibilityRefreshTask(client, config, queueUrl);
Timer refreshTimer = new Timer();
refreshTimer.schedule(refresher, (long)(config.visibilityTimeout / 2) );
ReceiveMessageRequest request = new ReceiveMessageRequest(queueUrl)
.withWaitTimeSeconds(config.waitTime)
.withVisibilityTimeout(config.visibilityTimeout);
Semaphore available = new Semaphore(config.maxInflightMessages);
Flux.<SqsMessage, Queue<Message>>generate(
() -> new ArrayDeque<>(),
(fetched, emitter) -> {
while(fetched.isEmpty()) {
int toFetch = 0;
try {
toFetch = max(1, min(10, available.availablePermits()));
available.acquire(toFetch);
request.setMaxNumberOfMessages(toFetch);
List<Message> messages = (List<Message>) retryLoop
.call(() -> client
.receiveMessage(request)
.getMessages());
refresher.register(transform(messages, Message::getReceiptHandle));
fetched.addAll(messages);
} catch(Throwable t) {
available.release(toFetch);
log.error("Failed to talk to SQS, will try again...", t);
if(Thread.interrupted())
log.warn("Thread was interrupted, but we cleared the flag and continued.");
}
}
emitter.next(wrap(fetched.poll(), available, refresher));
return fetched;
})
.doOnCancel(() -> refreshTimer.cancel())
.doOnComplete(() -> refreshTimer.cancel())
.subscribe(s);
}
SqsMessage wrap(Message message, Semaphore available, SqsVisibilityRefreshTask inflight) {
return new SqsMessage(message) {
@Override
public void ack() {
try {
retryLoop.call(() -> client.deleteMessage(queueUrl, rawMessage.getReceiptHandle()));
} catch(Exception e) {
log.error("Unable to delete message from SQS: " + rawMessage.getMessageId() , e);
} finally {
inflight.remove(rawMessage.getReceiptHandle());
available.release();
}
}
@Override
public void reject() {
try {
client.changeMessageVisibility(queueUrl, rawMessage.getReceiptHandle(), 0);
} catch (Exception e) {
log.error("Unable to expire message in SQS: " + rawMessage.getMessageId(), e);
} finally {
inflight.remove(rawMessage.getReceiptHandle());
available.release();
}
}
};
}
@Accessors(chain = true)
public static @Data class ProducerConfig {
int visibilityTimeout = (int) MINUTES.toSeconds(2);
int waitTime = 10;
int maxInflightMessages = XS_BUFFER_SIZE;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment