Created
October 14, 2016 08:03
-
-
Save markdav/01623363b5b2508b8e5ef6146caedb1b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import static org.apache.avro.Schema.Type.DOUBLE; | |
import static org.apache.avro.Schema.Type.FLOAT; | |
import static org.apache.avro.Schema.Type.INT; | |
import static org.apache.avro.Schema.Type.LONG; | |
import static org.apache.avro.Schema.Type.NULL; | |
import static org.apache.avro.Schema.Type.STRING; | |
import static org.apache.avro.Schema.Type.UNION; | |
import java.util.Arrays; | |
import java.util.LinkedHashMap; | |
import java.util.List; | |
import java.util.Map; | |
import org.apache.avro.Schema; | |
import org.apache.avro.Schema.Field; | |
import org.apache.avro.Schema.Type; | |
import org.apache.avro.generic.GenericData; | |
import org.apache.avro.generic.GenericRecord; | |
import org.apache.avro.specific.SpecificData; | |
import org.apache.avro.specific.SpecificRecord; | |
public class AutoMapper { | |
private static final GenericData genericData = new GenericData(); | |
public static <T> T decode(GenericRecord genericMessage, Schema schema) { | |
if (genericMessage.getSchema().equals(schema)) { | |
//schemas match up | |
// cheap conversion | |
return (T)SpecificData.get().deepCopy(schema, genericMessage); | |
} else { | |
// Schema has changed..! More expensive version (note: no defaults here.., change to builder when schemas | |
// have defaults) | |
GenericRecord newRecord = new GenericData.Record(schema); | |
autoMap(genericMessage, newRecord); | |
return (T)SpecificData.get().deepCopy(schema, newRecord); | |
} | |
} | |
public static Map<String, Object> autoMap(GenericRecord srcRecord, GenericRecord destRecord, | |
String... destFieldsToExclude) { | |
final List<String> destFieldsToExcludeList = Arrays.asList(destFieldsToExclude); | |
final Schema destRecordSchema = destRecord.getSchema(); | |
final Schema srcRecordSchema = srcRecord.getSchema(); | |
Map<String, Object> unmappedFields = new LinkedHashMap<>(); | |
for (Field field : srcRecordSchema.getFields()) { | |
String fieldName = field.name(); | |
if (destFieldsToExcludeList.contains(fieldName)) { | |
continue; | |
} | |
Field srcField = srcRecordSchema.getField(fieldName); | |
Field destField = destRecordSchema.getField(fieldName); | |
if (destField == null && srcField.aliases().size() > 0) { | |
for (String alias : srcField.aliases()) { | |
destField = destRecordSchema.getField(alias); | |
if (destField != null) { | |
break; | |
} | |
} | |
} | |
Object srcValue = srcRecord.get(srcField.pos()); | |
if (!isImmutable(srcValue)) { | |
srcValue = genericData.deepCopy(srcField.schema(), srcValue); | |
} | |
boolean existsInBoth = (srcField != null && destField != null); | |
if (existsInBoth) { | |
try { | |
if (!srcRecordSchema.equals(destRecordSchema)) { | |
srcValue = getFriendlyValue(srcValue, destField.schema()); | |
} | |
destRecord.put(destField.pos(), srcValue); | |
} catch (Exception e) { | |
throw new RuntimeException(fieldName, e); | |
} | |
} else { | |
unmappedFields.put(fieldName, srcValue); | |
} | |
} | |
return unmappedFields; | |
} | |
static Object getFriendlyValue(Object srcValue, Schema destFieldSchema) { | |
Type destType = getFriendlyType(destFieldSchema); | |
if (INT.equals(destType) && srcValue instanceof Float) { | |
float floatValue = (Float) srcValue; | |
int intValue = ((Float) srcValue).intValue(); | |
if (intValue == floatValue) { | |
srcValue = intValue; | |
} else { | |
throw new RuntimeException( | |
"Cannot convert integer to float because float contained a decimal component."); | |
} | |
} else if (INT.equals(destType) && (srcValue instanceof CharSequence)) { | |
srcValue = Integer.parseInt(srcValue.toString()); | |
} else if (LONG.equals(destType) && srcValue instanceof Integer) { | |
srcValue = ((Integer) srcValue).longValue(); | |
} else if (LONG.equals(destType) && (srcValue instanceof CharSequence)) { | |
srcValue = Long.parseLong(srcValue.toString()); | |
} else if (FLOAT.equals(destType) && srcValue instanceof Integer) { | |
srcValue = ((Integer) srcValue).floatValue(); | |
} else if (FLOAT.equals(destType) && srcValue instanceof Long) { | |
srcValue = ((Long) srcValue).floatValue(); | |
} else if (FLOAT.equals(destType) && (srcValue instanceof CharSequence)) { | |
srcValue = Float.parseFloat(srcValue.toString()); | |
} else if (DOUBLE.equals(destType) && srcValue instanceof Float) { | |
srcValue = ((Float) srcValue).doubleValue(); | |
} else if (STRING.equals(destType) && srcValue != null) { | |
srcValue = srcValue.toString(); | |
} | |
return srcValue; | |
} | |
private static Type getFriendlyType(Schema fieldSchema) { | |
Type type = fieldSchema.getType(); | |
if (UNION.equals(type)) { | |
for (Schema it : fieldSchema.getTypes()) { | |
if (!NULL.equals(it.getType())) { | |
type = it.getType(); | |
break; | |
} | |
} | |
} | |
return type; | |
} | |
private static boolean isImmutable(Object srcValue) { | |
return (srcValue == null || srcValue instanceof Boolean || srcValue instanceof Double | |
|| srcValue instanceof Enum || srcValue instanceof Float || srcValue instanceof Integer | |
|| srcValue instanceof Long || srcValue instanceof String); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Doesn't this `deepCopy` create a huge overhead on the traffic of the system? Doesn't it essentially double the load?