Skip to content

Instantly share code, notes, and snippets.

@reddikih
Created July 23, 2019 01:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save reddikih/dcff44c0b1154c7843d14c0bb1264d15 to your computer and use it in GitHub Desktop.
Save reddikih/dcff44c0b1154c7843d14c0bb1264d15 to your computer and use it in GitHub Desktop.
Apache Beam ValueProvider usage sample from GCP document (https://cloud.google.com/dataflow/docs/guides/templates/creating-templates?hl=ja)
package dummy;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.TypeDescriptors;
/**
 * Sample pipeline that adds an integer supplied as a pipeline option to each element of a
 * small fixed input and writes the results as text.
 *
 * <p>Usage: {@code ./gradlew run --args="--runner=DirectRunner --int=10"}
 *
 * <p>This program requires that this class is configured as the main class via the
 * {@code application} plugin settings in {@code build.gradle}.
 */
public class SumInteger {

  /** Pipeline options exposing the {@code --int} argument as a {@link ValueProvider}. */
  public interface SumIntOptions extends PipelineOptions {
    /** Returns the provider of the integer that is added to every element. */
    ValueProvider<Integer> getInt();

    /** Sets the provider of the integer that is added to every element. */
    void setInt(ValueProvider<Integer> value);
  }

  /** Adds the integer held by a {@link ValueProvider} to every input element. */
  static class MySumFn extends DoFn<Integer, Integer> {
    // Assigned once in the constructor and never reassigned.
    private final ValueProvider<Integer> mySumInteger;

    /**
     * @param sumInt provider of the integer to add; it is only read while elements are processed
     */
    MySumFn(ValueProvider<Integer> sumInt) {
      this.mySumInteger = sumInt;
    }

    /**
     * Emits the current element plus the provided integer.
     *
     * @param c context giving access to the input element and the output receiver
     * @throws IllegalStateException if the provider yields no value
     */
    @ProcessElement
    public void processElement(ProcessContext c) {
      // The provider is read here, per element, rather than at pipeline construction time.
      Integer addend = mySumInteger.get();
      if (addend == null) {
        // Fail with an actionable message instead of a bare NullPointerException from unboxing.
        throw new IllegalStateException(
            "No value was supplied for the --int pipeline option (e.g. --int=10)");
      }
      c.output(c.element() + addend);
    }
  }

  /**
   * Builds and runs the pipeline: creates the elements 1, 2, 3, adds the {@code --int} value to
   * each, converts the sums to strings and writes them with the output prefix "numvalues".
   *
   * @param args command-line pipeline options, e.g. {@code --runner=DirectRunner --int=10}
   */
  public static void main(String... args) {
    SumIntOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(SumIntOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
        .apply(ParDo.of(new MySumFn(options.getInt())))
        .apply("ToString", MapElements.into(TypeDescriptors.strings()).via(x -> x.toString()))
        .apply("OutputNums", TextIO.write().to("numvalues"));

    p.run();
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment