Last active
October 24, 2016 07:50
-
-
Save stdatalabs/adc532894f3c2f84c435bc2f9c069838 to your computer and use it in GitHub Desktop.
A Storm topology that consumes messages from a Kafka topic and counts the top words used in tweets. More at stdatalabs.blogspot.com
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import backtype.storm.tuple.Fields; | |
import backtype.storm.tuple.Values; | |
import storm.kafka.KafkaSpout; | |
import storm.kafka.SpoutConfig; | |
import storm.kafka.ZkHosts; | |
import java.util.Arrays; | |
import backtype.storm.Config; | |
import backtype.storm.LocalCluster; | |
import backtype.storm.StormSubmitter; | |
import backtype.storm.spout.SchemeAsMultiScheme; | |
import backtype.storm.topology.TopologyBuilder; | |
import storm.kafka.StringScheme; | |
/** | |
* A Storm topology to consume messages from kafka topic and count | |
* and display the list of top words used in tweets on a keyword | |
* | |
* More discussion at stdatalabs.blogspot.com | |
* | |
* @author Sachin Thirumala | |
*/ | |
public class KafkaTwitterTopology { | |
public static void main(String[] args) { | |
String zkIp = "localhost"; | |
String nimbusHost = "localhost"; | |
String zookeeperHost = zkIp + ":2181"; | |
ZkHosts zkHosts = new ZkHosts(zookeeperHost); | |
SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, "tweets", "", "storm"); | |
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme(){ | |
@Override | |
public Fields getOutputFields() { | |
return new Fields("tweet"); | |
} | |
}); | |
KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig); | |
TopologyBuilder builder = new TopologyBuilder(); | |
builder.setSpout("twitter-spout", kafkaSpout, 8); | |
builder.setBolt("WordSplitterBolt", new JsonWordSplitterBolt(5)).shuffleGrouping("twitter-spout"); | |
builder.setBolt("IgnoreWordsBolt", new IgnoreWordsBolt()).shuffleGrouping("WordSplitterBolt"); | |
builder.setBolt("WordCounterBolt", new WordCounterBolt(5, 5 * 60, 50)).shuffleGrouping("IgnoreWordsBolt"); | |
Config config = new Config(); | |
config.setDebug(false); | |
config.setMaxTaskParallelism(5); | |
//config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 2); | |
config.put(Config.NIMBUS_HOST, nimbusHost); | |
config.put(Config.NIMBUS_THRIFT_PORT, 6627); | |
config.put(Config.STORM_ZOOKEEPER_PORT, 2181); | |
config.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(zkIp)); | |
try { | |
LocalCluster cluster = new LocalCluster(); | |
cluster.submitTopology("TwitterWordCountStorm", config, builder.createTopology()); | |
} catch (Exception e) { | |
throw new IllegalStateException("Couldn't initialize the topology", e); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment