@nfisher
Created September 19, 2018 14:29
Dataflow primitive operations
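// Usage: .apply("Extract Body", ParDo.of(new ExtractMessage()))
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.DoFn;

// Emits the payload of each Pub/Sub message as a String.
class ExtractMessage extends DoFn<PubsubMessage, String> {
    @ProcessElement
    public void processElement(final ProcessContext c) {
        final PubsubMessage msg = c.element();
        // Decode as UTF-8 (assumed encoding) rather than the platform default charset.
        final String s = new String(msg.getPayload(), StandardCharsets.UTF_8);
        c.output(s);
    }
}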
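
ExtractMessage turns the raw Pub/Sub payload bytes into a String. The sketch below isolates that decoding step without any Beam dependency, assuming the payload is UTF-8 text. The class name PayloadDecoding and the helper decode are hypothetical and not part of the gist or the Beam API.

```java
import java.nio.charset.StandardCharsets;

public class PayloadDecoding {
    // Hypothetical helper: decodes payload bytes the way a DoFn could,
    // assuming the publisher sent UTF-8 encoded text.
    static String decode(final byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(final String[] args) {
        // "café" occupies 5 bytes in UTF-8 but decodes to 4 characters.
        final byte[] payload = "caf\u00e9".getBytes(StandardCharsets.UTF_8);
        System.out.println(payload.length + " bytes -> " + decode(payload).length() + " chars");
    }
}
```

Naming the charset keeps the result independent of the worker JVM's default encoding, which can differ between a local runner and a managed one.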
// Assumes p is a windowed PCollection<String>, e.g. the output of "Extract Body".
String targetPath = "gs://bucket/subfolder/";
p.apply(
    "Write Body",
    TextIO.write()
        .to(targetPath + "data-enriched-")
        .withWindowedWrites()
        .withNumShards(10)
        .withSuffix(".out"));