Skip to content

Instantly share code, notes, and snippets.

@Dan-Dongcheol-Lee
Created May 24, 2018 11:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Dan-Dongcheol-Lee/680ce2145dca1f59e8dfb8750d2e567e to your computer and use it in GitHub Desktop.
Save Dan-Dongcheol-Lee/680ce2145dca1f59e8dfb8750d2e567e to your computer and use it in GitHub Desktop.
/**
 * Apache Beam word-count pipeline: reads text lines, splits them into words, counts how often each
 * distinct word occurs, and writes one {@code "word: count"} line per word.
 */
public class WordCount {

  /**
   * Splits each input line into words using {@code ExampleUtils.TOKENIZER_PATTERN} and emits every
   * non-empty word.
   *
   * <p>Also reports two metrics: the counter {@code emptyLines}, incremented for each line that is
   * empty after {@link String#trim()}, and the distribution {@code lineLenDistro}, fed with the
   * untrimmed length of every line.
   */
  static class ExtractWordsFn extends DoFn<String, String> {

    // String.split(regex) is equivalent to Pattern.compile(regex).split(str), so calling it per
    // element recompiles the tokenizer for every line. Compile it once instead. The field is
    // static, so it is not part of the serialized DoFn instance.
    private static final java.util.regex.Pattern TOKENIZER =
        java.util.regex.Pattern.compile(ExampleUtils.TOKENIZER_PATTERN);

    private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines");
    private final Distribution lineLenDist =
        Metrics.distribution(ExtractWordsFn.class, "lineLenDistro");

    /**
     * Records line metrics, then outputs each non-empty token of the current line.
     *
     * @param c the Beam process context supplying the input line and receiving output words
     */
    @ProcessElement
    public void processElement(ProcessContext c) {
      String line = c.element();
      lineLenDist.update(line.length());
      if (line.trim().isEmpty()) {
        emptyLines.inc();
      }
      // Pattern.split(input) uses limit 0, exactly like String.split(regex): trailing empty
      // strings are dropped. Leading or inner empty tokens can still appear, so filter them.
      for (String word : TOKENIZER.split(line)) {
        if (!word.isEmpty()) {
          c.output(word);
        }
      }
    }
  }

  /** Formats a (word, count) pair as the text line {@code "word: count"}. */
  public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
    /**
     * Renders one result pair.
     *
     * @param input the word (key) and its occurrence count (value)
     * @return {@code key + ": " + value}
     */
    @Override
    public String apply(KV<String, Long> input) {
      return input.getKey() + ": " + input.getValue();
    }
  }

  /**
   * Composite transform that turns a collection of text lines into per-word occurrence counts: it
   * applies {@link ExtractWordsFn} and then {@code Count.perElement()}.
   */
  public static class CountWords
      extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
    /**
     * Expands the transform.
     *
     * @param lines the input text lines
     * @return a collection mapping each distinct word to the number of times it occurred
     */
    @Override
    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
      PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));
      return words.apply(Count.perElement());
    }
  }

  /** Command-line options for the word-count pipeline. */
  public interface WordCountOptions extends PipelineOptions {
    /**
     * Path of the text file to read. Defaults to
     * {@code gs://apache-beam-samples/shakespeare/kinglear.txt}.
     */
    @Description("Path of the file to read from")
    @Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
    String getInputFile();

    void setInputFile(String value);

    /**
     * Output location passed to {@code TextIO.write().to(...)}. It has no default and is marked
     * {@code @Required}, so option validation (enabled in {@link #main}) rejects a missing value.
     */
    @Description("Path of the file to write to")
    @Required
    String getOutput();

    void setOutput(String value);
  }

  /**
   * Builds and runs the pipeline: read lines from {@code options.getInputFile()}, count words,
   * format each result as text, and write to {@code options.getOutput()}. Blocks until the
   * pipeline finishes.
   *
   * @param options the validated pipeline options
   */
  static void runWordCount(WordCountOptions options) {
    Pipeline p = Pipeline.create(options);
    p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
        .apply(new CountWords())
        .apply(MapElements.via(new FormatAsTextFn()))
        .apply("WriteCounts", TextIO.write().to(options.getOutput()));
    p.run().waitUntilFinish();
  }

  /**
   * Entry point: parses {@code args} into {@link WordCountOptions}, with validation enabled, and
   * runs the pipeline.
   *
   * @param args command-line arguments, e.g. {@code --output=...} and optionally
   *     {@code --inputFile=...}
   */
  public static void main(String[] args) {
    WordCountOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(WordCountOptions.class);
    runWordCount(options);
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment