Skip to content

Instantly share code, notes, and snippets.

@markdav
Created October 14, 2016 08:03
Show Gist options
  • Save markdav/01623363b5b2508b8e5ef6146caedb1b to your computer and use it in GitHub Desktop.
Save markdav/01623363b5b2508b8e5ef6146caedb1b to your computer and use it in GitHub Desktop.
import static org.apache.avro.Schema.Type.DOUBLE;
import static org.apache.avro.Schema.Type.FLOAT;
import static org.apache.avro.Schema.Type.INT;
import static org.apache.avro.Schema.Type.LONG;
import static org.apache.avro.Schema.Type.NULL;
import static org.apache.avro.Schema.Type.STRING;
import static org.apache.avro.Schema.Type.UNION;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificRecord;
/**
 * Static helpers that copy values between Avro {@link GenericRecord}s whose schemas may differ,
 * matching fields by name (or by source-field alias) and coercing simple scalar types where the
 * destination field type differs from the runtime type of the source value.
 */
public class AutoMapper {

    private static final GenericData genericData = new GenericData();

    /**
     * Converts a generic record into the representation produced by
     * {@code SpecificData.get().deepCopy(schema, ...)}.
     *
     * <p>When the record's own schema equals {@code schema} the record is deep-copied directly.
     * Otherwise a new record of {@code schema} is created, populated through
     * {@link #autoMap(GenericRecord, GenericRecord, String...)} and then deep-copied. Destination
     * fields with no matching source field are left unset; schema defaults are not applied.
     *
     * @param genericMessage the record to convert
     * @param schema the target schema
     * @param <T> the type the caller expects; the cast is unchecked
     * @return the deep-copied record, cast to {@code T}
     */
    @SuppressWarnings("unchecked") // the caller chooses T; deepCopy's result type cannot be verified here
    public static <T> T decode(GenericRecord genericMessage, Schema schema) {
        if (genericMessage.getSchema().equals(schema)) {
            // Schemas match: cheap conversion.
            return (T) SpecificData.get().deepCopy(schema, genericMessage);
        }
        // Schema has changed: more expensive field-by-field mapping (no defaults are applied here;
        // switch to a builder once the schemas carry defaults).
        GenericRecord newRecord = new GenericData.Record(schema);
        autoMap(genericMessage, newRecord);
        return (T) SpecificData.get().deepCopy(schema, newRecord);
    }

    /**
     * Copies every field of {@code srcRecord} that has a counterpart in {@code destRecord}.
     *
     * <p>A counterpart is a destination field with the same name or, failing that, a destination
     * field named like one of the source field's aliases. Values that are not of a known immutable
     * type are deep-copied before being stored. When the two record schemas differ, each value is
     * passed through {@link #getFriendlyValue(Object, Schema)} first.
     *
     * @param srcRecord the record to read from
     * @param destRecord the record to write to
     * @param destFieldsToExclude source field names to skip entirely (neither mapped nor reported)
     * @return the source fields that had no counterpart, keyed by source field name in schema
     *     order, with their (copied) values
     * @throws RuntimeException with the field name as message and the original failure as cause
     *     when a value cannot be converted or stored
     */
    public static Map<String, Object> autoMap(GenericRecord srcRecord, GenericRecord destRecord,
            String... destFieldsToExclude) {
        final List<String> destFieldsToExcludeList = Arrays.asList(destFieldsToExclude);
        final Schema destRecordSchema = destRecord.getSchema();
        final Schema srcRecordSchema = srcRecord.getSchema();
        // Loop-invariant: conversion is only needed when the record schemas differ.
        final boolean schemasDiffer = !srcRecordSchema.equals(destRecordSchema);

        Map<String, Object> unmappedFields = new LinkedHashMap<>();
        for (Field srcField : srcRecordSchema.getFields()) {
            String fieldName = srcField.name();
            if (destFieldsToExcludeList.contains(fieldName)) {
                continue;
            }

            Field destField = findDestField(srcField, destRecordSchema);

            Object srcValue = srcRecord.get(srcField.pos());
            if (!isImmutable(srcValue)) {
                // Avoid sharing mutable state between the source and the destination/result map.
                srcValue = genericData.deepCopy(srcField.schema(), srcValue);
            }

            if (destField == null) {
                unmappedFields.put(fieldName, srcValue);
                continue;
            }

            try {
                if (schemasDiffer) {
                    srcValue = getFriendlyValue(srcValue, destField.schema());
                }
                destRecord.put(destField.pos(), srcValue);
            } catch (Exception e) {
                throw new RuntimeException(fieldName, e);
            }
        }
        return unmappedFields;
    }

    /**
     * Looks up the destination field for a source field: first by name, then by each of the source
     * field's aliases in order. Returns {@code null} when nothing matches.
     */
    private static Field findDestField(Field srcField, Schema destRecordSchema) {
        Field destField = destRecordSchema.getField(srcField.name());
        if (destField != null) {
            return destField;
        }
        for (String alias : srcField.aliases()) {
            destField = destRecordSchema.getField(alias);
            if (destField != null) {
                return destField;
            }
        }
        return null;
    }

    /**
     * Coerces {@code srcValue} towards the type of the destination field.
     *
     * <p>Supported conversions, selected by the destination type (the first non-null branch for a
     * union) and the runtime class of the value:
     * <ul>
     *   <li>INT: from Float (only when the float is an integer within int range), from text</li>
     *   <li>LONG: from Integer, from text</li>
     *   <li>FLOAT: from Integer, Long, text</li>
     *   <li>DOUBLE: from Float, Integer, Long, text</li>
     *   <li>STRING: any non-null value via {@code toString()}</li>
     * </ul>
     * Any other combination returns the value unchanged.
     *
     * @param srcValue the value to convert; may be {@code null}
     * @param destFieldSchema schema of the destination field
     * @return the converted value, or {@code srcValue} itself when no conversion applies
     * @throws RuntimeException if a Float destined for an INT field is not exactly representable
     *     as an int
     * @throws NumberFormatException if text destined for a numeric field cannot be parsed
     */
    static Object getFriendlyValue(Object srcValue, Schema destFieldSchema) {
        Type destType = getFriendlyType(destFieldSchema);
        switch (destType) {
            case INT:
                if (srcValue instanceof Float) {
                    return floatToExactInt((Float) srcValue);
                }
                if (srcValue instanceof CharSequence) {
                    return Integer.parseInt(srcValue.toString());
                }
                return srcValue;
            case LONG:
                if (srcValue instanceof Integer) {
                    return ((Integer) srcValue).longValue();
                }
                if (srcValue instanceof CharSequence) {
                    return Long.parseLong(srcValue.toString());
                }
                return srcValue;
            case FLOAT:
                if (srcValue instanceof Integer || srcValue instanceof Long) {
                    return ((Number) srcValue).floatValue();
                }
                if (srcValue instanceof CharSequence) {
                    return Float.parseFloat(srcValue.toString());
                }
                return srcValue;
            case DOUBLE:
                if (srcValue instanceof Float || srcValue instanceof Integer
                        || srcValue instanceof Long) {
                    return ((Number) srcValue).doubleValue();
                }
                if (srcValue instanceof CharSequence) {
                    return Double.parseDouble(srcValue.toString());
                }
                return srcValue;
            case STRING:
                return srcValue != null ? srcValue.toString() : null;
            default:
                return srcValue;
        }
    }

    /**
     * Converts a float to an int, rejecting values with a fractional part, NaN, and values outside
     * the int range.
     */
    private static Integer floatToExactInt(float floatValue) {
        int intValue = (int) floatValue;
        // Compare as doubles: both int and float widen to double exactly, so a saturated cast
        // (e.g. 2^31f -> Integer.MAX_VALUE) or NaN (-> 0) is detected instead of slipping through
        // an int-to-float comparison that rounds the int back up.
        if ((double) intValue != (double) floatValue) {
            throw new RuntimeException("Cannot convert float " + floatValue
                    + " to integer because it has a decimal component or is out of int range.");
        }
        return intValue;
    }

    /**
     * Returns the schema's type, or for a union the type of its first non-null branch. A union
     * containing only null branches yields UNION.
     */
    private static Type getFriendlyType(Schema fieldSchema) {
        Type type = fieldSchema.getType();
        if (UNION.equals(type)) {
            for (Schema branch : fieldSchema.getTypes()) {
                if (!NULL.equals(branch.getType())) {
                    return branch.getType();
                }
            }
        }
        return type;
    }

    /** Returns whether the value is null or of a type that can be shared without copying. */
    private static boolean isImmutable(Object srcValue) {
        return (srcValue == null || srcValue instanceof Boolean || srcValue instanceof Double
                || srcValue instanceof Enum || srcValue instanceof Float
                || srcValue instanceof Integer || srcValue instanceof Long
                || srcValue instanceof String);
    }
}
@mkw
Copy link

mkw commented Sep 28, 2017

This looks great -- would you give it a license, please?

@nikoplusx
Copy link

Doesn't this deepCopy create a huge overhead on the traffic of the system? Doesn't it essentially double the load?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment