Skip to content

Instantly share code, notes, and snippets.

@sekky0905
Last active October 10, 2017 16:08
Show Gist options
  • Save sekky0905/dde50898118aa817c3273472ae1f9bbf to your computer and use it in GitHub Desktop.
Save sekky0905/dde50898118aa817c3273472ae1f9bbf to your computer and use it in GitHub Desktop.
Apache Beam with Google Cloud Dataflow(over 2.0.x系)入門~Combine編~ ref: http://qiita.com/Sekky0905/items/4596660455a7a2af5906
PCollection<Integer> sum = pCollection.apply(Sum.integersGlobally());
public static Combine.Globally<Integer, Integer> integersGlobally() {
return Combine.globally(Sum.ofIntegers());}
package com.company;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
/**
* メイン
*/
public class Main {
/**
* 関数オブジェクト
* 与えられたString str, String numを","で分割し、
* numをInteger型に変更して、KV<String, Integer>型にする
*/
static class SplitWordsAndMakeKVFn extends DoFn<String, KV<String, Integer>> {
@ProcessElement
// ProcessContextは、inputを表すobject
// 自分で定義しなくてもBeam SDKが勝手に取ってきてくれる
public void processElement(ProcessContext c) {
// ","で分割
String[] words = c.element().split(",");
// 分割したword[0]をKに、words[1]をIntegerに変換してVにする
c.output(KV.of(words[0], Integer.parseInt(words[1])));
}
}
/**
* 関数オブジェクト
* KV<String, Iterable<Integer>型をString型に変更する
*/
static class TransTypeFromKVAndMakeStringFn extends DoFn<KV<String, Integer>, String> {
@ProcessElement
public void processElement(ProcessContext c) {
// inputをString型に変換する
c.output(String.valueOf(c.element()));
}
}
/**
* インプットデータのパス
*/
private static final String INPUT_FILE_PATH = "./sample.txt";
/**
* アウトデータのパス
*/
private static final String COMBINE_OUTPUT_FILE_PATH = "./src/main/resources/combine_result/result.csv";
/**
* メイン
* 理解のため、メソッドチェーンを極力使用していない
* そのため、冗長なコードになっている
*
* @param args 引数
*/
public static void main(String[] args) {
// まずPipelineに設定するOptionを作成する
// 今回は、ローカルで起動するため、DirectRunnerを指定する
// ローカルモードでは、DirectRunnerがすでにデフォルトになっているため、ランナーを設定する必要はない
PipelineOptions options = PipelineOptionsFactory.create();
// Optionを元にPipelineを生成する
Pipeline pipeline = Pipeline.create(options);
// inout dataを読み込んで、そこからPCollection(パイプライン内の一連のデータ)を作成する
PCollection<String> lines = pipeline.apply(TextIO.read().from(INPUT_FILE_PATH));
// 与えられたString str, String numを","で分割し、numをInteger型に変更して、KV<String, Integer>型にする
PCollection<KV<String, Integer>> kvCounter = lines.apply(ParDo.of(new SplitWordsAndMakeKVFn()));
// Combine PerKey は、オペレーションの一部として GroupByKey 変換を実行する
PCollection<KV<String, Integer>> sumPerKey = kvCounter
.apply(Sum.integersPerKey());
// PCollectionをファイル出力可能な形に変換する
PCollection<String> output = sumPerKey.apply(ParDo.of(new TransTypeFromKVAndMakeStringFn()));
// 書き込む
output.apply(TextIO.write().to(COMBINE_OUTPUT_FILE_PATH));
// run : PipeLine optionで指定したRunnerで実行
// waitUntilFinish : PipeLineが終了するまで待って、最終的な状態を返す
pipeline.run().waitUntilFinish();
}
}
package com.company;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
/**
* メイン
*/
public class Main {
/**
* 関数オブジェクト
* 与えられたString str, String numを","で分割し、
* numをInteger型に変更して、KV<String, Integer>型にする
*/
static class SplitWordsAndMakeKVFn extends DoFn<String, KV<String, Integer>> {
@ProcessElement
// ProcessContextは、inputを表すobject
// 自分で定義しなくてもBeam SDKが勝手に取ってきてくれる
public void processElement(ProcessContext c) {
// ","で分割
String[] words = c.element().split(",");
// 分割したword[0]をKに、words[1]をIntegerに変換してVにする
c.output(KV.of(words[0], Integer.parseInt(words[1])));
}
}
/**
* 関数オブジェクト
* KV<String, Iterable<Integer>型をString型に変更する
*/
static class TransTypeFromKVAndMakeStringFn extends DoFn<KV<String, Integer>, String> {
@ProcessElement
public void processElement(ProcessContext c) {
// inputをString型に変換する
c.output(String.valueOf(c.element()));
}
}
/**
* インプットデータのパス
*/
private static final String INPUT_FILE_PATH = "./sample.txt";
/**
* アウトデータのパス
*/
private static final String COMBINE_OUTPUT_FILE_PATH = "./src/main/resources/combine_result/result.csv";
/**
* メイン
* @param args 引数
*/
public static void main(String[] args) {
Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());
pipeline
.apply(TextIO.read().from(INPUT_FILE_PATH))
.apply(ParDo.of(new SplitWordsAndMakeKVFn()))
.apply(Sum.integersPerKey())
.apply(ParDo.of(new TransTypeFromKVAndMakeStringFn()))
.apply(TextIO.write().to(COMBINE_OUTPUT_FILE_PATH));
pipeline.run().waitUntilFinish();
}
}
KV{Python, 13}
KV{Python, 13}
PCollection<Integer> sum = integerPCollection.apply(Sum.integersGlobally().withoutDefaults());
PCollection<Integer> sum = integerPCollection.apply(Sum.integersGlobally().withoutDefaults());
package com.company;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.PCollection;
/**
* メインクラス
*/
public class Main {
/**
* 関数型オブジェクト
* String => Integerの型変換を行う
*/
static class TransformTypeFromStringToInteger extends DoFn<String, Integer> {
@ProcessElement
public void processElement(ProcessContext c) {
// 要素をString=>Integerに変換して、output
c.output(Integer.parseInt(c.element()));
}
}
/**
* 関数型オブジェクト
* Integer =>Stringの型変換を行う
*/
static class TransformTypeFromIntegerToString extends DoFn<Integer, String> {
@ProcessElement
public void processElement(ProcessContext c) {
// 要素をString=>Integerに変換して、output
System.out.println(c.element());
c.output(String.valueOf(c.element()));
}
}
/**
* インプットデータのパス
*/
private static final String INPUT_FILE_PATH = "./sample.txt";
/**
* アウトデータのパス
*/
private static final String OUTPUT_FILE_PATH = "./result.txt";
/**
* 理解のためにメソッドチェーンは極力使用しない
* そのため冗長な箇所がある
* メインメソッド
*
* @param args
*/
public static void main(String[] args) {
// optionを指定して、Pipelineを生成する
Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());
System.out.println("a");
// ファイルから読み込み
PCollection<String> lines = pipeline.apply(TextIO.read().from(INPUT_FILE_PATH));
// 読み込んだ各データをString => Integerに変換
PCollection<Integer> integerPCollection = lines.apply(ParDo.of(new TransformTypeFromStringToInteger()));
// Combine.GloballyでPCollectionの各要素を合計
// 空のPCollectionの場合、emptyを返したいなら => PCollection<Integer> sum = integerPCollection.apply(Sum.integersGlobally().withoutDefaults());
PCollection<Integer> sum = integerPCollection.apply(Sum.integersGlobally().withoutDefaults());
// PCollection<Integer> sumをInteger => Stringに変換
PCollection<String> sumString = sum.apply(ParDo.of(new TransformTypeFromIntegerToString()));
// ファイルに書き込み
sumString.apply(TextIO.write().to(OUTPUT_FILE_PATH));
// 実行
pipeline.run().waitUntilFinish();
}
}
package com.company;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
/**
* メインクラス
*/
public class Main {
/**
* 関数型オブジェクト
* String => Integerの型変換を行う
*/
static class TransformTypeFromStringToInteger extends DoFn<String, Integer> {
@ProcessElement
public void processElement(ProcessContext c) {
// 要素をString=>Integerに変換して、output
c.output(Integer.parseInt(c.element()));
}
}
/**
* 関数型オブジェクト
* Integer =>Stringの型変換を行う
*/
static class TransformTypeFromIntegerToString extends DoFn<Integer, String> {
@ProcessElement
public void processElement(ProcessContext c) {
// 要素をString=>Integerに変換して、output
System.out.println(c.element());
c.output(String.valueOf(c.element()));
}
}
/**
* インプットデータのパス
*/
private static final String INPUT_FILE_PATH = "./sample.txt";
/**
* アウトデータのパス
*/
private static final String OUTPUT_FILE_PATH = "./result.txt";
/**
* メインメソッド
*
* @param args
*/
public static void main(String[] args) {
// Pipeline生成
Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());
// 処理部分
pipeline.apply(TextIO.read().from(INPUT_FILE_PATH))
.apply(ParDo.of(new TransformTypeFromStringToInteger()))
.apply(Sum.integersGlobally().withoutDefaults())
.apply(ParDo.of(new TransformTypeFromIntegerToString()))
.apply(TextIO.write().to(OUTPUT_FILE_PATH));
// 実行
pipeline.run().waitUntilFinish();
}
}
10
Σk
k = 1
Java [1, 2, 3]
package com.company;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
/**
* メイン
*/
public class Main {
/**
* 関数オブジェクト
* 与えられたString str, String numを","で分割し、
* numをInteger型に変更して、KV<String, Integer>型にする
*/
static class SplitWordsAndMakeKVFn extends DoFn<String, KV<String, Integer>> {
@ProcessElement
// ProcessContextは、inputを表すobject
// 自分で定義しなくてもBeam SDKが勝手に取ってきてくれる
public void processElement(ProcessContext c) {
// ","で分割
String[] words = c.element().split(",");
// 分割したword[0]をKに、words[1]をIntegerに変換してVにする
c.output(KV.of(words[0], Integer.parseInt(words[1])));
}
}
/**
* 関数オブジェクト
* KV<String, Iterable<Integer>型をString型に変更する
*/
static class TransTypeFromKVAndMakeStringFn extends DoFn<KV<String, Integer>, String> {
@ProcessElement
public void processElement(ProcessContext c) {
// inputをString型に変換する
c.output(String.valueOf(c.element()));
}
}
/**
* インプットデータのパス
*/
private static final String INPUT_FILE_PATH = "./sample.txt";
/**
* アウトデータのパス
*/
private static final String COMBINE_OUTPUT_FILE_PATH = "./src/main/resources/combine_result/result.csv";
/**
* メイン
* 理解のため、メソッドチェーンを極力使用していない
* そのため、冗長なコードになっている
*
* @param args 引数
*/
public static void main(String[] args) {
// まずPipelineに設定するOptionを作成する
// 今回は、ローカルで起動するため、DirectRunnerを指定する
// ローカルモードでは、DirectRunnerがすでにデフォルトになっているため、ランナーを設定する必要はない
PipelineOptions options = PipelineOptionsFactory.create();
// Optionを元にPipelineを生成する
Pipeline pipeline = Pipeline.create(options);
// inout dataを読み込んで、そこからPCollection(パイプライン内の一連のデータ)を作成する
PCollection<String> lines = pipeline.apply(TextIO.read().from(INPUT_FILE_PATH));
// 与えられたString str, String numを","で分割し、numをInteger型に変更して、KV<String, Integer>型にする
PCollection<KV<String, Integer>> kvCounter = lines.apply(ParDo.of(new SplitWordsAndMakeKVFn()));
// Combine PerKey は、オペレーションの一部として GroupByKey 変換を実行する
PCollection<KV<String, Integer>> sumPerKey = kvCounter
.apply(Sum.integersPerKey());
// PCollectionをファイル出力可能な形に変換する
PCollection<String> output = sumPerKey.apply(ParDo.of(new TransTypeFromKVAndMakeStringFn()));
// 書き込む
output.apply(TextIO.write().to(COMBINE_OUTPUT_FILE_PATH));
// run : PipeLine optionで指定したRunnerで実行
// waitUntilFinish : PipeLineが終了するまで待って、最終的な状態を返す
pipeline.run().waitUntilFinish();
}
}
Java,1
Python,5
Go,1
Java,3
Java,2
Go,5
Python,2
Go,2
Go,9
Python,6
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment