Skip to content

Instantly share code, notes, and snippets.

@howarddierking
Created December 6, 2017 17:20
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save howarddierking/3184c7d716de8fbb12389d120150024a to your computer and use it in GitHub Desktop.
Save howarddierking/3184c7d716de8fbb12389d120150024a to your computer and use it in GitHub Desktop.
Sample Apache Beam ParDo transform that shreds each CSV file into one map (header name → value) per record.
package com.howarddierking.demo;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.apache.commons.io.FilenameUtils;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
 * Beam {@link DoFn} that expands a CSV file path into one output map per data record.
 *
 * <p>Each input element is a path resolved through Beam's {@link FileSystems}. The file is copied
 * to a local temp file and parsed with {@link CSVFormat#EXCEL}, with the header taken from the
 * file's first row. Every subsequent record is emitted as a map from header name to cell value.
 * Each map also carries an {@code "InputFile"} entry holding the source file's name without
 * directory or extension.
 */
public class CsvShredder extends DoFn<String, Map<String, String>> {

    /** Extra key added to every emitted map; its value is the source file's base name. */
    private static final String INPUT_FILE_KEY = "InputFile";

    /**
     * Copies the CSV file named by the element to a local temp file, parses it, and outputs one
     * map per record. The temp file is deleted afterwards, whether or not processing succeeded.
     *
     * @param context Beam context supplying the file path and receiving the per-record maps
     * @throws UncheckedIOException if the file cannot be opened, copied, or parsed. The error is
     *     rethrown (rather than only printed) so the failure propagates out of the DoFn instead
     *     of the file's rows silently going missing.
     */
    @ProcessElement
    public void processElement(ProcessContext context) {
        String filePath = context.element();
        File localCopy = null;
        try {
            // it appears from https://stackoverflow.com/questions/33052578/temp-files-in-google-cloud-dataflow that
            // writing temp files as shown here is appropriate for Cloud Dataflow
            localCopy = File.createTempFile(UUID.randomUUID().toString(), ".tmp");
            copyToLocalFile(filePath, localCopy);
            emitRecords(context, filePath, localCopy);
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to shred CSV file " + filePath, e);
        } finally {
            // Temp copies would otherwise accumulate on the worker's disk, one per element.
            if (localCopy != null && !localCopy.delete()) {
                // Best effort: defer to JVM exit rather than failing an otherwise good element.
                localCopy.deleteOnExit();
            }
        }
    }

    /**
     * Copies the full contents of {@code filePath} (resolved through Beam {@link FileSystems})
     * into {@code target}. Both channels are closed even if the copy fails.
     *
     * @param filePath path of the source file, as given in the input element
     * @param target local file that receives the bytes
     * @throws IOException if the source cannot be opened or read, or the target cannot be written
     */
    private void copyToLocalFile(String filePath, File target) throws IOException {
        ResourceId inputResourceId = FileSystems.matchNewResource(filePath, false);
        try (ReadableByteChannel inputChannel = FileSystems.open(inputResourceId);
                FileOutputStream outStream = new FileOutputStream(target);
                FileChannel outChannel = outStream.getChannel()) {
            // Request "everything": an Integer.MAX_VALUE count would silently truncate
            // inputs of 2 GiB or more.
            outChannel.transferFrom(inputChannel, 0, Long.MAX_VALUE);
        }
    }

    /**
     * Parses {@code csvFile} as UTF-8 Excel-style CSV with a header row, and outputs one map per
     * record. Each map holds every header column plus {@link #INPUT_FILE_KEY}.
     *
     * @param context Beam context that receives the maps
     * @param filePath original element path, used only to derive the {@code InputFile} value
     * @param csvFile local copy of the file to parse
     * @throws IOException if the file cannot be read or parsed
     */
    private void emitRecords(ProcessContext context, String filePath, File csvFile)
            throws IOException {
        // Same for every record of this file, so compute it once.
        String inputFileName = FilenameUtils.removeExtension(FilenameUtils.getName(filePath));
        try (CSVParser parser =
                CSVParser.parse(csvFile, StandardCharsets.UTF_8, CSVFormat.EXCEL.withHeader())) {
            Map<String, Integer> headerMap = parser.getHeaderMap();
            for (CSVRecord record : parser) {
                // A fresh map per record: emitted values must not be mutated afterwards.
                Map<String, String> values = new HashMap<>();
                values.put(INPUT_FILE_KEY, inputFileName);
                for (String header : headerMap.keySet()) {
                    values.put(header, record.get(header));
                }
                context.output(values);
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment