Created
April 16, 2015 18:33
-
-
Save ldcasillas-progreso/6ae40740c9e2b7dc1532 to your computer and use it in GitHub Desktop.
An OpenCSV-based DelimitedParser for Cascading that we've used successfully to interface with Redshift. I do not vouch for correctness under all circumstances...
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.progressfin.cascading.util.scheme; | |
import au.com.bytecode.opencsv.CSVParser; | |
import cascading.scheme.util.DelimitedParser; | |
import cascading.scheme.util.FieldTypeResolver; | |
import cascading.tap.TapException; | |
import cascading.tuple.Fields; | |
import cascading.tuple.Tuple; | |
import cascading.util.Util; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.io.IOException; | |
import java.io.Serializable; | |
import java.util.Arrays; | |
/**
 * A hack on top of Cascading's {@link cascading.scheme.util.DelimitedParser} to correct what I call a bug in it, or
 * at least an incompatibility with Redshift. If a field value contains a quote but no delimiter, the default
 * {@link DelimitedParser} writes the field like the middle one in this example:
 *
 * <p>{@code some field,a field with "" in it, yet another field}
 *
 * <p>Redshift chokes on this, and expects this output instead:
 *
 * <p>{@code some field,"a field with "" in it", yet another field}
 *
 * @author Luis Casillas
 */
public class RedshiftDelimitedParser extends DelimitedParser { | |
private static final Logger log = LoggerFactory.getLogger(RedshiftDelimitedParser.class); | |
private final CSVParserHolder csvParserHolder; | |
public RedshiftDelimitedParser(char delimiter, char quote, Class[] types) { | |
super(String.valueOf(delimiter), String.valueOf(quote), types); | |
csvParserHolder = new CSVParserHolder(delimiter, quote); | |
} | |
public RedshiftDelimitedParser(char delimiter, char quote, Class[] types, boolean strict, boolean safe) { | |
super(String.valueOf(delimiter), String.valueOf(quote), types, strict, safe); | |
csvParserHolder = new CSVParserHolder(delimiter, quote); | |
} | |
public RedshiftDelimitedParser(char delimiter, char quote, FieldTypeResolver fieldTypeResolver) { | |
super(String.valueOf(delimiter), String.valueOf(quote), fieldTypeResolver); | |
csvParserHolder = new CSVParserHolder(delimiter, quote); | |
} | |
public RedshiftDelimitedParser(char delimiter, char quote, Class[] types, boolean strict, boolean safe, | |
FieldTypeResolver fieldTypeResolver) { | |
super(String.valueOf(delimiter), String.valueOf(quote), types, strict, safe, fieldTypeResolver); | |
csvParserHolder = new CSVParserHolder(delimiter, quote); | |
} | |
public RedshiftDelimitedParser(char delimiter, char quote, Class[] types, boolean strict, boolean safe, | |
Fields sourceFields, Fields sinkFields) { | |
super(String.valueOf(delimiter), String.valueOf(quote), types, strict, safe, sourceFields, sinkFields); | |
csvParserHolder = new CSVParserHolder(delimiter, quote); | |
} | |
public RedshiftDelimitedParser(char delimiter, char quote, Class[] types, boolean strict, boolean safe, | |
Fields sourceFields, Fields sinkFields, FieldTypeResolver fieldTypeResolver) { | |
super(String.valueOf(delimiter), String.valueOf(quote), types, strict, safe, | |
sourceFields, sinkFields, fieldTypeResolver); | |
csvParserHolder = new CSVParserHolder(delimiter, quote); | |
} | |
@Override | |
protected Object[] onlyParseLine(String line) { | |
Object[] split = new String[0]; | |
try { | |
split = csvParserHolder.getCSVParser().parseLine(line); | |
} catch (IOException ioe) { | |
if( enforceStrict ) { | |
String msg = "Could not parse CSV line: %s"; | |
throw new TapException(String.format(msg, line), ioe, new Tuple(line)); // trap actual line data | |
} | |
} | |
// Everything below here is just copy-paste from superclass. Grrrr... | |
if( numValues != 0 && split.length != numValues ) | |
{ | |
if( enforceStrict ) | |
throw new TapException( getParseMessage( split ), new Tuple( line ) ); // trap actual line data | |
if( log.isDebugEnabled() ) | |
log.debug( getParseMessage( split ) ); | |
Object[] array = new Object[ numValues ]; | |
Arrays.fill(array, ""); | |
System.arraycopy( split, 0, array, 0, Math.min( numValues, split.length ) ); | |
split = array; | |
} | |
return split; | |
} | |
// More copypaste from superclass | |
private String getParseMessage( Object[] split ) | |
{ | |
return "did not parse correct number of values from input data, expected: " + numValues + ", got: " + split.length + ":" + Util.join(",", (String[]) split); | |
} | |
@Override | |
protected Appendable joinWithQuote( Iterable tuple, Appendable buffer ) throws IOException | |
{ | |
int count = 0; | |
for( Object value : tuple ) | |
{ | |
if( count != 0 ) | |
buffer.append( delimiter ); | |
if( value != null ) | |
{ | |
String valueString = value.toString(); | |
if( valueString.contains( quote ) ) | |
{ | |
valueString = quote + valueString.replaceAll(quote, quote + quote) + quote; | |
} else if( valueString.contains( delimiter ) ) | |
{ | |
valueString = quote + valueString + quote; | |
} | |
buffer.append( valueString ); | |
} | |
count++; | |
} | |
return buffer; | |
} | |
/** | |
* Serializable container for the CSVParser. | |
*/ | |
private static class CSVParserHolder implements Serializable { | |
private final char separator; | |
private final char quotechar; | |
// HACK: Since TextLine's going to split on newlines anyway, we're guaranteed that no individual | |
// line we ever see has a newline. OpenCSV doesn't allow us to disable the escape character, and | |
// this is our way of faking it. | |
private final char escape = '\n'; | |
private final boolean strictQuotes = false; | |
private final boolean ignoreLeadingWhiteSpace = false; | |
private transient volatile CSVParser csvParser; | |
public CSVParserHolder(char separator, char quotechar) { | |
this.separator = separator; | |
this.quotechar = quotechar; | |
this.csvParser = new CSVParser(separator, quotechar, escape, strictQuotes, ignoreLeadingWhiteSpace); | |
} | |
public CSVParser getCSVParser() { | |
if (csvParser == null) { | |
this.csvParser = new CSVParser(separator, quotechar, escape, strictQuotes, ignoreLeadingWhiteSpace); | |
} | |
return csvParser; | |
} | |
} | |
/** | |
* Construct a {@link com.progressfin.cascading.util.scheme.RedshiftDelimitedParser} with the default options. | |
* This means double quote ({@code "}) as the quote character, and comma as the delimiter. | |
*/ | |
public static RedshiftDelimitedParser standard(Fields fields) { | |
return new RedshiftDelimitedParser(',', '"', null, true, true, fields, fields); | |
} | |
public static class Builder { | |
private char delimiter; | |
private char quote; | |
private Fields fields; | |
private boolean safe = true; | |
private boolean strict = true; | |
public Builder setDelimiter(char delimiter) { | |
this.delimiter = delimiter; | |
return this; | |
} | |
public Builder setQuote(char quote) { | |
this.quote = quote; | |
return this; | |
} | |
public Builder setFields(Fields fields) { | |
this.fields = fields; | |
return this; | |
} | |
public Builder setSafe(boolean safe) { | |
this.safe = safe; | |
return this; | |
} | |
public Builder setStrict(boolean strict) { | |
this.strict = strict; | |
return this; | |
} | |
public RedshiftDelimitedParser build() { | |
return new RedshiftDelimitedParser(delimiter, quote, null, strict, safe, fields, fields); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment