Created
February 1, 2013 07:28
-
-
Save abh1nav/4689905 to your computer and use it in GitHub Desktop.
A proper SQS spout for Storm
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.crowdriff.engine.spouts; | |
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import com.amazonaws.services.sqs.AmazonSQSAsyncClient;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.crowdriff.engine.util.Util;
import com.google.common.collect.Queues;
@SuppressWarnings("serial") | |
public class SqsSpout extends BaseRichSpout { | |
private final String sqsUrl; | |
private final String streamId; | |
private SpoutOutputCollector collector; | |
private AmazonSQSAsyncClient sqsClient; | |
private ConcurrentLinkedQueue<Message> buffer; | |
public SqsSpout(String sqsUrl) { | |
this.sqsUrl = sqsUrl; | |
this.streamId = "default"; | |
} | |
public SqsSpout(String sqsUrl, String streamId) { | |
this.sqsUrl = sqsUrl; | |
this.streamId = streamId; | |
} | |
@SuppressWarnings("rawtypes") | |
@Override | |
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { | |
this.collector = collector; | |
sqsClient = Util.newSqsClient(); | |
buffer = Queues.newConcurrentLinkedQueue(); | |
} | |
@Override | |
public void nextTuple() { | |
if(buffer.isEmpty()) { | |
ReceiveMessageRequest request = new ReceiveMessageRequest(sqsUrl); | |
request.setMaxNumberOfMessages(10); | |
ReceiveMessageResult receiveMessageResult = sqsClient.receiveMessage(request); | |
buffer.addAll(receiveMessageResult.getMessages()); | |
} | |
Message m = buffer.poll(); | |
if(null != m) | |
collector.emit(streamId, new Values(m.getBody()), m.getReceiptHandle()); | |
} | |
@Override | |
public void declareOutputFields(OutputFieldsDeclarer declarer) { | |
declarer.declareStream(streamId, new Fields("message")); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment