@Xorlev
Last active April 7, 2019 15:31
Example of emitting on multiple streams
package storm.examples;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.Map;

public class MultipleStreamBolt extends BaseRichBolt {
    // Saved in prepare() so that execute() can emit and ack.
    private OutputCollector collector;

    @Override
    public void prepare(Map config, TopologyContext topologyContext, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        // Let's break the tuple into pieces.
        // Arguments to emit: stream id, anchor (the input tuple), new tuple values.
        collector.emit("stream1", tuple, new Values(tuple.getValue(0)));
        collector.emit("stream2", tuple, new Values(tuple.getValue(1)));

        // You can still emit on the default stream (stream id "default") by omitting the stream id.
        collector.emit(tuple, new Values(...));
        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream("stream1", new Fields("field1"));
        outputFieldsDeclarer.declareStream("stream2", new Fields("field2"));
        outputFieldsDeclarer.declare(new Fields("defaultStreamField")); // optional: declares the default stream
    }
}
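
A consumer only receives tuples from the streams it subscribes to. In a Storm topology that subscription is made with a grouping call that takes both the source component id and the stream id, for example `shuffleGrouping("splitter", "stream1")`, where "splitter" is a hypothetical id for the bolt above. A grouping call that names only the component id subscribes to the default stream. The sketch below is a dependency-free toy model of that routing rule, not Storm code: the class `StreamRoutingSketch`, its methods, and the component and consumer names are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of per-stream subscriptions. NOT Storm's API; every name here is hypothetical. */
public class StreamRoutingSketch {
    static final String DEFAULT_STREAM = "default";

    // Maps "componentId/streamId" to the consumers subscribed to that stream.
    private final Map<String, List<String>> subscribers = new HashMap<>();
    // Records what each consumer received, in delivery order.
    private final Map<String, List<Object>> received = new HashMap<>();

    /** Subscribe a consumer to one named stream of a source component. */
    public void subscribe(String consumer, String sourceComponent, String streamId) {
        subscribers.computeIfAbsent(key(sourceComponent, streamId), k -> new ArrayList<>())
                   .add(consumer);
    }

    /** Subscribing without a stream id means subscribing to the default stream. */
    public void subscribe(String consumer, String sourceComponent) {
        subscribe(consumer, sourceComponent, DEFAULT_STREAM);
    }

    /** Deliver a value to every consumer subscribed to this component and stream. */
    public void emit(String sourceComponent, String streamId, Object value) {
        List<String> targets =
            subscribers.getOrDefault(key(sourceComponent, streamId), new ArrayList<>());
        for (String consumer : targets) {
            received.computeIfAbsent(consumer, k -> new ArrayList<>()).add(value);
        }
    }

    /** Everything a consumer has received so far (empty if nothing). */
    public List<Object> receivedBy(String consumer) {
        return received.getOrDefault(consumer, new ArrayList<>());
    }

    private static String key(String component, String streamId) {
        return component + "/" + streamId;
    }

    public static void main(String[] args) {
        StreamRoutingSketch topology = new StreamRoutingSketch();
        topology.subscribe("consumer1", "splitter", "stream1");
        topology.subscribe("consumer2", "splitter", "stream2");
        topology.subscribe("consumer3", "splitter"); // default stream

        topology.emit("splitter", "stream1", "a");
        topology.emit("splitter", "stream2", "b");
        topology.emit("splitter", DEFAULT_STREAM, "c");

        System.out.println(topology.receivedBy("consumer1")); // prints [a]
        System.out.println(topology.receivedBy("consumer2")); // prints [b]
        System.out.println(topology.receivedBy("consumer3")); // prints [c]
    }
}
```

The model leaves out everything else Storm does (parallelism, grouping strategies, anchoring, acking). It only shows that a stream id declared in `declareOutputFields` reaches a consumer when that consumer names the same stream id in its subscription.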
@Cam1337

Cam1337 commented Jun 27, 2015

This looks very nice.

@cossea

cossea commented Mar 14, 2016

How do we know that the multiple streams were received successfully?

@asif31iqbal

How do you set the topology for multiple streams?

@ysyyork

ysyyork commented Apr 7, 2019

https://stackoverflow.com/a/30396802/3265888 This post may help with how to set up the topology.
