Last active
October 24, 2016 07:54
-
-
Save stdatalabs/209261b37613509482b6f5bf18f6b21d to your computer and use it in GitHub Desktop.
A word counter bolt to print list of top words over a time interval. More @ stdatalabs.blogspot.com
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import backtype.storm.task.OutputCollector; | |
import backtype.storm.task.TopologyContext; | |
import backtype.storm.topology.OutputFieldsDeclarer; | |
import backtype.storm.topology.base.BaseRichBolt; | |
import backtype.storm.tuple.Tuple; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.util.HashMap; | |
import java.util.Map; | |
import java.util.SortedMap; | |
import java.util.TreeMap; | |
/** | |
* Keeps stats on word count, calculates and prints top list every | |
* logIntervalSec seconds to stdout | |
* | |
* More discussion at stdatalabs.blogspot.com | |
* | |
* @author Sachin Thirumala | |
*/ | |
public class WordCounterBolt extends BaseRichBolt { | |
private static final long serialVersionUID = 2706047697068872387L; | |
private static final Logger logger = LoggerFactory.getLogger(WordCounterBolt.class); | |
/** Number of seconds before the top list will be logged to stdout. */ | |
private final long logIntervalSec; | |
/** Number of seconds before the top list will be cleared. */ | |
private final long clearIntervalSec; | |
/** Number of top words to store in stats. */ | |
private final int topListSize; | |
private Map<String, Long> counter; | |
private long lastLogTime; | |
private long lastClearTime; | |
private OutputCollector collector; | |
public WordCounterBolt(long logIntervalSec, long clearIntervalSec, int topListSize) { | |
this.logIntervalSec = logIntervalSec; | |
this.clearIntervalSec = clearIntervalSec; | |
this.topListSize = topListSize; | |
} | |
@Override | |
public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) { | |
counter = new HashMap<String, Long>(); | |
lastLogTime = System.currentTimeMillis(); | |
lastClearTime = System.currentTimeMillis(); | |
this.collector = collector; | |
} | |
@Override | |
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { | |
} | |
@Override | |
public void execute(Tuple input) { | |
if (input.getFields().contains("word")) { | |
String word = (String) input.getValueByField("word"); | |
Long count = counter.get(word); | |
count = count == null ? 1L : count + 1; | |
counter.put(word, count); | |
long now = System.currentTimeMillis(); | |
long logPeriodSec = (now - lastLogTime) / 1000; | |
if (logPeriodSec > logIntervalSec) { | |
logger.info("\n\n"); | |
logger.info("Word count: " + counter.size()); | |
publishTopList(); | |
lastLogTime = now; | |
} | |
collector.ack(input); | |
} | |
} | |
private void publishTopList() { | |
// calculate top list: | |
SortedMap<Long, String> top = new TreeMap<Long, String>(); | |
for (Map.Entry<String, Long> entry : counter.entrySet()) { | |
long count = entry.getValue(); | |
String word = entry.getKey(); | |
top.put(count, word); | |
if (top.size() > topListSize) { | |
top.remove(top.firstKey()); | |
} | |
} | |
// Output top list: | |
for (Map.Entry<Long, String> entry : top.entrySet()) { | |
logger.info(new StringBuilder("Top List - ").append(entry.getValue()).append('|').append(entry.getKey()) | |
.toString()); | |
} | |
// Clear top list | |
long now = System.currentTimeMillis(); | |
if (now - lastClearTime > clearIntervalSec * 1000) { | |
counter.clear(); | |
lastClearTime = now; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment