Skip to content

Instantly share code, notes, and snippets.

@affo
Last active August 2, 2016 07:34
Show Gist options
  • Save affo/2a7317d9c3928f32a578e3a052b3dd2b to your computer and use it in GitHub Desktop.
Save affo/2a7317d9c3928f32a578e3a052b3dd2b to your computer and use it in GitHub Desktop.
Apache Storm - implementation for a spout that connects through a TCP socket to a server
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
import java.util.Map;
/**
* Implementation for a Spout that connects to a server
* using a TCP Socket.
*/
public class SocketSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
private String host;
private int port;
private BufferedReader in;
public SocketSpout(String host, int port) {
this.host = host;
this.port = port;
}
public void declareOutputFields(OutputFieldsDeclarer decl) {
// declare your output fields
// decl.declare(new Fields(...));
}
public void open(Map map, TopologyContext ctx, SpoutOutputCollector collector) {
this.collector = collector;
try {
Socket s = new Socket(host, port);
in = new BufferedReader(
new InputStreamReader(s.getInputStream()));
} catch (IOException e) {
e.printStackTrace();
}
}
public void nextTuple() {
try {
String raw = in.readLine();
if (raw == null) {
// make the topology die
throw new SocketSpoutKO();
}
// parse the raw string; e.g.
// raw = raw.replace(...);
// String[] fields = raw.split(",");
// emit using collector
// collector.emit(new Values(...));
} catch (IOException e) {
e.printStackTrace();
}
}
private class SocketSpoutKO extends RuntimeException { }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment