Skip to content

Instantly share code, notes, and snippets.

@MafaldaLandeiro
Created September 19, 2020 16:15
Show Gist options
  • Save MafaldaLandeiro/6b582a78bfec12f7994e6a03838ecfa6 to your computer and use it in GitHub Desktop.
Save MafaldaLandeiro/6b582a78bfec12f7994e6a03838ecfa6 to your computer and use it in GitHub Desktop.
Filtering Message Bolt
package org.ApacheStormTopologyJava.bolt;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class FilteringMessageBolt extends BaseBasicBolt {
private static final Logger log = LoggerFactory
.getLogger(FilteringMessageBolt.class);
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
String randomMessage = tuple.getStringByField("randomMessage");
if (!randomMessage.contains("z")) {
basicOutputCollector.emit(tuple.getValues());
} else {
log.info("Message filter out: " + randomMessage);
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("randomMessageFiltered", "timestamp"));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment