Last active
October 8, 2016 07:19
-
-
Save sskrla/ae7bf1c15465bdd27852159b6071de47 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.amazonaws.services.sqs.AmazonSQSClient; | |
import com.amazonaws.services.sqs.model.Message; | |
import com.amazonaws.services.sqs.model.ReceiveMessageRequest; | |
import com.github.rholder.retry.Retryer; | |
import com.github.rholder.retry.RetryerBuilder; | |
import com.github.rholder.retry.WaitStrategies; | |
import com.google.inject.Inject; | |
import com.google.inject.assistedinject.Assisted; | |
import lombok.Data; | |
import lombok.experimental.Accessors; | |
import lombok.extern.slf4j.Slf4j; | |
import org.reactivestreams.Publisher; | |
import org.reactivestreams.Subscriber; | |
import reactor.core.publisher.Flux; | |
import java.util.ArrayDeque; | |
import java.util.List; | |
import java.util.Queue; | |
import java.util.Timer; | |
import java.util.concurrent.Semaphore; | |
import static com.google.common.collect.Lists.transform; | |
import static java.lang.Math.max; | |
import static java.lang.Math.min; | |
import static java.util.concurrent.TimeUnit.MINUTES; | |
import static reactor.util.concurrent.QueueSupplier.XS_BUFFER_SIZE; | |
/**
 * This publisher polls an SQS queue once it is subscribed to. Additionally, it prevents more than
 * {@link ProducerConfig#maxInflightMessages} messages from being acquired at any given time. In-flight messages
 * have their visibility timeout refreshed automatically at regular intervals.
 *
 * It is the responsibility of downstream consumers to {@link SqsMessage#ack()} or {@link SqsMessage#reject()} every
 * message they receive; otherwise message production will stall indefinitely.
 *
 * Created by sskrla on 8/10/16.
 */
@Slf4j | |
public class SqsPublisher implements Publisher<SqsMessage> { | |
final AmazonSQSClient client; | |
final ProducerConfig config; | |
final String queueUrl; | |
final Retryer retryLoop = RetryerBuilder.newBuilder() | |
.retryIfException(e -> { | |
log.warn("SQS communication problem", e); | |
return true; | |
}) | |
.withWaitStrategy(WaitStrategies.exponentialWait()) | |
.build(); | |
@Inject | |
public SqsPublisher( | |
AmazonSQSClient client, | |
@Assisted String queueUrl, | |
@Assisted ProducerConfig config) { | |
this.queueUrl = queueUrl; | |
this.client = client; | |
this.config = config; | |
} | |
@Override | |
public void subscribe(Subscriber<? super SqsMessage> s) { | |
SqsVisibilityRefreshTask refresher = new SqsVisibilityRefreshTask(client, config, queueUrl); | |
Timer refreshTimer = new Timer(); | |
refreshTimer.schedule(refresher, (long)(config.visibilityTimeout / 2) ); | |
ReceiveMessageRequest request = new ReceiveMessageRequest(queueUrl) | |
.withWaitTimeSeconds(config.waitTime) | |
.withVisibilityTimeout(config.visibilityTimeout); | |
Semaphore available = new Semaphore(config.maxInflightMessages); | |
Flux.<SqsMessage, Queue<Message>>generate( | |
() -> new ArrayDeque<>(), | |
(fetched, emitter) -> { | |
while(fetched.isEmpty()) { | |
int toFetch = 0; | |
try { | |
toFetch = max(1, min(10, available.availablePermits())); | |
available.acquire(toFetch); | |
request.setMaxNumberOfMessages(toFetch); | |
List<Message> messages = (List<Message>) retryLoop | |
.call(() -> client | |
.receiveMessage(request) | |
.getMessages()); | |
refresher.register(transform(messages, Message::getReceiptHandle)); | |
fetched.addAll(messages); | |
} catch(Throwable t) { | |
available.release(toFetch); | |
log.error("Failed to talk to SQS, will try again...", t); | |
if(Thread.interrupted()) | |
log.warn("Thread was interrupted, but we cleared the flag and continued."); | |
} | |
} | |
emitter.next(wrap(fetched.poll(), available, refresher)); | |
return fetched; | |
}) | |
.doOnCancel(() -> refreshTimer.cancel()) | |
.doOnComplete(() -> refreshTimer.cancel()) | |
.subscribe(s); | |
} | |
SqsMessage wrap(Message message, Semaphore available, SqsVisibilityRefreshTask inflight) { | |
return new SqsMessage(message) { | |
@Override | |
public void ack() { | |
try { | |
retryLoop.call(() -> client.deleteMessage(queueUrl, rawMessage.getReceiptHandle())); | |
} catch(Exception e) { | |
log.error("Unable to delete message from SQS: " + rawMessage.getMessageId() , e); | |
} finally { | |
inflight.remove(rawMessage.getReceiptHandle()); | |
available.release(); | |
} | |
} | |
@Override | |
public void reject() { | |
try { | |
client.changeMessageVisibility(queueUrl, rawMessage.getReceiptHandle(), 0); | |
} catch (Exception e) { | |
log.error("Unable to expire message in SQS: " + rawMessage.getMessageId(), e); | |
} finally { | |
inflight.remove(rawMessage.getReceiptHandle()); | |
available.release(); | |
} | |
} | |
}; | |
} | |
@Accessors(chain = true) | |
public static @Data class ProducerConfig { | |
int visibilityTimeout = (int) MINUTES.toSeconds(2); | |
int waitTime = 10; | |
int maxInflightMessages = XS_BUFFER_SIZE; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment