Skip to content

Instantly share code, notes, and snippets.

@abh1nav
Forked from anonymous/SampleSpout.java
Created December 12, 2012 04:04
Show Gist options
  • Save abh1nav/4264781 to your computer and use it in GitHub Desktop.
Save abh1nav/4264781 to your computer and use it in GitHub Desktop.
/**
 * Example bolt that reads the {@code firstName}, {@code height} and
 * {@code complexObject} fields of each incoming tuple and, when the height is
 * greater than 5.0, emits the lower-cased first name as a single-field tuple
 * ({@code lowercaseFirstName}). Every input tuple is acked once it has been
 * handled.
 */
public class SampleBolt extends BaseRichBolt {
    /** Collector received in {@link #prepare}; used to emit and ack tuples. */
    OutputCollector collector;

    /**
     * Stores the output collector for later use by {@link #execute}.
     *
     * @param stormConf topology configuration (not used here)
     * @param context   topology context (not used here)
     * @param collector collector used to emit and ack tuples
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Processes one tuple: emits the lower-cased first name when
     * {@code height > 5.0}, then acks the input.
     *
     * @param input tuple carrying the {@code firstName}, {@code height} and
     *              {@code complexObject} fields
     */
    @Override
    public void execute(Tuple input) {
        String firstName = input.getStringByField("firstName");
        double height = input.getDoubleByField("height");
        // Read only to illustrate fetching an arbitrary object by field name;
        // the value is not used further by this bolt.
        Object complexObject = input.getValueByField("complexObject");

        if (height > 5.0) {
            // Anchor the output to the input tuple so it is linked to the
            // tuple that produced it, and lower-case with Locale.ROOT so the
            // result does not depend on the JVM's default locale.
            collector.emit(input, new Values(firstName.toLowerCase(java.util.Locale.ROOT)));
        }
        collector.ack(input);
    }

    /**
     * Declares the single output field, {@code lowercaseFirstName}.
     *
     * @param declarer declarer on which the output schema is registered
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("lowercaseFirstName"));
    }
}
/**
 * Example spout that emits a fixed three-field tuple
 * ({@code firstName}, {@code height}, {@code complexObject}) on the
 * {@code "default"} stream each time {@link #nextTuple()} is invoked.
 */
public class SampleSpout extends BaseRichSpout {
    /** Identifier of the stream this spout declares and emits on. */
    private static final String STREAM_ID = "default";

    /** Collector received in {@link #open}; used to emit tuples. */
    private SpoutOutputCollector collector;

    /**
     * Keeps a reference to the collector. A real implementation would also
     * connect to its data source here, e.g. run a query and store a
     * reference to the resulting cursor.
     *
     * @param conf      topology configuration (not used here)
     * @param context   topology context (not used here)
     * @param collector collector used to emit tuples
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Emits one sample row. A real implementation would advance the stored
     * cursor and push the next "row" into the collector as a tuple.
     */
    @Override
    public void nextTuple() {
        Values row = new Values("John", 5.5, new Object());
        collector.emit(STREAM_ID, row);
    }

    /**
     * Declares the schema of the emitted stream.
     *
     * @param declarer declarer on which the stream schema is registered
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        Fields schema = new Fields("firstName", "height", "complexObject");
        declarer.declareStream(STREAM_ID, schema);
    }
}
/**
 * Wires {@code SampleSpout} into {@code SampleBolt} with a shuffle grouping,
 * submits the resulting topology to a {@code LocalCluster}, and registers a
 * JVM shutdown hook that kills the topology and shuts the cluster down.
 */
public class SampleTopology {
    private static final String TOPOLOGY_NAME = "SampleTopologyName";
    private static final String SPOUT_ID = "sampleSpoutId";
    private static final String BOLT_ID = "sampleBoltId";

    /**
     * Builds and submits the sample topology.
     *
     * @param args command-line arguments (ignored)
     */
    public static void main(String[] args) {
        Config config = new Config();
        TopologyBuilder builder = newSampleBuilder();

        final LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());

        // On JVM exit, kill the topology first and then stop the cluster.
        Runnable cleanup = new Runnable() {
            @Override
            public void run() {
                cluster.killTopology(TOPOLOGY_NAME);
                cluster.shutdown();
            }
        };
        Runtime.getRuntime().addShutdownHook(new Thread(cleanup));
    }

    /** Creates a builder with one spout feeding one bolt via shuffle grouping. */
    private static TopologyBuilder newSampleBuilder() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(SPOUT_ID, new SampleSpout(), 1);
        builder.setBolt(BOLT_ID, new SampleBolt(), 1).shuffleGrouping(SPOUT_ID);
        return builder;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment