Skip to content

Instantly share code, notes, and snippets.

@derjust
Created September 13, 2015 14:40
Show Gist options
  • Save derjust/3fe190f4507ceb9a0c34 to your computer and use it in GitHub Desktop.
Simple side output
// Demonstrates a multi-output ParDo ("side output"):
// - Each input line is split into tokens.
// - Non-empty tokens go to the main output (output1).
// - Empty tokens go to the side output (output2).
// Each output is counted per element, formatted as "token: count", and
// written to its OWN location.
Pipeline p = Pipeline.create(options);
// Tags are created as anonymous subclasses ({}); presumably this is so the SDK
// can recover the String element type at runtime (see the TupleTag docs).
final TupleTag<String> output1 = new TupleTag<String>(){};
final TupleTag<String> output2 = new TupleTag<String>(){};
PCollectionTuple tuples = p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*"))
    // withOutputTags(main, additional): output1 is the main output and
    // receives c.output(...); output2 must be addressed via c.sideOutput(...).
    .apply(ParDo
        .withOutputTags(output1, TupleTagList.of(output2))
        .named("ExtractWords").of(new DoFn<String, String>() {
          private static final long serialVersionUID = 0;
          @Override
          public void processElement(ProcessContext c) {
            // String.split keeps a leading empty token when the line starts
            // with a separator character, and "".split(...) yields [""];
            // those empty tokens are what end up in the side output.
            for (String word : c.element().split("[^a-zA-Z']+")) {
              if (!word.isEmpty()) {
                c.output(word);
              } else {
                c.sideOutput(output2, word);
              }
            }
          }
        }));

// Shared formatter for both branches: renders each (token, count) pair as
// "token: count". Previously duplicated verbatim for each output.
final DoFn<KV<String, Long>, String> formatCounts = new DoFn<KV<String, Long>, String>() {
  private static final long serialVersionUID = 0;
  @Override
  public void processElement(ProcessContext c) {
    c.output(c.element().getKey() + ": " + c.element().getValue());
  }
};

// Main output: counts of non-empty words.
tuples.get(output1).apply(Count.<String>perElement())
    .apply(ParDo.named("FormatResults").of(formatCounts))
    .apply(TextIO.Write.to("gs://sj_test/df-output/"));

// Side output: counts of empty tokens. This sink must use a different prefix
// from the main output's; with the same prefix both writers produce shards
// under one path and clobber each other.
tuples.get(output2).apply(Count.<String>perElement())
    .apply(ParDo.named("FormatResults2").of(formatCounts))
    .apply(TextIO.Write.to("gs://sj_test/df-output2/"));

p.run();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment