Skip to content

Instantly share code, notes, and snippets.

@shun91
Last active August 29, 2015 14:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save shun91/bcc009d14d823bb3c921 to your computer and use it in GitHub Desktop.
Save shun91/bcc009d14d823bb3c921 to your computer and use it in GitHub Desktop.
Stormで,テキストファイルから読み込むSpoutのサンプル.
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
/**
* テキストファイルから読み込むSpoutのサンプル.
*
* 【注意】動作テストしてません.多分動くと思いますが...
* 利用する時は27行目の INPUT_FILEPATH を指定するようにしてください.
*
* @author shun91
*
*/
public class FileReadSampleSpout extends BaseRichSpout {
// 読み込むテキストファイルの絶対パス
private static final String INPUT_FILEPATH = "";
private static final long serialVersionUID = 1L; // 警告回避用
SpoutOutputCollector _collector;
LinkedBlockingQueue<List<Object>> queue = null; // 読み込んだものを一時的に格納するキュー
private Thread readThread;// テキストファイルを読み込むスレッド
/**
* Spout起動時に1度だけ実行される初期化メソッド
*/
@SuppressWarnings("rawtypes")
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
queue = new LinkedBlockingQueue<List<Object>>(); // キューのインスタンス生成
// スレッドの起動
startThread();
}
/**
* タプルを送出するメソッド.繰り返し実行し続けている.
* キューにオブジェクトが格納されるとそれを送出する.
*/
@Override
public void nextTuple() {
try {
if(queue.size() != 0){ // 環境によってはチェックしないとエラーになる?
_collector.emit(queue.take());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 送出タプルのフィールドを定義するメソッド.
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("text"));
}
/**
* Threadを起動するメソッド
*/
private void startThread() {
readThread = new Thread(new Reader()); // スレッド生成
try {
readThread.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* スレッドで実行されるクラス本体.run()メソッドが実行される.
* テキストファイルを1行ずつ読み込み,タプルに変換した後キューに格納する.
*/
class Reader implements Runnable {
@Override
public final void run() {
BufferedReader br;
String line;
try {
br = new BufferedReader(new FileReader(INPUT_FILEPATH));
while ((line = br.readLine()) != null) { // ファイルを1行ずつ読み込む
queue.offer(new Values(line)); // タプルに変換してキューに格納
}
br.close();
} catch (Exception e) {
e.printStackTrace();
} finally {
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment