Skip to content

Instantly share code, notes, and snippets.

@ldcasillas-progreso
Created April 16, 2015 18:33
Show Gist options
  • Save ldcasillas-progreso/6ae40740c9e2b7dc1532 to your computer and use it in GitHub Desktop.
Save ldcasillas-progreso/6ae40740c9e2b7dc1532 to your computer and use it in GitHub Desktop.
An OpenCSV-based DelimitedParser for Cascading that we have used successfully to interface with Redshift. I do not vouch for its correctness under all circumstances.
package com.progressfin.cascading.util.scheme;
import au.com.bytecode.opencsv.CSVParser;
import cascading.scheme.util.DelimitedParser;
import cascading.scheme.util.FieldTypeResolver;
import cascading.tap.TapException;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
/**
* A hack on top of Cascading's {@link cascading.scheme.util.DelimitedParser} to correct what I call a bug in it, or at
* least an incompatibility with Redshift. If a field value contains a quote but no delimiters, the default
* {@class DelimitedParser} will write the field like the middle one in this example:
*
* <p>{@code some field,a field with "" in it, yet another field}
*
* <p>Redshift chokes on this, and expects this output:
*
* <p>{@code some field,"a field with "" in it", yet another field}
*
* @author Luis Casillas
*/
public class RedshiftDelimitedParser extends DelimitedParser {
private static final Logger log = LoggerFactory.getLogger(RedshiftDelimitedParser.class);
private final CSVParserHolder csvParserHolder;
public RedshiftDelimitedParser(char delimiter, char quote, Class[] types) {
super(String.valueOf(delimiter), String.valueOf(quote), types);
csvParserHolder = new CSVParserHolder(delimiter, quote);
}
public RedshiftDelimitedParser(char delimiter, char quote, Class[] types, boolean strict, boolean safe) {
super(String.valueOf(delimiter), String.valueOf(quote), types, strict, safe);
csvParserHolder = new CSVParserHolder(delimiter, quote);
}
public RedshiftDelimitedParser(char delimiter, char quote, FieldTypeResolver fieldTypeResolver) {
super(String.valueOf(delimiter), String.valueOf(quote), fieldTypeResolver);
csvParserHolder = new CSVParserHolder(delimiter, quote);
}
public RedshiftDelimitedParser(char delimiter, char quote, Class[] types, boolean strict, boolean safe,
FieldTypeResolver fieldTypeResolver) {
super(String.valueOf(delimiter), String.valueOf(quote), types, strict, safe, fieldTypeResolver);
csvParserHolder = new CSVParserHolder(delimiter, quote);
}
public RedshiftDelimitedParser(char delimiter, char quote, Class[] types, boolean strict, boolean safe,
Fields sourceFields, Fields sinkFields) {
super(String.valueOf(delimiter), String.valueOf(quote), types, strict, safe, sourceFields, sinkFields);
csvParserHolder = new CSVParserHolder(delimiter, quote);
}
public RedshiftDelimitedParser(char delimiter, char quote, Class[] types, boolean strict, boolean safe,
Fields sourceFields, Fields sinkFields, FieldTypeResolver fieldTypeResolver) {
super(String.valueOf(delimiter), String.valueOf(quote), types, strict, safe,
sourceFields, sinkFields, fieldTypeResolver);
csvParserHolder = new CSVParserHolder(delimiter, quote);
}
@Override
protected Object[] onlyParseLine(String line) {
Object[] split = new String[0];
try {
split = csvParserHolder.getCSVParser().parseLine(line);
} catch (IOException ioe) {
if( enforceStrict ) {
String msg = "Could not parse CSV line: %s";
throw new TapException(String.format(msg, line), ioe, new Tuple(line)); // trap actual line data
}
}
// Everything below here is just copy-paste from superclass. Grrrr...
if( numValues != 0 && split.length != numValues )
{
if( enforceStrict )
throw new TapException( getParseMessage( split ), new Tuple( line ) ); // trap actual line data
if( log.isDebugEnabled() )
log.debug( getParseMessage( split ) );
Object[] array = new Object[ numValues ];
Arrays.fill(array, "");
System.arraycopy( split, 0, array, 0, Math.min( numValues, split.length ) );
split = array;
}
return split;
}
// More copypaste from superclass
private String getParseMessage( Object[] split )
{
return "did not parse correct number of values from input data, expected: " + numValues + ", got: " + split.length + ":" + Util.join(",", (String[]) split);
}
@Override
protected Appendable joinWithQuote( Iterable tuple, Appendable buffer ) throws IOException
{
int count = 0;
for( Object value : tuple )
{
if( count != 0 )
buffer.append( delimiter );
if( value != null )
{
String valueString = value.toString();
if( valueString.contains( quote ) )
{
valueString = quote + valueString.replaceAll(quote, quote + quote) + quote;
} else if( valueString.contains( delimiter ) )
{
valueString = quote + valueString + quote;
}
buffer.append( valueString );
}
count++;
}
return buffer;
}
/**
* Serializable container for the CSVParser.
*/
private static class CSVParserHolder implements Serializable {
private final char separator;
private final char quotechar;
// HACK: Since TextLine's going to split on newlines anyway, we're guaranteed that no individual
// line we ever see has a newline. OpenCSV doesn't allow us to disable the escape character, and
// this is our way of faking it.
private final char escape = '\n';
private final boolean strictQuotes = false;
private final boolean ignoreLeadingWhiteSpace = false;
private transient volatile CSVParser csvParser;
public CSVParserHolder(char separator, char quotechar) {
this.separator = separator;
this.quotechar = quotechar;
this.csvParser = new CSVParser(separator, quotechar, escape, strictQuotes, ignoreLeadingWhiteSpace);
}
public CSVParser getCSVParser() {
if (csvParser == null) {
this.csvParser = new CSVParser(separator, quotechar, escape, strictQuotes, ignoreLeadingWhiteSpace);
}
return csvParser;
}
}
/**
* Construct a {@link com.progressfin.cascading.util.scheme.RedshiftDelimitedParser} with the default options.
* This means double quote ({@code "}) as the quote character, and comma as the delimiter.
*/
public static RedshiftDelimitedParser standard(Fields fields) {
return new RedshiftDelimitedParser(',', '"', null, true, true, fields, fields);
}
public static class Builder {
private char delimiter;
private char quote;
private Fields fields;
private boolean safe = true;
private boolean strict = true;
public Builder setDelimiter(char delimiter) {
this.delimiter = delimiter;
return this;
}
public Builder setQuote(char quote) {
this.quote = quote;
return this;
}
public Builder setFields(Fields fields) {
this.fields = fields;
return this;
}
public Builder setSafe(boolean safe) {
this.safe = safe;
return this;
}
public Builder setStrict(boolean strict) {
this.strict = strict;
return this;
}
public RedshiftDelimitedParser build() {
return new RedshiftDelimitedParser(delimiter, quote, null, strict, safe, fields, fields);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment