Skip to content

Instantly share code, notes, and snippets.

@gxercavins
Last active December 9, 2019 19:43
Show Gist options
  • Save gxercavins/960bc3b9ad38ad513b412f40610dd463 to your computer and use it in GitHub Desktop.
Save gxercavins/960bc3b9ad38ad513b412f40610dd463 to your computer and use it in GitHub Desktop.
SO question 59219937
  • Example on message published
$ gcloud pubsub topics publish streamdemo --message Hi --attribute evId=1234
  • Pipeline log
INFO: Event ID: 1234
package com.dataflow.samples;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.joda.time.Duration;
import org.joda.time.Instant;
import java.util.concurrent.TimeUnit;
import java.util.Date;
import java.sql.Timestamp;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
public class PubSubGetAttribute {
private static final Logger LOG = LoggerFactory.getLogger(PubSubGetAttribute.class);
public static interface MyOptions extends PipelineOptions {
@Description("Input topic")
String getInput();
void setInput(String s);
}
@SuppressWarnings("serial")
public static void main(String[] args) {
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
Pipeline p = Pipeline.create(options);
String topic = options.getInput();
p
.apply("Read Messages", PubsubIO.readMessagesWithAttributes().fromTopic(topic))
.apply("Log Event ID", ParDo.of(new DoFn<PubsubMessage, String>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
try {
String event = c.element().getAttribute("evId");
LOG.info("Event ID: " + event);
c.output(event);
}
catch(Exception e) { }
}
}));
p.run();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment