Skip to content

Instantly share code, notes, and snippets.

@cjmatta
Created November 17, 2015 18:33
Show Gist options
  • Save cjmatta/3a560a1738c485258fc4 to your computer and use it in GitHub Desktop.
Save cjmatta/3a560a1738c485258fc4 to your computer and use it in GitHub Desktop.
TupleGenerator
package streamflow.spout.core;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BoltDeclarer;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import org.slf4j.Logger;
import streamflow.annotations.ComponentProperty;
import streamflow.annotations.Description;
import java.util.Map;
/**
* Created by cmatta on 11/17/15.
*/
public class TupleGenerator extends BaseRichSpout {
private SpoutOutputCollector collector;
private Logger logger;
private Integer delay;
private String json;
private Boolean cool;
@ComponentProperty(label = "Delay", name = "jetstream_spout_core_tupleGenerator_delay", required = true, type = "text", defaultValue = "60")
@Description("Delay between emit.")
@Inject
public void setDelay(@Named("jetstream_spout_core_tupleGenerator_delay") String delay) { this.delay = Integer.parseInt(delay); }
@ComponentProperty(label = "JSON", name = "jetstream_spout_core_tupleGenerator_json", required = true, type = "text", defaultValue = "{ \\\"url\\\" : \\\"http://www.google.com\\\" }")
@Description("JSON")
@Inject
public void setJSON(@Named("jetstream_spout_core_tupleGenerator_json") String json) { this.json = json; }
@Override
public void open(Map config, TopologyContext context, SpoutOutputCollector collector) {
logger.info("Spout Tuple Generator Initialized.");
this.collector = collector;
}
@Override
public void close() {
logger.info("Spout Tuple Generator Stopped.");
}
@Override
public void nextTuple() {
Utils.sleep(this.delay * 1000);
collector.emit(new Values(this.json));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("json"));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment