Skip to content

Instantly share code, notes, and snippets.

@stdatalabs
Last active October 24, 2016 08:02
Show Gist options
  • Save stdatalabs/d1abb0bf277e293fc5f4ad40ff4dfef4 to your computer and use it in GitHub Desktop.
Save stdatalabs/d1abb0bf277e293fc5f4ad40ff4dfef4 to your computer and use it in GitHub Desktop.
A Storm bolt that splits tuples sent by KafkaSpout into words. More @ stdatalabs.blogspot.com
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Pattern;
import twitter4j.Status;
/**
 * Storm bolt that splits the text of an incoming tweet into lower-cased words and emits every
 * word whose length is at least a configured minimum.
 *
 * <p>Input: a tuple with a {@code "tweet"} field holding the tweet text as a {@code String}.
 * Output: one {@code ("lang", "word")} tuple per qualifying word, anchored to the input tuple.
 * The language is always {@code "en"}; it is hard-coded in this bolt rather than read from the
 * incoming tuple.
 *
 * <p>More discussion at stdatalabs.blogspot.com
 *
 * @author Sachin Thirumala
 */
public class JsonWordSplitterBolt extends BaseRichBolt {
    private static final long serialVersionUID = 5151173513759399636L;

    /** Name of the input tuple field that carries the tweet text. */
    private static final String TWEET_FIELD = "tweet";

    /** Language tag attached to every emitted word. */
    private static final String DEFAULT_LANG = "en";

    /** Matches a single ASCII punctuation character; compiled once instead of per tuple. */
    private static final Pattern PUNCTUATION = Pattern.compile("\\p{Punct}");

    /** Matches a run of whitespace (spaces, tabs, line breaks) used as the word separator. */
    private static final Pattern WHITESPACE = Pattern.compile("\\s+");

    private final int minWordLength;
    private OutputCollector collector;

    /**
     * Creates the bolt.
     *
     * @param minWordLength minimum length (in {@code char}s) a word must have to be emitted
     */
    public JsonWordSplitterBolt(int minWordLength) {
        this.minWordLength = minWordLength;
    }

    /**
     * Stores the collector used later by {@link #execute(Tuple)} to emit and ack tuples.
     *
     * @param map             topology configuration (unused)
     * @param topologyContext topology context (unused)
     * @param collector       collector used for emitting words and acknowledging input
     */
    @Override
    @SuppressWarnings("rawtypes") // the overridden Storm method declares a raw Map
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Splits the tweet text into words and emits each word that is long enough.
     *
     * <p>Punctuation is replaced by spaces, the text is lower-cased independently of the JVM's
     * default locale, and the result is split on any run of whitespace. Empty tokens are never
     * emitted. The input tuple is always acknowledged, including when its text is {@code null}.
     *
     * @param input tuple whose {@code "tweet"} field holds the tweet text
     */
    @Override
    public void execute(Tuple input) {
        String tweet = (String) input.getValueByField(TWEET_FIELD);
        if (tweet == null) {
            // Nothing to split; ack so the tuple is not left pending and replayed.
            collector.ack(input);
            return;
        }

        // Line breaks are left in place here and handled by the whitespace split below, so
        // words on adjacent lines are not glued together.
        String text = PUNCTUATION.matcher(tweet).replaceAll(" ").toLowerCase(Locale.ROOT);

        for (String word : WHITESPACE.split(text)) {
            // split() can yield a leading empty token when the text starts with whitespace.
            if (!word.isEmpty() && word.length() >= minWordLength) {
                // Anchor to the input tuple so downstream failures trigger a replay.
                collector.emit(input, new Values(DEFAULT_LANG, word));
            }
        }
        collector.ack(input);
    }

    /**
     * Declares the output schema: the language tag followed by the word.
     *
     * @param declarer declarer used to register the output fields
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("lang", "word"));
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment