@mesmat
mesmat / parseLines.java
Last active November 8, 2019 00:29
A DoFn (run with ParDo) that parses CSV string lines, converts the valid ones into Avro GenericRecords matching a target schema, and routes the invalid (bad) ones into a separate PCollection using MultiOutputReceiver
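// DoFn already implements Serializable, so every field below must be serializable too.
private static class ParseLines extends DoFn<String, GenericRecord> {
    private static final long serialVersionUID = 4374710793511011723L;
    // Tag for lines that parse and validate successfully.
    private TupleTag<GenericRecord> goodRecordsTag;
    // Tag for lines that fail parsing or validation.
    private TupleTag<String> badRecordsTag;
    // Source schema: column name -> column type.
    private Map<String, String> sourceSchema;
    // Target Avro schema, kept as a JSON string.
    private String targetSchemaJson;
    // Field separator character.
    private char sep;
    // ... (rest of the class is not shown in this preview)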
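The body of ParseLines is cut off in this preview, so the parse/validate/route logic is not visible. Below is a minimal plain-Java sketch of that idea, assuming a line is "good" when it has exactly one field per schema column and every field converts to its declared type ("string", "int", "double", as in the sample schema). `LineRouter`, `Routed`, `route` and `convert` are hypothetical names, the two lists stand in for the two tagged outputs, and quoted CSV fields are not handled.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

class LineRouter {
    // Holds the two outputs: converted records (stand-ins for GenericRecord) and raw bad lines.
    static final class Routed {
        final List<Map<String, Object>> good = new ArrayList<>();
        final List<String> bad = new ArrayList<>();
    }

    // Converts one raw field according to its declared column type (assumed type names).
    static Object convert(String raw, String type) {
        switch (type) {
            case "string": return raw;
            case "int": return Integer.parseInt(raw.trim());
            case "double": return Double.parseDouble(raw.trim());
            default: throw new IllegalArgumentException("Unsupported type: " + type);
        }
    }

    // Splits each line on sep and routes it to good (all fields valid) or bad (anything else).
    static Routed route(List<String> lines, Map<String, String> schema, char sep) {
        Routed out = new Routed();
        // Pattern.quote makes separators such as '|' safe to use with String.split.
        String sepRegex = Pattern.quote(String.valueOf(sep));
        for (String line : lines) {
            // Limit -1 keeps trailing empty fields, so the column count check is exact.
            String[] parts = line.split(sepRegex, -1);
            if (parts.length != schema.size()) {
                out.bad.add(line);
                continue;
            }
            Map<String, Object> record = new LinkedHashMap<>();
            int i = 0;
            try {
                // Columns are matched by position, so the schema map's order must match the file.
                for (Map.Entry<String, String> col : schema.entrySet()) {
                    record.put(col.getKey(), convert(parts[i++], col.getValue()));
                }
                out.good.add(record);
            } catch (IllegalArgumentException e) { // also catches NumberFormatException
                out.bad.add(line);
            }
        }
        return out;
    }
}
```

Inside a real DoFn, each `out.good.add(...)` / `out.bad.add(...)` would correspond to `out.get(goodRecordsTag).output(...)` / `out.get(badRecordsTag).output(...)` on the `MultiOutputReceiver` passed to the `@ProcessElement` method.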
mesmat / getEquivalentAvroSchema.java
Created November 8, 2019 00:15
An example of how to construct an Avro schema from a map of column names to column types
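private Schema getEquivalentAvroSchema(String tableName, Map<String, String> kvColumnNameType) {
    List<Schema.Field> fields = new ArrayList<>();
    // Iterate over the columns and find the equivalent Avro type for each one.
    // Field order follows the map's iteration order: a HashMap gives no guaranteed
    // order, so use a LinkedHashMap if the column order matters.
    for (Map.Entry<String, String> entry : kvColumnNameType.entrySet()) {
        String column = entry.getKey();
        String dataType = entry.getValue();
        // getFieldSchema (not shown in this preview) maps a type name to an Avro Schema.
        // The (Object) null cast selects the Field(String, Schema, String, Object) constructor.
        fields.add(new Schema.Field(column, getFieldSchema(dataType), null, (Object) null));
    }
    // ... (rest of the method is not shown in this preview)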
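Neither `getFieldSchema` nor the end of the method is visible here. As a rough, dependency-free illustration of the same idea, the sketch below builds the equivalent record schema as Avro JSON, which Avro's `new Schema.Parser().parse(json)` can turn into a `Schema`. The type-name mapping (including `"bigint"` to `"long"`) and the names `AvroSchemaJson` / `recordSchemaJson` are assumptions, not the gist's code; names are not JSON-escaped because valid Avro names only contain letters, digits and underscores.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

class AvroSchemaJson {
    // Hypothetical mapping from source column types to Avro primitive type names.
    private static final Map<String, String> AVRO_TYPES = new LinkedHashMap<>();
    static {
        AVRO_TYPES.put("string", "string");
        AVRO_TYPES.put("int", "int");
        AVRO_TYPES.put("bigint", "long");
        AVRO_TYPES.put("double", "double");
        AVRO_TYPES.put("boolean", "boolean");
    }

    // Builds a record schema as JSON; field order follows the map's iteration order.
    static String recordSchemaJson(String recordName, Map<String, String> columns) {
        StringBuilder sb = new StringBuilder();
        sb.append("{\"type\":\"record\",\"name\":\"").append(recordName).append("\",\"fields\":[");
        boolean first = true;
        for (Map.Entry<String, String> col : columns.entrySet()) {
            String avroType = AVRO_TYPES.get(col.getValue().toLowerCase(Locale.ROOT));
            if (avroType == null) {
                throw new IllegalArgumentException("No Avro mapping for type: " + col.getValue());
            }
            if (!first) {
                sb.append(',');
            }
            first = false;
            // No "default" key, matching a field created with a null default value.
            sb.append("{\"name\":\"").append(col.getKey())
              .append("\",\"type\":\"").append(avroType).append("\"}");
        }
        return sb.append("]}").toString();
    }
}
```

Failing fast on an unknown type name keeps a typo in the source schema from silently producing a wrong Avro schema.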
mesmat / ParseCSVData.java
Created November 7, 2019 22:21
Parsing CSV strings, validating them against a schema, and converting them to GenericRecords
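// Create two TupleTag objects: one to tag good records and the other to tag bad records.
// The trailing {} makes an anonymous subclass so the element type is kept at runtime.
final TupleTag<GenericRecord> goodRecordsTag = new TupleTag<GenericRecord>() {};
final TupleTag<String> badRecordsTag = new TupleTag<String>() {};
// Sample source schema: key is the column name, value is the column type.
// A LinkedHashMap keeps the column order, and plain put() calls avoid the anonymous
// subclass that double-brace initialization creates (it can capture the enclosing instance).
Map<String, String> sourceSchemaMap = new LinkedHashMap<>();
sourceSchemaMap.put("column1", "string");
sourceSchemaMap.put("column2", "int");
sourceSchemaMap.put("column3", "double");
// ... (rest of the snippet is not shown in this preview)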
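Beam's `TupleTag.getTypeDescriptor()` relies on a tag being created as an anonymous subclass with a trailing `{}`, as in `new TupleTag<GenericRecord>() {}`. The sketch below shows the underlying "super type token" trick in plain Java: an anonymous subclass records its parameterized superclass, so the type argument survives erasure. `TypeCapture` is a hypothetical class for illustration only, not Beam's implementation (which goes through `TypeDescriptor`).

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical super type token: instantiate it as new TypeCapture<X>() {}.
abstract class TypeCapture<T> {
    // Reads T back from the generic superclass recorded by the anonymous subclass.
    Type capturedType() {
        Type superType = getClass().getGenericSuperclass();
        if (!(superType instanceof ParameterizedType)) {
            throw new IllegalStateException("Create instances as anonymous subclasses: new TypeCapture<X>() {}");
        }
        return ((ParameterizedType) superType).getActualTypeArguments()[0];
    }
}
```

Without the `{}`, a plain `new TupleTag<GenericRecord>()` has no subclass to carry the type argument, so Beam may be unable to infer a coder for that output.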
mesmat / readMultipleFiles.java
Last active November 7, 2019 03:46
Reading multiple files into a single PCollection
// Example list of file patterns (glob wildcards, not regexes) that match gzip-compressed
// CSV files whose names start with either '20190910' or '20190911'.
List<String> filePatterns = Arrays.asList("20190910*.csv.gz", "20190911*.csv.gz");
// In this example the CSV files are gzip-compressed; use whatever compression your files have.
Compression comp = Compression.GZIP;
// Field separator character.
char sep = ',';
// Create a PCollection holding the list of file patterns (p is the Pipeline, created earlier).
PCollection<String> filePatternsPCollection = p.apply(Create.of(filePatterns));
// ... (rest of the snippet is not shown in this preview)
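Patterns such as `20190910*.csv.gz` are globs, where `*` matches any run of characters; read as a regex they would mean something quite different. The sketch below uses `java.nio`'s glob `PathMatcher` only to illustrate which file names those two patterns select; Beam's filesystems do their own pattern expansion, so edge-case behavior may differ. `GlobDemo` and `matching` are hypothetical names. How the gist reads the matched files is not shown; one common Beam route is `FileIO.matchAll()`, then `FileIO.readMatches()` (which accepts a `Compression` such as `Compression.GZIP`), then `TextIO.readFiles()`.

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

class GlobDemo {
    // Returns the file names matched by at least one glob pattern, in input order.
    static List<String> matching(List<String> globs, List<String> fileNames) {
        List<PathMatcher> matchers = new ArrayList<>();
        for (String glob : globs) {
            matchers.add(FileSystems.getDefault().getPathMatcher("glob:" + glob));
        }
        List<String> result = new ArrayList<>();
        for (String name : fileNames) {
            for (PathMatcher m : matchers) {
                if (m.matches(Paths.get(name))) {
                    result.add(name);
                    break; // one matching pattern is enough
                }
            }
        }
        return result;
    }
}
```

For example, `20190911.csv.gz` matches (the `*` may match nothing), while `20190910_a.csv` does not, because it lacks the `.gz` suffix.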