Skip to content

Instantly share code, notes, and snippets.

@lidalei
Created March 7, 2018 09:39
Show Gist options
  • Save lidalei/493c72a795166a6ff7e08416f7f303e2 to your computer and use it in GitHub Desktop.
Save lidalei/493c72a795166a6ff7e08416f7f303e2 to your computer and use it in GitHub Desktop.
An Apache Beam pipeline with a transform that reads two side inputs cannot be updated in Google Cloud Dataflow
import com.google.common.collect.Iterables;
import com.google.common.primitives.Ints;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Minimal pipeline in which a single {@link ParDo} ("Map1") reads two singleton side inputs, used to
 * reproduce a failure to update such a pipeline in place on Google Cloud Dataflow.
 *
 * <p>Every transform is given an explicit, stable name ("Create1", "CreateSideInput1", "Create2",
 * "CreateSideInput2", "FakeData", "Map1"), because an in-place Dataflow update matches the
 * transforms of the new job against the running one by name.
 */
public class PipelineTest {
  private static final Logger logger = LoggerFactory.getLogger(PipelineTest.class);

  /** Number of values (0, 1, 4, 9, ...) placed in each side-input list. */
  private static final int SIDE_INPUT_SIZE = 1000;

  /**
   * Builds and runs the pipeline.
   *
   * @param args pipeline options in {@code --name=value} form (e.g. {@code --runner=...}); an empty
   *     array yields the default options
   */
  public static void main(String[] args) {
    int[] squares = new int[SIDE_INPUT_SIZE];
    for (int i = 0; i < squares.length; i++) {
      squares[i] = i * i;
    }

    // Parse command-line flags so runner-specific options (e.g. runner, project, update, job name)
    // can be supplied and validated; with no flags this is equivalent to the default options.
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
    Pipeline pipeline = Pipeline.create(options);

    PCollectionView<Iterable<Integer>> view1 =
        singletonSideInput(pipeline, "Create1", "CreateSideInput1", squares);
    PCollectionView<Iterable<Integer>> view2 =
        singletonSideInput(pipeline, "Create2", "CreateSideInput2", squares);

    pipeline
        .apply(
            "FakeData",
            GenerateSequence.from(0).to(50_000).withRate(10, Duration.standardSeconds(1)))
        .apply(
            "Map1",
            ParDo.of(
                    new DoFn<Long, String>() {
                      @ProcessElement
                      public void processElement(ProcessContext ctx) {
                        Long element = ctx.element();
                        Iterable<Integer> v1 = ctx.sideInput(view1);
                        Iterable<Integer> v2 = ctx.sideInput(view2);
                        String out =
                            "element "
                                + element
                                + ", v1 size "
                                + Iterables.size(v1)
                                + ", v2 size "
                                + Iterables.size(v2);
                        logger.info("MAP1: {}", out);
                        ctx.output(out);
                      }
                    })
                .withSideInputs(view1, view2));

    pipeline.run();
  }

  /**
   * Adds a one-element PCollection that holds {@code values} as a single list, and returns that
   * collection exposed as a singleton side input.
   *
   * @param pipeline the pipeline to add the transforms to
   * @param createName name for the {@link Create} transform
   * @param viewName name for the {@link View#asSingleton()} transform
   * @param values the integers to place in the side-input list
   * @return the singleton view over the list
   */
  private static PCollectionView<Iterable<Integer>> singletonSideInput(
      Pipeline pipeline, String createName, String viewName, int[] values) {
    // The explicit type witness selects Create.of(T, T...) with T = Iterable<Integer>, so the
    // whole list becomes ONE element. Without it, the compiler would pick Create.of(Iterable<T>)
    // and emit one element per integer.
    PCollection<Iterable<Integer>> sideInput =
        pipeline.apply(createName, Create.<Iterable<Integer>>of(Ints.asList(values)));
    return sideInput.apply(viewName, View.asSingleton());
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment