Skip to content

Instantly share code, notes, and snippets.

What would you like to do?
/**
 * Word-count pipeline example. Holds the word-extraction function, the formatting
 * function, the counting transform, the options interface and the entry points.
 *
 * <p>NOTE(review): this listing looks truncated (closing braces and some statements are
 * not visible); confirm against the original source before compiling.
 */
public class WordCount {
/**
 * {@code DoFn<String, String>} that tokenizes each input element with
 * {@code ExampleUtils.TOKENIZER_PATTERN} and visits the non-empty tokens.
 */
static class ExtractWordsFn extends DoFn<String, String> {
// Metric created via Metrics.counter and labelled "emptyLines"; its update site is not visible here.
private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines");
// Metric created via Metrics.distribution and labelled "lineLenDistro"; its update site is not visible here.
private final Distribution lineLenDist = Metrics.distribution(
ExtractWordsFn.class, "lineLenDistro");
/**
 * Handles one input element taken from {@code c.element()}.
 *
 * <p>NOTE(review): no {@code @ProcessElement} annotation is visible on this method; Beam
 * normally requires one on a DoFn's processing method. Confirm it was not lost.
 *
 * @param c the process context supplying the current element
 */
public void processElement(ProcessContext c) {
// Branch taken when the element is empty after trimming; the branch body is not visible here.
// NOTE(review): the ';' right after '{' is an empty statement and looks unintended.
if (c.element().trim().isEmpty()) {;
// Split the element into tokens using the shared tokenizer pattern.
String[] words = c.element().split(ExampleUtils.TOKENIZER_PATTERN);
// Visit each token, skipping empty strings produced by the split.
for (String word : words) {
// The action taken for a non-empty token is not visible here; presumably it is emitted
// through the context. TODO confirm.
if (!word.isEmpty()) {
/**
 * {@code SimpleFunction} that renders a {@code KV<String, Long>} as a single text line.
 *
 * <p>NOTE(review): the closing braces of this class and method are not visible in this
 * listing.
 */
public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
/**
 * Formats one key/value pair.
 *
 * @param input the pair to format
 * @return the key, then {@code ": "}, then the value
 */
public String apply(KV<String, Long> input) {
return input.getKey() + ": " + input.getValue();
/**
 * Composite {@code PTransform} that turns a {@code PCollection<String>} into a
 * {@code PCollection<KV<String, Long>>} by applying {@code ExtractWordsFn} and then
 * {@code Count.perElement()}.
 *
 * <p>NOTE(review): the closing braces of this class and method are not visible in this
 * listing.
 */
public static class CountWords extends PTransform<PCollection<String>,
PCollection<KV<String, Long>>> {
/**
 * Builds the transform's sub-graph.
 *
 * @param lines the input collection of strings
 * @return the collection produced by {@code Count.perElement()} over the extracted words
 */
public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
// Step 1: run ExtractWordsFn over every input element via ParDo.
PCollection<String> words = lines.apply(
ParDo.of(new ExtractWordsFn()));
// Step 2: apply Count.perElement() to the extracted words.
PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement());
return wordCounts;
/**
 * Pipeline options for the word-count example: an input file path and an output path,
 * each exposed as a getter/setter pair on top of {@code PipelineOptions}.
 *
 * <p>NOTE(review): the closing brace of this interface is not visible in this listing.
 */
public interface WordCountOptions extends PipelineOptions {
/** @return the path of the file to read from */
@Description("Path of the file to read from")
String getInputFile();
/** @param value the path of the file to read from */
void setInputFile(String value);
/** @return the path of the file to write to */
@Description("Path of the file to write to")
String getOutput();
/** @param value the path of the file to write to */
void setOutput(String value);
/**
 * Assembles the word-count pipeline from the given options: counts words with
 * {@code CountWords}, formats each result with {@code FormatAsTextFn}, and writes the
 * text to {@code options.getOutput()}.
 *
 * <p>NOTE(review): no statement that runs the pipeline and no closing brace are visible
 * in this listing; confirm against the original source.
 *
 * @param options the options used to create the pipeline and locate the output
 */
static void runWordCount(WordCountOptions options) {
Pipeline p = Pipeline.create(options);
// NOTE(review): the chain below starts with '.apply' and has no visible receiver; the
// leading expression (presumably a read step applied to 'p') appears to be missing.
.apply(new CountWords())
.apply(MapElements.via(new FormatAsTextFn()))
// NOTE(review): the second ';' on the next line is a redundant empty statement.
.apply("WriteCounts", TextIO.write().to(options.getOutput()));;
/**
 * Program entry point: starts building {@code WordCountOptions} from the command-line
 * arguments with validation enabled.
 *
 * <p>NOTE(review): the statement below is unterminated and the rest of the method is not
 * visible in this listing; confirm against the original source.
 *
 * @param args command-line arguments passed to {@code PipelineOptionsFactory.fromArgs}
 */
public static void main(String[] args) {
WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.