Skip to content

Instantly share code, notes, and snippets.

@parisni
Last active April 3, 2023 08:05
Show Gist options
  • Save parisni/7855c59d43e586260167ccf9ba29fe10 to your computer and use it in GitHub Desktop.
Save parisni/7855c59d43e586260167ccf9ba29fe10 to your computer and use it in GitHub Desktop.
Hudi custom record merger (Spark record type)
/**
 * Sample Hudi {@link HoodieRecordMerger} operating on Spark-native records.
 *
 * <p>On every merge the field values of the newer record are kept, except the field at
 * position {@link #MAP_FIELD_INDEX} of the new schema. That field is overwritten with a fixed
 * {@code map<string, array<timestamp>>} value built by {@link #buildSampleMapValue()}. The older
 * record is only checked for its record type; none of its values are used.
 *
 * <p>NOTE(review): the class name ends in "Mergert" (likely a typo for "Merger"). It is kept
 * unchanged because configuration may reference the class by its fully-qualified name.
 *
 * <p>NOTE(review): recent Hudi versions of {@code HoodieRecordMerger} also declare
 * {@code getRecordType()} and {@code getMergingStrategy()}. They are not part of this snippet;
 * confirm they are implemented for the Hudi version in use.
 */
public class CustomHoodieSparkRecordMergert implements HoodieRecordMerger {

  /**
   * Position, in the new schema, of the column that receives the sample map value.
   * NOTE(review): assumes field #2 is a map<string, array<timestamp>> column; confirm against
   * the table schema.
   */
  private static final int MAP_FIELD_INDEX = 2;

  /**
   * Sample value stored in the map's timestamp array, written as a raw long into Spark's
   * internal representation. Presumably microseconds since epoch (Spark's internal
   * TimestampType encoding); confirm the intended instant.
   */
  private static final long SAMPLE_TIMESTAMP = 103079215152L;

  /**
   * Merges two Spark records by taking the newer record's values and replacing the field at
   * {@link #MAP_FIELD_INDEX} with a sample map value.
   *
   * @param older previously stored record; must be of record type SPARK
   * @param oldSchema schema of {@code older} (unused)
   * @param newer incoming record; must be of record type SPARK
   * @param newSchema schema of {@code newer}; also used as the schema of the merged record
   * @param props merge properties (unused)
   * @return the merged record paired with {@code newSchema}
   * @throws IOException declared by the {@link HoodieRecordMerger} contract
   * @throws IllegalArgumentException if either record is not a SPARK record, or if
   *     {@code newSchema} has too few fields to hold the map column
   */
  @Override
  public Option<Pair<HoodieRecord, Schema>> merge(
      HoodieRecord older,
      Schema oldSchema,
      HoodieRecord newer,
      Schema newSchema,
      TypedProperties props)
      throws IOException {
    ValidationUtils.checkArgument(
        older.getRecordType() == HoodieRecord.HoodieRecordType.SPARK,
        "older record must be of record type SPARK");
    ValidationUtils.checkArgument(
        newer.getRecordType() == HoodieRecord.HoodieRecordType.SPARK,
        "newer record must be of record type SPARK");

    Object[] returnValues = extractInternalRow(newer, newSchema);
    // Fail with a descriptive message rather than a bare ArrayIndexOutOfBoundsException.
    ValidationUtils.checkArgument(
        returnValues.length > MAP_FIELD_INDEX,
        "new schema has too few fields to hold the map column");
    returnValues[MAP_FIELD_INDEX] = buildSampleMapValue();
    return forgeResult(returnValues, newSchema);
  }

  /**
   * Builds the Catalyst-internal value {@code {"bar" -> [SAMPLE_TIMESTAMP]}} for a
   * {@code map<string, array<timestamp>>} column.
   */
  private static ArrayBasedMapData buildSampleMapValue() {
    ArrayData keys = ArrayData.toArrayData(new UTF8String[] {UTF8String.fromString("bar")});
    ArrayData timestamps = ArrayData.toArrayData(new long[] {SAMPLE_TIMESTAMP});
    // Each map value is itself an array, so the values ArrayData must contain one ArrayData
    // element per key. Passing an ArrayData straight to toArrayData is rejected at runtime
    // because it is neither a Seq nor an Array. Wrapping it in an Object[] yields a one-element
    // GenericArrayData.
    ArrayData values = ArrayData.toArrayData(new Object[] {timestamps});
    return new ArrayBasedMapData(keys, values);
  }

  /**
   * Reads every top-level field of {@code record}, in {@code schema} field order.
   *
   * @param record the record to read
   * @param schema schema whose field names select the columns
   * @return the column values as returned by {@code HoodieRecord#getColumnValues}
   */
  private static Object[] extractInternalRow(HoodieRecord record, Schema schema) {
    String[] fields = schema.getFields().stream().map(Schema.Field::name).toArray(String[]::new);
    // The boolean is passed through unchanged from the original code.
    // NOTE(review): presumably Hudi's consistent-logical-timestamp flag; confirm.
    return record.getColumnValues(schema, fields, true);
  }

  /**
   * Wraps Catalyst-internal column values into a {@link HoodieSparkRecord} backed by an
   * {@link UnsafeRow}.
   *
   * @param returnValues column values, in {@code newSchema} field order
   * @param newSchema Avro schema of the result; converted to a Spark schema via the Hudi cache
   * @return the merged record paired with {@code newSchema}
   */
  private static Option<Pair<HoodieRecord, Schema>> forgeResult(
      Object[] returnValues, Schema newSchema) {
    // Use the cached Avro -> Spark schema conversion to avoid recomputing it for every record.
    StructType targetSparkSchema = HoodieInternalRowUtils.getCachedSchema(newSchema);
    Row row = RowFactory.create(returnValues);
    HoodieRecord mergedRecord = new HoodieSparkRecord(toUnsafeRow(row, targetSparkSchema));
    return Option.of(Pair.of(mergedRecord, newSchema));
  }

  /**
   * Converts {@code row} to Catalyst form and projects it to an {@link UnsafeRow} with
   * {@code schema}.
   *
   * <p>Values that are already Catalyst-internal (e.g. {@code UTF8String}, {@code ArrayData},
   * {@code MapData}) pass through {@code CatalystTypeConverters.convertToCatalyst} unchanged.
   *
   * <p>NOTE(review): a new {@link UnsafeProjection} is created on every call, which costs a
   * projection build per merged record. If it is ever cached, copy() the result: a projection
   * reuses its output row between calls.
   *
   * @param row row whose values match {@code schema} field order
   * @param schema Spark schema of the resulting row
   * @return a new unsafe row holding the projected values
   */
  public static UnsafeRow toUnsafeRow(Row row, StructType schema) {
    UnsafeProjection proj = UnsafeProjection.create(schema);
    return proj.apply((InternalRow) CatalystTypeConverters.convertToCatalyst(row));
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment