Last active
August 29, 2015 14:13
-
-
Save shun91/bcc009d14d823bb3c921 to your computer and use it in GitHub Desktop.
Stormで,テキストファイルから読み込むSpoutのサンプル.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.BufferedReader; | |
import java.io.FileReader; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.concurrent.LinkedBlockingQueue; | |
import backtype.storm.spout.SpoutOutputCollector; | |
import backtype.storm.task.TopologyContext; | |
import backtype.storm.topology.OutputFieldsDeclarer; | |
import backtype.storm.topology.base.BaseRichSpout; | |
import backtype.storm.tuple.Fields; | |
import backtype.storm.tuple.Values; | |
/** | |
* テキストファイルから読み込むSpoutのサンプル. | |
* | |
* 【注意】動作テストしてません.多分動くと思いますが... | |
* 利用する時は27行目の INPUT_FILEPATH を指定するようにしてください. | |
* | |
* @author shun91 | |
* | |
*/ | |
public class FileReadSampleSpout extends BaseRichSpout { | |
// 読み込むテキストファイルの絶対パス | |
private static final String INPUT_FILEPATH = ""; | |
private static final long serialVersionUID = 1L; // 警告回避用 | |
SpoutOutputCollector _collector; | |
LinkedBlockingQueue<List<Object>> queue = null; // 読み込んだものを一時的に格納するキュー | |
private Thread readThread;// テキストファイルを読み込むスレッド | |
/** | |
* Spout起動時に1度だけ実行される初期化メソッド | |
*/ | |
@SuppressWarnings("rawtypes") | |
@Override | |
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { | |
_collector = collector; | |
queue = new LinkedBlockingQueue<List<Object>>(); // キューのインスタンス生成 | |
// スレッドの起動 | |
startThread(); | |
} | |
/** | |
* タプルを送出するメソッド.繰り返し実行し続けている. | |
* キューにオブジェクトが格納されるとそれを送出する. | |
*/ | |
@Override | |
public void nextTuple() { | |
try { | |
if(queue.size() != 0){ // 環境によってはチェックしないとエラーになる? | |
_collector.emit(queue.take()); | |
} | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
/** | |
* 送出タプルのフィールドを定義するメソッド. | |
*/ | |
@Override | |
public void declareOutputFields(OutputFieldsDeclarer declarer) { | |
declarer.declare(new Fields("text")); | |
} | |
/** | |
* Threadを起動するメソッド | |
*/ | |
private void startThread() { | |
readThread = new Thread(new Reader()); // スレッド生成 | |
try { | |
readThread.start(); | |
} catch (Exception e) { | |
throw new RuntimeException(e); | |
} | |
} | |
/** | |
* スレッドで実行されるクラス本体.run()メソッドが実行される. | |
* テキストファイルを1行ずつ読み込み,タプルに変換した後キューに格納する. | |
*/ | |
class Reader implements Runnable { | |
@Override | |
public final void run() { | |
BufferedReader br; | |
String line; | |
try { | |
br = new BufferedReader(new FileReader(INPUT_FILEPATH)); | |
while ((line = br.readLine()) != null) { // ファイルを1行ずつ読み込む | |
queue.offer(new Values(line)); // タプルに変換してキューに格納 | |
} | |
br.close(); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} finally { | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment