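// DoFn that turns each delimited text line into an Avro GenericRecord; lines that
// cannot be parsed are routed to a separate output identified by badRecordsTag.
private static class ParseLines extends DoFn<String, GenericRecord> implements Serializable {
    private static final long serialVersionUID = 4374710793511011723L;
    // output tags for successfully parsed records and for rejected raw lines
    private TupleTag<GenericRecord> goodRecordsTag;
    private TupleTag<String> badRecordsTag;
    // source column name -> source column type
    private Map<String, String> sourceSchema;
    // target Avro schema kept as a JSON string, likely so the DoFn stays serializable
    private String targetSchemaJson;
    // field separator character
    private char sep;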
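The fields above suggest that ParseLines splits each line on `sep`, converts the values according to `sourceSchema`, and sends each line either to the good or the bad output. The plain-Java sketch below illustrates that routing under stated assumptions: the class `ParseLinesSketch` and its `convert` table are hypothetical, records are `LinkedHashMap`s rather than Avro `GenericRecord`s, and two lists stand in for the tagged outputs. It is not the gist's actual `processElement`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Plain-Java sketch of the good/bad routing a DoFn like ParseLines might perform.
// Hypothetical: records are LinkedHashMaps instead of Avro GenericRecords, and the
// two lists stand in for the goodRecordsTag and badRecordsTag outputs.
class ParseLinesSketch {
    final List<Map<String, Object>> good = new ArrayList<>();
    final List<String> bad = new ArrayList<>();

    void process(String line, Map<String, String> sourceSchema, char sep) {
        // limit -1 keeps trailing empty strings, so "a,1," yields three fields, not two
        String[] values = line.split(Pattern.quote(String.valueOf(sep)), -1);
        if (values.length != sourceSchema.size()) {
            bad.add(line); // wrong column count: route the raw line to the bad output
            return;
        }
        Map<String, Object> record = new LinkedHashMap<>();
        int i = 0;
        try {
            // relies on the schema map iterating in CSV column order (e.g. a LinkedHashMap)
            for (Map.Entry<String, String> column : sourceSchema.entrySet()) {
                record.put(column.getKey(), convert(values[i++], column.getValue()));
            }
        } catch (NumberFormatException e) {
            bad.add(line); // a value did not parse as its declared type
            return;
        }
        good.add(record);
    }

    // Hypothetical conversion table covering only the types used in the sample schema.
    static Object convert(String raw, String type) {
        switch (type) {
            case "int":
                return Integer.parseInt(raw.trim());
            case "double":
                return Double.parseDouble(raw.trim());
            default:
                return raw;
        }
    }
}
```

In a real Beam DoFn, the two `add` calls would presumably become `c.output(record)` for the main output and `c.output(badRecordsTag, line)` for the additional output, with the record built as a `GenericRecord` from the parsed target schema.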
private Schema getEquivalentAvroSchema(String tableName, Map<String, String> kvColumnNameType) {
    List<Schema.Field> fields = new ArrayList<Schema.Field>();
    // iterate over the columns and find the equivalent Avro type for each column
    for (Map.Entry<String, String> column : kvColumnNameType.entrySet()) {
        // the (Object) cast selects the Field(name, schema, doc, defaultValue) overload;
        // a null defaultValue means the field has no default
        fields.add(new Schema.Field(column.getKey(), getFieldSchema(column.getValue()), null, (Object) null));
    }
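The body of `getFieldSchema` is not shown here. As a rough, dependency-free illustration of what the method builds, the sketch below maps source types to Avro primitive type names and assembles the JSON form of an Avro record schema with one field per column. The class `AvroSchemaSketch` and its type table are hypothetical assumptions, not the gist's mapping.

```java
import java.util.Locale;
import java.util.Map;
import java.util.StringJoiner;

class AvroSchemaSketch {
    // Hypothetical stand-in for getFieldSchema(): source column type -> Avro primitive type name.
    static String avroType(String sourceType) {
        switch (sourceType.toLowerCase(Locale.ROOT)) {
            case "string": case "varchar": return "string";
            case "int": case "integer": return "int";
            case "bigint": case "long": return "long";
            case "float": return "float";
            case "double": return "double";
            case "boolean": return "boolean";
            default: throw new IllegalArgumentException("Unsupported type: " + sourceType);
        }
    }

    // Builds an Avro record schema as JSON, one field per column, in map iteration order.
    // Assumes table and column names are already valid Avro names, so no escaping is done.
    static String recordSchemaJson(String tableName, Map<String, String> columns) {
        StringJoiner fields = new StringJoiner(",");
        for (Map.Entry<String, String> column : columns.entrySet()) {
            fields.add("{\"name\":\"" + column.getKey() + "\",\"type\":\"" + avroType(column.getValue()) + "\"}");
        }
        return "{\"type\":\"record\",\"name\":\"" + tableName + "\",\"fields\":[" + fields + "]}";
    }
}
```

With the Avro library on the classpath, such a string could be turned into a `Schema` with `new Schema.Parser().parse(json)`, which is also one way to rebuild a schema from a JSON field like `targetSchemaJson`.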
// Create two TupleTag objects: one to tag good records and the other to tag bad records.
final TupleTag<GenericRecord> goodRecordsTag = new TupleTag<GenericRecord>() {};
final TupleTag<String> badRecordsTag = new TupleTag<String>() {};
// Instantiate a sample source schema with the column name as key and the column type as value.
// LinkedHashMap keeps insertion order, so the Avro fields follow the CSV column order
// (a plain HashMap gives no ordering guarantee).
Map<String, String> sourceSchemaMap = new LinkedHashMap<String, String>() {{
    put("column1", "string");
    put("column2", "int");
    put("column3", "double");
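The trailing `{}` on each `TupleTag` creates an anonymous subclass. As we understand it, Beam relies on this so the tag's element type survives erasure and a coder can be inferred for the additional output. The standard Java mechanism behind that is shown below with a hypothetical `TypeCapture` class; it illustrates the general trick and is not Beam's implementation.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical class mimicking the anonymous-subclass trick used with `new TupleTag<T>() {}`.
abstract class TypeCapture<T> {
    Type capturedType() {
        // For an anonymous subclass such as `new TypeCapture<String>() {}`, the generic
        // superclass is TypeCapture<String>, and its type argument is kept in class metadata.
        ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
        return superType.getActualTypeArguments()[0];
    }
}
```

Calling `new TypeCapture<String>() {}.capturedType()` returns `String.class`. Without the anonymous subclass, the type argument would be erased at runtime and there would be nothing to read back.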
// Create an example list of file patterns (globs, not regular expressions) that match
// gzip-compressed CSV files whose names start with either '20190910' or '20190911'.
List<String> filePatterns = Arrays.asList("20190910*.csv.gz", "20190911*.csv.gz");
// In this example the CSV files are gzip-compressed; pick the Compression value that fits
// your files (Compression.AUTO infers it from the file extension).
Compression comp = Compression.GZIP;
// field separator character
char sep = ',';
// Create a PCollection containing the list of file patterns.
PCollection<String> filePatternsPCollection = p.apply(Create.of(filePatterns));
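Because the patterns are globs rather than regular expressions, `*` matches any run of characters within a file name. To sanity-check which local file names such patterns would select, a sketch using the JDK's `PathMatcher` is shown below. The `GlobSketch` helper is hypothetical, and Beam resolves the real patterns through its own file-system layer, whose glob rules may differ in edge cases.

```java
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: local approximation of how glob-style file patterns select files.
class GlobSketch {
    static List<String> matching(List<String> patterns, List<String> fileNames) {
        // one PathMatcher per pattern; the "glob:" prefix selects glob rather than regex syntax
        List<PathMatcher> matchers = new ArrayList<>();
        for (String pattern : patterns) {
            matchers.add(FileSystems.getDefault().getPathMatcher("glob:" + pattern));
        }
        List<String> matched = new ArrayList<>();
        for (String name : fileNames) {
            Path path = Paths.get(name);
            // keep the file if any pattern matches it
            for (PathMatcher matcher : matchers) {
                if (matcher.matches(path)) {
                    matched.add(name);
                    break;
                }
            }
        }
        return matched;
    }
}
```

For example, `20190911.csv.gz` matches `20190911*.csv.gz` because `*` may match zero characters, while `20190910_a.csv` does not match either pattern since it lacks the `.gz` suffix.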