Skip to content

Instantly share code, notes, and snippets.

@nubunto
Created August 5, 2019 04:26
Show Gist options
  • Save nubunto/92a098fc07fff1bd6e9dc5163d78feb2 to your computer and use it in GitHub Desktop.
Save nubunto/92a098fc07fff1bd6e9dc5163d78feb2 to your computer and use it in GitHub Desktop.
Storm Bolts and Spouts example
package com.ifood.connection;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.ConfigurableTopology;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.Map;
import java.util.Random;
public class LegacyTopology extends ConfigurableTopology {
public static void main(String[] args) {
ConfigurableTopology.start(new LegacyTopology(), args);
}
@Override
protected int run(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("rand", new RandomNumberSpout(), 10);
builder.setBolt("square", new SquareBolt(), 3).shuffleGrouping("rand");
builder.setBolt("triple", new TripleBolt(), 2).shuffleGrouping("square");
builder.setBolt("save", new SaveBolt(), 1).shuffleGrouping("rand").shuffleGrouping("triple");
String topologyName = "LegacyTopology";
return submit(topologyName, conf, builder);
}
public static class RandomNumberSpout extends BaseRichSpout {
private Random random;
private SpoutOutputCollector collector;
@Override
public void open(Map map, TopologyContext context, SpoutOutputCollector collector) {
this.random = new Random();
this.collector = collector;
}
@Override
public void nextTuple() {
Utils.sleep(100);
this.collector.emit(new Values(random.nextInt(), System.currentTimeMillis()));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("randomInt", "timestamp"));
}
}
public static class SquareBolt extends BaseRichBolt {
private OutputCollector collector;
@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple input) {
Integer i = input.getIntegerByField("randomInt");
this.collector.emit(input, new Values(i * i));
this.collector.ack(input);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("square"));
}
}
public static class TripleBolt extends BaseRichBolt {
private OutputCollector collector;
@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple input) {
Integer i = input.getInteger(0);
this.collector.emit(input, new Values(i * 3));
this.collector.ack(input);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("triple"));
}
}
public static class SaveBolt extends BaseRichBolt {
private OutputCollector collector;
@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple input) {
System.out.println("got tuple: " + input.toString());
this.collector.ack(input);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment