Skip to content

Instantly share code, notes, and snippets.

@abh1nav
Created February 1, 2013 07:28
Show Gist options
  • Save abh1nav/4689905 to your computer and use it in GitHub Desktop.
Save abh1nav/4689905 to your computer and use it in GitHub Desktop.
A proper SQS spout for Storm
package com.crowdriff.engine.spouts;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import com.amazonaws.services.sqs.AmazonSQSAsyncClient;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.crowdriff.engine.util.Util;
import com.google.common.collect.Queues;
@SuppressWarnings("serial")
public class SqsSpout extends BaseRichSpout {
private final String sqsUrl;
private final String streamId;
private SpoutOutputCollector collector;
private AmazonSQSAsyncClient sqsClient;
private ConcurrentLinkedQueue<Message> buffer;
public SqsSpout(String sqsUrl) {
this.sqsUrl = sqsUrl;
this.streamId = "default";
}
public SqsSpout(String sqsUrl, String streamId) {
this.sqsUrl = sqsUrl;
this.streamId = streamId;
}
@SuppressWarnings("rawtypes")
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
sqsClient = Util.newSqsClient();
buffer = Queues.newConcurrentLinkedQueue();
}
@Override
public void nextTuple() {
if(buffer.isEmpty()) {
ReceiveMessageRequest request = new ReceiveMessageRequest(sqsUrl);
request.setMaxNumberOfMessages(10);
ReceiveMessageResult receiveMessageResult = sqsClient.receiveMessage(request);
buffer.addAll(receiveMessageResult.getMessages());
}
Message m = buffer.poll();
if(null != m)
collector.emit(streamId, new Values(m.getBody()), m.getReceiptHandle());
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declareStream(streamId, new Fields("message"));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment