Skip to content

Instantly share code, notes, and snippets.

@nubunto
Created August 5, 2019 04:45
Show Gist options
  • Save nubunto/d65e67e2d971ea892e20ce9b9ad0adbc to your computer and use it in GitHub Desktop.
Save nubunto/d65e67e2d971ea892e20ce9b9ad0adbc to your computer and use it in GitHub Desktop.
Apache Storm spout implementation for SQS
package com.your.package;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.utils.Utils;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.*;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;
/**
 * Apache Storm spout that reads messages from an Amazon SQS queue.
 *
 * <p>Messages are fetched from SQS with long polling and buffered locally; each call to
 * {@link #nextTuple()} emits at most one buffered message.
 *
 * <p>In <em>reliable</em> mode the SQS receipt handle is used as the Storm message id: the
 * message is deleted from SQS when the tuple tree is acked, and made visible again right away
 * (visibility timeout set to 0) when it fails. In <em>unreliable</em> mode the message is
 * deleted from SQS just before it is emitted and the tuple is not anchored.
 *
 * <p>Subclasses must implement {@link #messageToStormTuple(Message)} and declare their output
 * fields; they may override {@link #getStreamID(Message)} to route messages to named streams.
 *
 * <p>NOTE(review): the spout instance is serialized when the topology is submitted, so the
 * supplied credentials provider and region supplier need to be serializable — confirm for the
 * implementations you pass in.
 */
public abstract class SqsQueueSpout extends BaseRichSpout {

    /**
     * Long-poll wait passed to SQS ReceiveMessage. SQS only accepts 0-20 seconds for
     * WaitTimeSeconds; the previous value of 30 was rejected by the service.
     */
    private static final int LONG_POLL_WAIT_SECONDS = 20;

    // Runtime-only state, created in open(); never part of the serialized spout.
    private transient SpoutOutputCollector collector;
    private transient SqsClient sqsClient;
    private transient LinkedBlockingQueue<Message> queue;

    private final String queueURL;
    private final boolean reliable;
    private final AwsCredentialsProvider awsCredentialsProvider;
    private final Supplier<String> awsRegionSupplier;

    /**
     * Creates the spout. No AWS resources are touched until {@link #open} is called.
     *
     * @param queueURL               URL of the SQS queue to consume
     * @param reliable               {@code true} to emit anchored tuples and delete messages only
     *                               on ack; {@code false} to delete each message before emitting it
     * @param awsCredentialsProvider credentials used to build the SQS client
     * @param awsRegionSupplier      supplies the AWS region id (evaluated in {@link #open})
     */
    public SqsQueueSpout(String queueURL, boolean reliable, AwsCredentialsProvider awsCredentialsProvider, Supplier<String> awsRegionSupplier) {
        this.queueURL = queueURL;
        this.reliable = reliable;
        this.awsCredentialsProvider = awsCredentialsProvider;
        this.awsRegionSupplier = awsRegionSupplier;
    }

    /**
     * Stores the collector, creates the local message buffer and builds the SQS client.
     */
    @Override
    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.queue = new LinkedBlockingQueue<>();
        this.sqsClient = SqsClient.builder()
                .credentialsProvider(awsCredentialsProvider)
                .region(Region.of(awsRegionSupplier.get()))
                .build();
    }

    /**
     * Releases the SQS client created in {@link #open}. Safe to call when the spout was never
     * opened or has already been closed.
     */
    @Override
    public void close() {
        if (this.sqsClient != null) {
            this.sqsClient.close();
            this.sqsClient = null;
        }
    }

    /**
     * Emits at most one message. When the local buffer is empty, SQS is polled first; that
     * poll may wait up to {@link #LONG_POLL_WAIT_SECONDS} seconds if the queue has no messages.
     */
    @Override
    public void nextTuple() {
        this.pollSqsIfNeeded();
        Message message = this.queue.poll();
        if (message == null) {
            return;
        }

        String receiptHandle = message.receiptHandle();
        // Run the subclass hooks before any SQS side effect, so that an exception thrown
        // while converting the message cannot lose an already-deleted message.
        String streamId = getStreamID(message);
        List<Object> tuple = messageToStormTuple(message);

        if (this.reliable) {
            // The receipt handle doubles as the Storm message id used by ack()/fail().
            this.collector.emit(streamId, tuple, receiptHandle);
        } else {
            // Unreliable mode: delete right away, then emit without a message id.
            this.deleteMessage(receiptHandle);
            this.collector.emit(streamId, tuple);
        }
    }

    /**
     * Deletes the acknowledged message from SQS.
     *
     * @param receiptHandle the message id passed to {@code emit}, i.e. the SQS receipt handle
     */
    @Override
    public void ack(Object receiptHandle) {
        this.deleteMessage((String) receiptHandle);
    }

    /**
     * Makes the failed message visible in SQS again so that it can be redelivered.
     *
     * @param receiptHandle the message id passed to {@code emit}, i.e. the SQS receipt handle
     */
    @Override
    public void fail(Object receiptHandle) {
        this.changeMessageVisibility((String) receiptHandle);
    }

    /**
     * Converts an SQS message into the list of values emitted as a Storm tuple.
     *
     * @param message the SQS message taken from the local buffer
     * @return the tuple values to emit
     */
    public abstract List<Object> messageToStormTuple(Message message);

    /**
     * Chooses the stream a message is emitted on. The default implementation always returns
     * {@link Utils#DEFAULT_STREAM_ID}.
     *
     * @param message the SQS message about to be emitted
     * @return the id of the stream to emit on
     */
    public String getStreamID(Message message) {
        return Utils.DEFAULT_STREAM_ID;
    }

    /** Sets the visibility timeout of the given message to 0. */
    private void changeMessageVisibility(String receiptHandle) {
        ChangeMessageVisibilityRequest request = ChangeMessageVisibilityRequest.builder()
                .queueUrl(this.queueURL)
                .receiptHandle(receiptHandle)
                .visibilityTimeout(0)
                .build();
        this.sqsClient.changeMessageVisibility(request);
    }

    /** Deletes the message identified by the given receipt handle from the queue. */
    private void deleteMessage(String receiptHandle) {
        DeleteMessageRequest request = DeleteMessageRequest.builder()
                .queueUrl(this.queueURL)
                .receiptHandle(receiptHandle)
                .build();
        this.sqsClient.deleteMessage(request);
    }

    /** Refills the local buffer from SQS, but only when it is completely empty. */
    private void pollSqsIfNeeded() {
        if (this.queue.isEmpty()) {
            ReceiveMessageResponse response = this.pollSqsQueue();
            this.queue.addAll(response.messages());
        }
    }

    /** Issues one long-polling ReceiveMessage call against the configured queue. */
    private ReceiveMessageResponse pollSqsQueue() {
        ReceiveMessageRequest request = ReceiveMessageRequest.builder()
                .queueUrl(this.queueURL)
                .waitTimeSeconds(LONG_POLL_WAIT_SECONDS)
                .build();
        return this.sqsClient.receiveMessage(request);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment