Skip to content

Instantly share code, notes, and snippets.

@gxercavins
Created July 16, 2019 19:44
Show Gist options
  • Save gxercavins/74739b31e8317b084f561fcb5097e5f1 to your computer and use it in GitHub Desktop.
Save gxercavins/74739b31e8317b084f561fcb5097e5f1 to your computer and use it in GitHub Desktop.
SO question 57053573
package com.dataflow.samples;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class MultipleTopics {
private static final Logger Log = LoggerFactory.getLogger(MultipleTopics.class);
public interface Options extends PipelineOptions {
@Description("List of comma-separated Pub/Sub topics")
String getTopicList();
void setTopicList(String s);
}
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
String[] listOfTopicStr = options.getTopicList().split(",");
Pipeline pipeline = Pipeline.create(options);
PCollection[] p = new PCollection[listOfTopicStr.length];
for (int i = 0; i < listOfTopicStr.length; i++) {
p[i] = pipeline
.apply(PubsubIO.readStrings().fromTopic(listOfTopicStr[i]))
.apply(ParDo.of(new DoFn<String, Void>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
Log.info(String.format("Message=%s", c.element()));
}
}));
}
pipeline.run().waitUntilFinish();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment