@pbrumblay / CSVSplitter.java (secret gist)
Last active Apr 24, 2018

Splittable DoFn implementation of a CSV Transformation
package com.fearlesstg.dataflow;

import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CSVSplitter extends DoFn<FileIO.ReadableFile, List<String>> {
    private static final Logger LOG = LoggerFactory.getLogger(CSVSplitter.class);
    private static final byte LF_CHAR = 0x0a;

    @ProcessElement
    @SuppressWarnings("unused")
    public void process(ProcessContext c, OffsetRangeTracker tracker) {
        try {
            FileIO.ReadableFile file = c.element();
            LOG.info("File: " + file.getMetadata().resourceId().getFilename());
            LOG.info("Processing current restriction: " + tracker.currentRestriction());
            long from = tracker.currentRestriction().getFrom();
            long to = tracker.currentRestriction().getTo();
            int capacity = (int) (to - from);
            LOG.info("Capacity: " + capacity);

            // Read the whole restriction into memory.
            ByteBuffer bb = ByteBuffer.allocate(capacity);
            int bytesRead = 0;
            try (SeekableByteChannel seekable = file.openSeekable()) {
                LOG.info("Seeking to start of restriction: " + from);
                seekable.position(from);
                while (bytesRead < capacity) {
                    int read = seekable.read(bb);
                    if (read < 0) {
                        break; // end of file; guard against an infinite loop
                    }
                    bytesRead += read;
                }
            }

            String fileString = new String(bb.array(), StandardCharsets.UTF_8);
            CSVParser parser = CSVParser.parse(fileString, CSVFormat.RFC4180);

            // Claim the entire restriction in one shot by claiming its last
            // offset; splitRestriction() has already partitioned the work.
            long end = to - 1;
            if (end < from) end = from;
            if (!tracker.tryClaim(end)) {
                LOG.info("NOT Claimed! " + tracker.currentRestriction());
                return;
            } else {
                LOG.info("Claimed! " + tracker.currentRestriction());
            }

            for (CSVRecord record : parser) {
                List<String> list = new ArrayList<>();
                record.forEach(list::add);
                c.output(list);
            }
        } catch (Exception e) {
            LOG.error("Error in process loop", e);
            throw new RuntimeException(e);
        }
    }

    @GetInitialRestriction
    @SuppressWarnings("unused")
    public OffsetRange getInitialRestriction(FileIO.ReadableFile file) {
        long fileSize = file.getMetadata().sizeBytes();
        LOG.info("======== INITIAL RESTRICTION file size: " + fileSize);
        return new OffsetRange(0, fileSize);
    }

    @SplitRestriction
    @SuppressWarnings("unused")
    public void splitRestriction(FileIO.ReadableFile file, OffsetRange restriction, DoFn.OutputReceiver<OffsetRange> receiver) {
        LOG.info("======== SPLIT SPLIT SPLIT SPLIT SPLIT SPLIT =======");
        long diff = restriction.getTo() - restriction.getFrom();
        LOG.info("Difference: " + diff);
        List<OffsetRange> outputRanges = new ArrayList<>();
        try {
            if (diff > 1024 * 1024 * 4) { // larger than 4 MB: worth splitting
                if (diff > 1024 * 1024 * 20) { // larger than 20 MB
                    // Walk the file in 20 MB segments; findSplit() bisects each
                    // segment at a record boundary, yielding roughly 10 MB chunks.
                    // Assumes findSplit() always finds a boundary; it returns an
                    // empty list if none is found near the midpoint.
                    LOG.info("Splitting into roughly 10mb chunks.");
                    long chunk = 1024 * 1024 * 20; // 20 MB / 2 = 10 MB
                    long from = restriction.getFrom();
                    long to = from + chunk;
                    while (true) {
                        if (to > restriction.getTo()) {
                            to = restriction.getTo();
                        }
                        OffsetRange segment = new OffsetRange(from, to);
                        List<OffsetRange> ranges = findSplit(file, segment);
                        outputRanges.add(ranges.get(0));
                        if (to == restriction.getTo()) {
                            outputRanges.add(ranges.get(1));
                            break;
                        }
                        from = ranges.get(1).getFrom();
                        to = from + chunk;
                    }
                } else {
                    // Between 4 MB and 20 MB: bisect twice into 4 roughly even chunks.
                    LOG.info("Splitting into 4 even chunks.");
                    for (OffsetRange half : findSplit(file, restriction)) {
                        outputRanges.addAll(findSplit(file, half));
                    }
                }
                StringBuilder sb = new StringBuilder();
                for (OffsetRange r : outputRanges) {
                    sb.append(r);
                    sb.append("\n");
                    receiver.output(r);
                }
                LOG.info("Restriction ranges: \n" + sb.toString());
            } else {
                // Too small to be worth splitting, but the restriction must still
                // be emitted so the range gets processed.
                receiver.output(restriction);
            }
        } catch (Exception e) {
            LOG.error("Error finding splits", e);
            throw new RuntimeException(e);
        }
    }

    /** Splits the given range as evenly as possible, at a record boundary. */
    private List<OffsetRange> findSplit(FileIO.ReadableFile file, OffsetRange range) throws IOException {
        List<OffsetRange> ranges = new ArrayList<>();
        LOG.info("Find split " + range);
        long midpoint = range.getFrom() + (range.getTo() - range.getFrom()) / 2;
        ByteBuffer bb = ByteBuffer.allocate(1024 * 100); // 100 KB
        try (SeekableByteChannel seekable = file.openSeekable()) {
            LOG.info("Seeking to midpoint: " + midpoint);
            seekable.position(midpoint);
            int bytesRead = 0;
            // Read at least 10 KB past the midpoint to scan for a boundary.
            while (bytesRead < 1024 * 10) {
                int read = seekable.read(bb);
                if (read < 0) {
                    break; // end of file; guard against an infinite loop
                }
                bytesRead += read;
            }
            bb.limit(bytesRead);
            bb.position(0);
            // Scan forward for a line feed. A LF inside a quoted field will fail
            // the parse check below, so keep scanning until a LF that starts a
            // genuine record boundary is found.
            while (bb.position() < bb.limit()) {
                byte b = bb.get();
                if (b == LF_CHAR && bb.position() < bb.limit()) {
                    byte[] fileBytes = Arrays.copyOfRange(bb.array(), bb.position(), bb.limit());
                    String streamChars = new String(fileBytes, StandardCharsets.UTF_8);
                    try {
                        CSVParser parser = CSVParser.parse(streamChars, CSVFormat.RFC4180);
                        int numParsed = 0;
                        // If more than 3 consecutive records parse cleanly from
                        // this position, treat it as a real record boundary.
                        for (CSVRecord record : parser) {
                            numParsed++;
                            if (numParsed > 3) {
                                break;
                            }
                        }
                        if (numParsed > 3) {
                            ranges.add(new OffsetRange(range.getFrom(), midpoint + bb.position()));
                            ranges.add(new OffsetRange(midpoint + bb.position(), range.getTo()));
                            LOG.info("Splitting range " + range + " into " + ranges.get(0) + " and " + ranges.get(1));
                            break;
                        }
                    } catch (Exception e) {
                        LOG.info("Exception parsing string - advancing pointer.", e);
                    }
                }
            }
        }
        return ranges;
    }
}
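
Usage note: a minimal sketch of how this DoFn might be wired into a pipeline. The file pattern is hypothetical, and options come from the command line:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.ParDo;

public class CSVSplitterPipeline {
    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline p = Pipeline.create(options);
        p.apply(FileIO.match().filepattern("gs://my-bucket/input/*.csv")) // hypothetical path
         .apply(FileIO.readMatches())
         .apply(ParDo.of(new CSVSplitter())); // emits one List<String> per CSV record
        p.run().waitUntilFinish();
    }
}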
@pbrumblay (owner) commented Apr 24, 2018
Environment

  • Apache Beam Java 2.4.0
  • DirectRunner
  • DataflowRunner (eventually; splittable DoFn not currently supported)

Goals

Trying to achieve two goals here simultaneously:

  1. Support arbitrarily large RFC 4180 CSV files (300+ MB).
  2. Support embedded newlines in quoted fields. (TextIO.Read splits on newlines, so embedded newlines are a problem; see the example after this list.)
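
For example, the following is a single RFC 4180 record spanning two physical lines (illustrative data):

  id,comment
  1,"first line
  second line"

A line-based reader such as TextIO.Read emits three elements here: the header plus the two fragments 1,"first line and second line", neither of which parses as a record.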

Questions

  1. Am I right that SplittableDoFn is the way to go here?
  2. I'm not iterating over the restriction in processElement() as in the examples; instead I rely on splitRestriction() to partition the work up front. Is this contrary to the design? If so, how should I split in a design-friendly way while dealing with unknown record sizes and embedded newlines? (See the sketch after this list for the pattern I understand the examples to use.)
  3. When used with the direct runner, the last byte in any range is attempted by itself. E.g., for [0, 29233) a worker will process the range, and then another will process [29233, 29233) and fail when it tries to claim that range. This feels like an off-by-one error, but I can't see what I'm doing wrong to cause it.
  4. I'm not a Java expert. Is there anything in this code that will conflict with core design assumptions in Apache Beam?
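
On question 2, my reading of the examples is that processElement() should claim each record's start offset as records are consumed, roughly as in the sketch below. This is untested; openParserAt() and nextRecordOffset() are hypothetical helpers, since commons-csv does not directly expose the byte offset of each record when parsing from a String:

  // Sketch only: the per-record claiming pattern used in the SDF examples.
  // openParserAt() and nextRecordOffset() are hypothetical helpers; a real
  // version must map each CSVRecord back to its starting byte offset.
  @ProcessElement
  public void process(ProcessContext c, OffsetRangeTracker tracker) throws IOException {
      long recordStart = tracker.currentRestriction().getFrom();
      for (CSVRecord record : openParserAt(c.element(), recordStart)) {
          if (!tracker.tryClaim(recordStart)) {
              return; // the rest of the range belongs to another split
          }
          List<String> fields = new ArrayList<>();
          record.forEach(fields::add);
          c.output(fields);
          recordStart = nextRecordOffset(record); // first byte of the next record
      }
  }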

Related info

  1. Alternative for smaller files
  2. Related Jira Issue
  3. Splittable DoFn announcement