Skip to content

Instantly share code, notes, and snippets.

@techisbeautiful
Last active July 12, 2022 20:24
Show Gist options
  • Save techisbeautiful/c323cf410ffa41b51368f73d87197068 to your computer and use it in GitHub Desktop.
Save techisbeautiful/c323cf410ffa41b51368f73d87197068 to your computer and use it in GitHub Desktop.
// Product lines from the products CSV, keeping only rows that are non-empty
// and do not contain the header text.
PCollection<String> productCollection =
    pipeline
        .apply(TextIO.read().from("/data/section1/products.csv"))
        .apply(
            "FilterHeader",
            Filter.by(line -> {
              // Blank lines are dropped first.
              if (line.isEmpty()) {
                return false;
              }
              // Any line carrying the header text is dropped as well.
              return !line.contains("ProductId, ProductName, ProductTypeId, Price");
            }));
// Product-type rows keyed by their numeric id.
// Reads the file named by `productTypes`, drops empty lines and lines containing
// the header text, then emits KV(ProductTypeId, "<ProductTypeId>,<ProductType>")
// built from the first two comma-separated fields of each remaining row.
PCollection<KV<Integer, String>> productTypeCollection =
    pipeline
        .apply(TextIO.read().from(productTypes))
        // Branch-specific step name so it does not collide with other
        // "FilterHeader" steps applied to the same pipeline.
        .apply("FilterProductTypeHeader", Filter.by(line ->
            !line.isEmpty() && !line.contains("ProductTypeId, ProductType")))
        // Explicit name instead of the auto-generated anonymous ParDo name.
        .apply("KeyByProductTypeId", ParDo.of(new DoFn<String, KV<Integer, String>>() {
          /**
           * Splits one CSV row on commas and emits it keyed by the parsed first field.
           *
           * @param c context supplying the input row and receiving the keyed output
           * @throws IllegalArgumentException if the row is null or has fewer than two fields
           * @throws NumberFormatException if the first field is not an integer
           */
          @ProcessElement
          public void processElement(ProcessContext c) {
            String row = c.element();
            // Explicit check: a plain `assert` is skipped unless the JVM runs with -ea.
            if (row == null) {
              throw new IllegalArgumentException("Product type row must not be null");
            }
            String[] splits = row.split(",");
            // Fail with the offending row in the message rather than a bare
            // ArrayIndexOutOfBoundsException on splits[1].
            if (splits.length < 2) {
              throw new IllegalArgumentException(
                  "Expected at least two comma-separated fields but got: " + row);
            }
            // Trim only for parsing the key; the emitted value keeps the
            // original field text unchanged.
            Integer productTypeId = Integer.valueOf(splits[0].trim());
            c.output(KV.of(productTypeId, String.join(",", splits[0], splits[1])));
          }
        }));
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment