-
-
Save gabihodoroaga/9538a6fb13aa0bcee3d1e1cb1e66b7b9 to your computer and use it in GitHub Desktop.
WordCount GCS with custom output file name
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.beam.sdk.Pipeline; | |
import org.apache.beam.sdk.coders.StringUtf8Coder; | |
import org.apache.beam.sdk.io.FileIO; | |
import org.apache.beam.sdk.io.TextIO; | |
import org.apache.beam.sdk.options.PipelineOptions; | |
import org.apache.beam.sdk.options.PipelineOptionsFactory; | |
import org.apache.beam.sdk.transforms.*; | |
import org.apache.beam.sdk.values.KV; | |
import org.apache.beam.sdk.values.TypeDescriptors; | |
import java.util.Arrays; | |
public class WriteFileWithName { | |
public static void main(String[] args) { | |
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class); | |
Pipeline p = Pipeline.create(options); | |
p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*")) | |
// Concept #2: Apply a FlatMapElements transform the PCollection of text lines. | |
// This transform splits the lines in PCollection<String>, where each element is an | |
// individual word in Shakespeare's collected texts. | |
.apply( | |
FlatMapElements.into(TypeDescriptors.strings()) | |
.via((String line) -> Arrays.asList(line.split("[^\\p{L}]+")))) | |
// We use a Filter transform to avoid empty word | |
.apply(Filter.by((String word) -> !word.isEmpty())) | |
// Concept #3: Apply the Count transform to our PCollection of individual words. The Count | |
// transform returns a new PCollection of key/value pairs, where each key represents a | |
// unique word in the text. The associated value is the occurrence count for that word. | |
.apply(Count.perElement()) | |
// Apply a MapElements transform that formats our PCollection of word counts into a | |
// printable string, suitable for writing to an output file. | |
.apply( | |
MapElements.into(TypeDescriptors.strings()) | |
.via( | |
(KV<String, Long> wordCount) -> | |
wordCount.getKey() + ": " + wordCount.getValue())) | |
// Concept #4: Apply a write transform, FileIO.writeDynamic, at the end of the pipeline. | |
// FileIO.writeDynamic writes the contents of a PCollection (in this case, our PCollection of | |
// formatted strings) to a series of text files in GCS. | |
// | |
// By default, it will write to a set of files with names like <prefix>>-00001-of-00005<suffix> | |
// mentioned in withNaming method, defaultNaming accepts two arguments `prefix` and `suffix` | |
.apply(FileIO.<String, String>writeDynamic() | |
// specifies how the elements should be distributed, since all elements need to be considered | |
// equally, the elements are distributed on name of input class, which is same for all input | |
.by((SerializableFunction<String, String>) input -> input.getClass().toString()) | |
.to("<GCS path URI>") | |
.withNumShards(1) | |
.via(TextIO.sink()) | |
.withDestinationCoder(StringUtf8Coder.of()) | |
// @arguments(String, String), prefix and suffix, prefix can be dynamic based on some rule | |
.withNaming(key -> FileIO.Write.defaultNaming("temp", ".txt")) | |
); | |
p.run(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment