Skip to content

Instantly share code, notes, and snippets.

@sekky0905
Last active July 16, 2017 06:29
Show Gist options
  • Save sekky0905/1858c416e5a15fe04e76b747b266bbf7 to your computer and use it in GitHub Desktop.
Save sekky0905/1858c416e5a15fe04e76b747b266bbf7 to your computer and use it in GitHub Desktop.
Apache Beam with Cloud Dataflow(over 2.0.0系)入門~基本部分~ParDoまで~ ref: http://qiita.com/Sekky0905/items/381ed27fca9a16f8ef07
[Output PCollection] = [Input PCollection].apply([Transform])
[Final Output PCollection] = [Initial Input PCollection].apply([First Transform])
.apply([Second Transform])
.apply([Third Transform])
// まずは外部ソースから読み込んで、PCollectionを生成する
PCollection<String> inputData = // ここで外部ソースから読み込みを行う;
// static classとして関数オブジェクトを定義
// DoFnの左側のinputの型を、右側にoutputの型を型パラメータとして定義する
// 必ず、DoFnをextendsする
static class FilterEvenFn extends DoFn<String, Integer> {
// 実際の処理ロジックをアノテーションで宣言する
@ProcessElement
// 実際の処理ロジックは、processElementメソッドに記述する
// 引数のProcessContextを利用してinputやoutputを行う
public void processElement(ProcessContext c) {
// ProcessContextからinput elementを取得
int num = Integer.parseInt(c.element());
// input elementを使用した処理
if (num % 2 == 0) {
// ProcessContextを使用して出力
c.output(String.valueOf(num));
}
}
}
//
PCollection<Integer> evenData = inputData.apply(
ParDo
.of(new FilterEvenFn()));
PCollection<String> newPCollection = pipeline.apply(TextIO.read().from(INPUT_FILE_PATH));
pCollection.apply(TextIO.write().to(OUTPUT_FILE_PATH));
pipeline.run();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package com.company;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
/**
* メイン
*/
public class Main {
// DoFnを実装したクラス
// DoFnの横の<T,T>でinputとoutputの方の定義を行う
static class FilterEvenFn extends DoFn<String, String> {
// 実際の処理ロジックにはこのアノテーションをつける
@ProcessElement
// 実際の処理ロジックは、processElementメソッドに記述する
// 引数のProcessContextを利用してinputやoutputを行う
public void processElement(ProcessContext c) {
// ProcessContextからinput elementを取得
int num = Integer.parseInt(c.element());
// input elementを使用した処理
if (num % 2 == 0) {
// ProcessContextを使用して出力
c.output(String.valueOf(num));
}
}
}
// インプットデータのパス
private static final String INPUT_FILE_PATH = "./dataflow_number_test.csv";
// アウトデータのパス
private static final String OUTPUT_FILE_PATH = "./sample.csv";
public static void main(String[] args) {
// まずPipelineに設定するOptionを作成する
// 今回は、ローカルで起動するため、DirectRunnerを指定する
// ローカルモードでは、DirectRunnerがすでにデフォルトになっているため、ランナーを設定する必要はない
PipelineOptions options = PipelineOptionsFactory.create();
// Optionを元にPipelineを生成する
Pipeline pipeline = Pipeline.create(options);
// inout dataを読み込んで、そこからPCollection(パイプライン内の一連のデータ)を作成する
PCollection<String> inputData = pipeline.apply(TextIO.read().from(INPUT_FILE_PATH));
// 処理
PCollection<String> evenData = inputData.apply(ParDo.of(new FilterEvenFn()));
// 書き込む
evenData.apply(TextIO.write().to(OUTPUT_FILE_PATH));
// run : PipeLine optionで指定したRunnerで実行
// waitUntilFinish : PipeLineが終了するまで待って、最終的な状態を返す
pipeline.run().waitUntilFinish();
}
}
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
/**
* メイン
* Created by sekiguchikai on 2017/07/05.
*/
public class Main {
// DoFnを実装したクラス
// DoFnの横の<T,T>でinputとoutputの方の定義を行う
static class FilterEvenFn extends DoFn<String, String> {
// 実際の処理ロジックにはこのアノテーションをつける
@ProcessElement
// 実際の処理ロジックは、processElementメソッドに記述する
// 引数のProcessContextを利用してinputやoutputを行う
public void processElement(ProcessContext c) {
System.out.print(c.element());
// ProcessContextからinput elementを取得
int num = Integer.parseInt(c.element());
// input elementを使用した処理
if (num % 2 == 0) {
System.out.println("ifの結果" + num);
// ProcessContextを使用して出力
c.output(String.valueOf(num));
}
}
}
// インプットデータのパス
private static final String INPUT_FILE_PATH = "./dataflow_number_test.csv";
// アウトデータのパス
private static final String OUTPUT_FILE_PATH = "./sample.csv";
public static void main(String[] args) {
// Optionを元にPipelineを生成する
Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());
// メソッドチェーンを使用
pipeline.apply(TextIO.read().from(INPUT_FILE_PATH))
.apply(ParDo.of(new FilterEvenFn()))
// 書き込む
.apply(TextIO.write().to(OUTPUT_FILE_PATH));
// run : PipeLine optionで指定したRunnerで実行
// waitUntilFinish : PipeLineが終了するまで待って、最終的な状態を返す
pipeline.run().waitUntilFinish();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment