Zerocode AVRO support
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.io.Resources;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import org.apache.avro.io.DatumReader;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.jsmart.zerocode.core.di.provider.GsonSerDeProvider;
import org.jsmart.zerocode.core.di.provider.ObjectMapperProvider;
import org.jsmart.zerocode.core.engine.preprocessor.ScenarioExecutionState;
import org.jsmart.zerocode.core.kafka.client.BasicKafkaClient;
import org.jsmart.zerocode.core.kafka.delivery.DeliveryDetails;
import org.jsmart.zerocode.core.kafka.receive.ConsumerCommonConfigs;
import org.jsmart.zerocode.core.kafka.send.message.ProducerJsonRecord;
import org.jsmart.zerocode.core.kafka.send.message.ProducerJsonRecords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import static org.jsmart.zerocode.core.constants.ZerocodeConstants.FAILED;
import static org.jsmart.zerocode.core.constants.ZerocodeConstants.OK;
import static org.jsmart.zerocode.core.kafka.common.CommonConfigs.BOOTSTRAP_SERVERS;
import static org.jsmart.zerocode.core.kafka.common.KafkaCommonUtils.resolveValuePlaceHolders;
import static org.jsmart.zerocode.core.kafka.helper.KafkaProducerHelper.validateProduceRecord;
/**
* Provides an abstraction to allow using ZeroCode with the AVRO format,
* which is only partially supported by ZeroCode
* (see <a href="https://github.com/authorjapps/zerocode/issues/341">Zerocode issue</a>)
*/
public abstract class AbstractAvroMessagesSupport<K, V extends SpecificRecordBase> extends BasicKafkaClient {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractAvroMessagesSupport.class);
// unfortunately not accessible from BasicKafkaClient
protected final ObjectMapper objectMapper = new ObjectMapperProvider().get();
// unfortunately not accessible from BasicKafkaClient
protected final Gson gson = new GsonSerDeProvider().get();
// unfortunately not accessible from BasicKafkaClient
@Inject(optional = true)
@Named("kafka.producer.properties")
protected String producerPropertyFile;
// unfortunately not accessible from BasicKafkaClient
@Inject(optional = true)
@Named("kafka.consumer.properties")
protected String consumerPropertyFile;
@Inject
protected ConsumerCommonConfigs consumerCommonConfigs;
private final AvroHelper<K, V> avroUtils = new AvroHelper<>();
/**
* @return a new instance of the {@link K} key object, required by the {@link DatumReader} for deserialization
*/
public abstract K getRecordKeyInstance();
/**
* @return a new instance of the {@link V} value object, required by the {@link DatumReader} for deserialization
*/
public abstract V getRecordValueInstance();
@Override
public String execute(String brokers, String topicName, String operation, String requestJson, ScenarioExecutionState scenarioExecutionState) {
LOGGER.info("brokers:{}, topicName:{}, operation:{}, requestJson:{}", brokers, topicName, operation, requestJson);
try {
switch (operation.toLowerCase()) {
case "send":
case "load":
case "publish":
case "produce":
// produce Avro message based on JSON description in scenario files
return produceCustomAvroMessage(brokers, topicName, requestJson);
default:
return super.execute(brokers, topicName, operation, requestJson, scenarioExecutionState);
}
} catch (Throwable exx) {
LOGGER.error("Exception during operation:{}, topicName:{}, error:{}", operation, topicName, exx.getMessage());
throw new RuntimeException(exx);
}
}
/**
* Zerocode only supports producing Avro messages via the REST API, which does not support message headers.
* This method instead builds the Avro messages directly from the JSON descriptor of the scenario step.
*
* @param brokers     the Kafka bootstrap servers
* @param topicName   the topic to produce to
* @param requestJson the "request" block of the scenario step
* @return a JSON delivery status consumed by the Zerocode assertions
*/
public String produceCustomAvroMessage(String brokers, String topicName, String requestJson) {
// create an Avro-aware producer returning your Avro Java POJOs
Producer<K, V> producer = createAvroProducer(brokers, producerPropertyFile);
AtomicReference<String> error = new AtomicReference<>("");
try {
// read the actual zerocode configuration in the json scenario file
ProducerJsonRecords jsonRecords = objectMapper.readValue(requestJson, ProducerJsonRecords.class);
List<ProducerJsonRecord> records = jsonRecords.getRecords();
validateProduceRecord(records);
AtomicReference<RecordMetadata> recordMetadata = new AtomicReference<>();
records.forEach(producerJsonRecord -> {
RecordHeaders headers = buildHeaders(producerJsonRecord);
try {
// build the Avro record
ProducerRecord<K, V> producerRecord = buildProducerRecord(topicName, producerJsonRecord, headers);
// and send it
RecordMetadata recordMetadataToReturn = sendRecord(producer, producerRecord).get();
// Zerocode needs this metadata for its assertions
recordMetadata.set(recordMetadataToReturn);
} catch (InterruptedException | ExecutionException | IOException e) {
LOGGER.error("An error occurred during send !", e);
error.set(e.getMessage());
}
});
// don't forget to return a status for the test assertions
return gson.toJson(new DeliveryDetails(OK, recordMetadata.get()));
} catch (IOException e) {
LOGGER.error("Unable to process json.", e);
error.set(e.getMessage());
}
return gson.toJson(new DeliveryDetails(FAILED, StringUtils.isNotBlank(error.get()) ? error.get() : ""));
}
private Future<RecordMetadata> sendRecord(Producer<K, V> producer, ProducerRecord<K, V> producerRecord) {
return producer.send(producerRecord, (metadata, e) -> {
if (metadata != null) {
// Record sent successfully. Exception == null and metadata != null
LOGGER.debug("Message {}-{} successfully sent to topic={} part={} off={} at time={}",
null,
producerRecord,
metadata.topic(),
metadata.partition(),
metadata.offset(),
new Date(metadata.timestamp()));
} else {
// An error occurred. Exception != null and metadata == null
// Correctly handle the exception according to your needs
// /!\ If you don't handle the exception, the send is effectively fire-and-forget: it may or may not have reached the broker
LOGGER.error("An error occurred during send !", e);
}
});
}
private ProducerRecord<K, V> buildProducerRecord(String topicName, ProducerJsonRecord producerJsonRecord, RecordHeaders headers) throws IOException {
ProducerRecord<K, V> producerRecord =
new ProducerRecord<>(
topicName,
null,
null,
avroUtils.buildRecordKey(getRecordKeyInstance(), producerJsonRecord),
avroUtils.buildRecordValue(getRecordValueInstance(), producerJsonRecord),
headers);
return producerRecord;
}
private RecordHeaders buildHeaders(ProducerJsonRecord producerJsonRecord) {
RecordHeaders headers = new RecordHeaders();
producerJsonRecord.getHeaders().forEach((headerKey, headerValue) -> headers.add(headerKey.toString(), headerValue.toString().getBytes(StandardCharsets.UTF_8)));
return headers;
}
private Producer<K, V> createAvroProducer(String bootStrapServers, String producerPropertyFile) {
return new KafkaProducer<>(defineKafkaProperties(bootStrapServers, producerPropertyFile));
}
protected Properties defineKafkaProperties(String bootStrapServers, String producerPropertyFile) {
try (InputStream propsIs = Resources.getResource(producerPropertyFile).openStream()) {
Properties properties = new Properties();
properties.load(propsIs);
properties.put(BOOTSTRAP_SERVERS, bootStrapServers);
resolveValuePlaceHolders(properties);
return properties;
} catch (IOException e) {
throw new RuntimeException("Exception while reading kafka producer properties - " + e);
}
}
}
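The producer returned by createAvroProducer is only as Avro-aware as the file injected through kafka.producer.properties. A minimal sketch of what that file could contain follows; the file name, the Confluent KafkaAvroSerializer and the schema.registry.url entry are assumptions, not something this gist prescribes (bootstrap.servers is added at runtime by defineKafkaProperties).
# kafka_servers/kafka_producer_avro.properties (hypothetical name)
client.id=zerocode-avro-producer
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
schema.registry.url=http://localhost:8081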
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecordBase;
import org.jsmart.zerocode.core.kafka.send.message.ProducerJsonRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
public class AvroHelper<K, V extends SpecificRecordBase> {
private static final Logger LOGGER = LoggerFactory.getLogger(AvroHelper.class);
/**
* Use {@link DatumReader} to create a custom {@link SpecificRecordBase} based on the {@link ProducerJsonRecord}
* @param clazz an empty instance of the target value record, used to obtain its Avro schema
* @param jsonRecord the {@link ProducerJsonRecord} to deserialize
* @return a {@link SpecificRecordBase} representing the record
* @throws IOException if an error occurs during the deserialization process
*/
public V buildRecordValue(V clazz, ProducerJsonRecord jsonRecord) throws IOException {
DatumReader<V> valueReader = new SpecificDatumReader<>(clazz.getSchema());
try {
Decoder decoder = new ExtendedJsonDecoder(
clazz.getSchema(), jsonRecord.getValue().toString());
return valueReader.read(null, decoder);
} catch (IOException e) {
LOGGER.error("Deserialization error:" + e.getMessage());
throw e;
}
}
/**
* Use {@link DatumReader} to build the record key based on the {@link ProducerJsonRecord}
* @param clazz an empty instance of the target key record, used to obtain its Avro schema
* @param jsonRecord the {@link ProducerJsonRecord} to deserialize
* @return the record key, or null if the key type is not an Avro {@link SpecificRecordBase}
* @throws IOException if an error occurs during the deserialization process
*/
public K buildRecordKey(K clazz, ProducerJsonRecord jsonRecord) throws IOException {
if (clazz instanceof SpecificRecordBase) {
// use Avro deserialization to build the AVRO key from the json record of the zerocode scenario
DatumReader<K> keyReader = new SpecificDatumReader<>(((SpecificRecordBase) clazz).getSchema());
try {
Decoder decoder = DecoderFactory.get().jsonDecoder(
((SpecificRecordBase) clazz).getSchema(), jsonRecord.getJsonKey().toString());
return keyReader.read(null, decoder);
} catch (IOException e) {
LOGGER.error("Deserialization error:" + e.getMessage());
throw e;
}
}
return null;
}
}
/**
* This {@link KafkaAvroExampleMessagesSupport} must be used with the Zerocode {@link org.jsmart.zerocode.core.domain.UseKafkaClient} annotation
* to allow testing payloads received in AVRO format
*
*/
public class KafkaAvroExampleMessagesSupport extends AbstractAvroMessagesSupport<String, ExamplePayload> {
@Override
public ExamplePayload getRecordValueInstance() {
return new ExamplePayload();
}
@Override
public String getRecordKeyInstance() {
return null;
}
}
{
"scenarioName":"Example scenario",
"steps":[
{
"name":"produce_step",
"url":"kafka-topic:${SYSTEM.ENV:TOPIC}",
"operation":"produce",
"request":{
"records":[
{
"key":null,
"headers":{
"ANY":"any"
},
"value":{
"name":"Example",
"age":{
"int":25
},
"favoriteProgrammingLanguage":{
"ProgrammingLanguage":"Java"
}
}
}
]
},
"assertions":{
"status":"Ok",
"recordMetadata":{
"topicPartition":{
"topic":"${SYSTEM.ENV:TOPIC}"
}
}
}
}
]
}
{
"name": "ExamplePayload",
"type": "record",
"fields": [
{"name": "name", "type": "string", "example": "Donkey"},
{"name": "age", "type": ["null", "int"], "default": null, "example": 123},
{
"name": "favoriteProgrammingLanguage",
"type": ["null", {"name": "ProgrammingLanguage", "type": "enum", "symbols": ["JS", "Java", "Go", "Rust", "C"], "default": "JS"}]
}
]
}
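Note that union values in the scenario follow the Avro JSON encoding: the branch name wraps the value, e.g. "age":{"int":25}. Because the ExtendedJsonDecoder shipped below infers default values, a field that declares a default (here only "age") can also be omitted entirely. A sketch of an alternative record "value" (illustrative values only):
{
"name":"Example",
"favoriteProgrammingLanguage":{"ProgrammingLanguage":"Java"}
}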
import org.jsmart.zerocode.core.domain.Scenario;
import org.jsmart.zerocode.core.domain.TargetEnv;
import org.jsmart.zerocode.core.domain.UseKafkaClient;
import org.jsmart.zerocode.core.runner.ZeroCodeUnitRunner;
import org.junit.Test;
import org.junit.runner.RunWith;
@TargetEnv("kafka_servers/kafka_server.properties")
@UseKafkaClient(KafkaAvroExampleMessagesSupport.class)
@RunWith(ZeroCodeUnitRunner.class)
public class ExampleScenarioTest {
@Test
@Scenario("kafka/example_scenario.json")
public void test_example_scenario() {
}
}
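The @TargetEnv file wires the broker address and the producer/consumer property files into the fields injected in AbstractAvroMessagesSupport. A minimal sketch of kafka_servers/kafka_server.properties, assuming a local broker and using placeholder file paths (kafka.bootstrap.servers is the usual Zerocode key; the producer/consumer keys match the @Named annotations above):
kafka.bootstrap.servers=localhost:9092
kafka.producer.properties=kafka_servers/kafka_producer_avro.properties
kafka.consumer.properties=kafka_servers/kafka_consumer_avro.properties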
import com.fasterxml.jackson.core.Base64Variant;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonLocation;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonStreamContext;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.core.ObjectCodec;
import com.fasterxml.jackson.core.Version;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.NullNode;
import org.apache.avro.AvroTypeException;
import org.apache.avro.JsonProperties;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.ParsingDecoder;
import org.apache.avro.io.parsing.JsonGrammarGenerator;
import org.apache.avro.io.parsing.Parser;
import org.apache.avro.io.parsing.Symbol;
import org.apache.avro.util.Utf8;
import java.io.EOFException;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Stack;
/**
* A {@link Decoder} for Avro's JSON data encoding.
* <p>
* Construct using {@link DecoderFactory}.
* </p>
* ExtendedJsonDecoder is not thread-safe.
* <p>
* Based on {@link org.apache.avro.io.JsonDecoder JsonDecoder}
* and <a href="https://github.com/zolyfarkas/avro">ExtendedJsonDecoder</a>.
* Infers default arguments, if they are not present.
* </p>
*
* <p>This is a code duplication from <a href="https://github.com/Celos/avro-json-decoder">Celos/avro-json-decoder</a></p>
* <p>This ExtendedJsonDecoder allows Avro records to be created while inferring default values for missing fields, working around <a href="https://issues.apache.org/jira/browse/AVRO-1582">AVRO-1582</a></p>
**/
public class ExtendedJsonDecoder extends ParsingDecoder
implements Parser.ActionHandler {
public static final String FIXED = "fixed";
private static final JsonElement NULL_JSON_ELEMENT = new JsonElement(null);
private static final JsonFactory jsonFactory = new JsonFactory();
private final Schema schema;
Stack<ReorderBuffer> reorderBuffers = new Stack<>();
ReorderBuffer currentReorderBuffer;
ObjectMapper mapper = new ObjectMapper();
private JsonParser in;
public ExtendedJsonDecoder(Schema schema, String in) throws IOException {
super(getSymbol(schema));
configure(in);
this.schema = schema;
}
private static Symbol getSymbol(Schema schema) {
if (null == schema) {
throw new NullPointerException("Schema cannot be null!");
}
return new JsonGrammarGenerator().generate(schema);
}
private static List<JsonElement> getValueAsTree(JsonParser in) throws IOException {
int level = 0;
List<JsonElement> result = new ArrayList<JsonElement>();
do {
JsonToken t = in.getCurrentToken();
switch (t) {
case START_OBJECT:
case START_ARRAY:
level++;
result.add(new JsonElement(t));
break;
case END_OBJECT:
case END_ARRAY:
level--;
result.add(new JsonElement(t));
break;
case FIELD_NAME:
case VALUE_STRING:
case VALUE_NUMBER_INT:
case VALUE_NUMBER_FLOAT:
case VALUE_TRUE:
case VALUE_FALSE:
case VALUE_NULL:
result.add(new JsonElement(t, in.getText()));
break;
default:
result.add(NULL_JSON_ELEMENT);
}
in.nextToken();
} while (level != 0);
result.add(new JsonElement(null));
return result;
}
private static Field findField(Schema schema, String name) {
if (schema.getField(name) != null) {
return schema.getField(name);
}
Field foundField = null;
for (Field field : schema.getFields()) {
Schema fieldSchema = field.schema();
if (Type.RECORD.equals(fieldSchema.getType())) {
foundField = findField(fieldSchema, name);
} else if (Type.ARRAY.equals(fieldSchema.getType())) {
foundField = findField(fieldSchema.getElementType(), name);
} else if (Type.MAP.equals(fieldSchema.getType())) {
foundField = findField(fieldSchema.getValueType(), name);
}
if (foundField != null) {
return foundField;
}
}
return null;
}
/**
* <p>Reconfigures this JsonDecoder to use the String provided for input.</p>
* <p>If the String provided is null, a NullPointerException is thrown.</p>
* Otherwise, this JsonDecoder will reset its state and then
* reconfigure its input.
*
* @param in The String to read from. Cannot be null.
* @return this JsonDecoder
* @throws IOException
*/
public ExtendedJsonDecoder configure(String in) throws IOException {
if (null == in) {
throw new NullPointerException("String to read from cannot be null!");
}
parser.reset();
this.in = jsonFactory.createParser(in);
this.in.nextToken();
return this;
}
private void advance(Symbol symbol) throws IOException {
this.parser.processTrailingImplicitActions();
if (in.getCurrentToken() == null && this.parser.depth() == 1)
throw new EOFException();
parser.advance(symbol);
}
@Override
public void readNull() throws IOException {
advance(Symbol.NULL);
if (in.getCurrentToken() == JsonToken.VALUE_NULL) {
in.nextToken();
} else {
throw error("null");
}
}
@Override
public boolean readBoolean() throws IOException {
advance(Symbol.BOOLEAN);
JsonToken t = in.getCurrentToken();
if (t == JsonToken.VALUE_TRUE || t == JsonToken.VALUE_FALSE) {
in.nextToken();
return t == JsonToken.VALUE_TRUE;
} else {
throw error("boolean");
}
}
@Override
public int readInt() throws IOException {
advance(Symbol.INT);
if (in.getCurrentToken().isNumeric()) {
int result = in.getIntValue();
in.nextToken();
return result;
} else {
throw error("int");
}
}
@Override
public long readLong() throws IOException {
advance(Symbol.LONG);
if (in.getCurrentToken().isNumeric()) {
long result = in.getLongValue();
in.nextToken();
return result;
} else {
throw error("long");
}
}
@Override
public float readFloat() throws IOException {
advance(Symbol.FLOAT);
if (in.getCurrentToken().isNumeric()) {
float result = in.getFloatValue();
in.nextToken();
return result;
} else {
throw error("float");
}
}
@Override
public double readDouble() throws IOException {
advance(Symbol.DOUBLE);
if (in.getCurrentToken().isNumeric()) {
double result = in.getDoubleValue();
in.nextToken();
return result;
} else {
throw error("double");
}
}
@Override
public Utf8 readString(Utf8 old) throws IOException {
return new Utf8(readString());
}
@Override
public String readString() throws IOException {
advance(Symbol.STRING);
if (parser.topSymbol() == Symbol.MAP_KEY_MARKER) {
parser.advance(Symbol.MAP_KEY_MARKER);
if (in.getCurrentToken() != JsonToken.FIELD_NAME) {
throw error("map-key");
}
} else {
if (in.getCurrentToken() != JsonToken.VALUE_STRING) {
throw error("string");
}
}
String result = in.getText();
in.nextToken();
return result;
}
@Override
public void skipString() throws IOException {
advance(Symbol.STRING);
if (parser.topSymbol() == Symbol.MAP_KEY_MARKER) {
parser.advance(Symbol.MAP_KEY_MARKER);
if (in.getCurrentToken() != JsonToken.FIELD_NAME) {
throw error("map-key");
}
} else {
if (in.getCurrentToken() != JsonToken.VALUE_STRING) {
throw error("string");
}
}
in.nextToken();
}
@Override
public ByteBuffer readBytes(ByteBuffer old) throws IOException {
advance(Symbol.BYTES);
if (in.getCurrentToken() == JsonToken.VALUE_STRING) {
byte[] result = readByteArray();
in.nextToken();
return ByteBuffer.wrap(result);
} else {
throw error("bytes");
}
}
private byte[] readByteArray() throws IOException {
return in.getText().getBytes(StandardCharsets.ISO_8859_1);
}
@Override
public void skipBytes() throws IOException {
advance(Symbol.BYTES);
if (in.getCurrentToken() == JsonToken.VALUE_STRING) {
in.nextToken();
} else {
throw error("bytes");
}
}
private void checkFixed(int size) throws IOException {
advance(Symbol.FIXED);
Symbol.IntCheckAction top = (Symbol.IntCheckAction) parser.popSymbol();
if (size != top.size) {
throw new AvroTypeException(
"Incorrect length for fixed binary: expected " +
top.size + " but received " + size + " bytes.");
}
}
@Override
public void readFixed(byte[] bytes, int start, int len) throws IOException {
checkFixed(len);
if (in.getCurrentToken() == JsonToken.VALUE_STRING) {
byte[] result = readByteArray();
in.nextToken();
if (result.length != len) {
throw new AvroTypeException("Expected fixed length " + len
+ ", but got" + result.length);
}
System.arraycopy(result, 0, bytes, start, len);
} else {
throw error(FIXED);
}
}
@Override
public void skipFixed(int length) throws IOException {
checkFixed(length);
doSkipFixed(length);
}
private void doSkipFixed(int length) throws IOException {
if (in.getCurrentToken() == JsonToken.VALUE_STRING) {
byte[] result = readByteArray();
in.nextToken();
if (result.length != length) {
throw new AvroTypeException("Expected fixed length " + length
+ ", but got" + result.length);
}
} else {
throw error(FIXED);
}
}
@Override
protected void skipFixed() throws IOException {
advance(Symbol.FIXED);
Symbol.IntCheckAction top = (Symbol.IntCheckAction) parser.popSymbol();
doSkipFixed(top.size);
}
@Override
public int readEnum() throws IOException {
advance(Symbol.ENUM);
Symbol.EnumLabelsAction top = (Symbol.EnumLabelsAction) parser.popSymbol();
if (in.getCurrentToken() == JsonToken.VALUE_STRING) {
int n = top.findLabel(in.getText());
if (n >= 0) {
in.nextToken();
return n;
}
throw new AvroTypeException("Unknown symbol in enum " + in.getText());
} else {
throw error("enum");
}
}
@Override
public long readArrayStart() throws IOException {
advance(Symbol.ARRAY_START);
if (in.getCurrentToken() == JsonToken.START_ARRAY) {
in.nextToken();
return doArrayNext();
} else {
throw error("array-start");
}
}
@Override
public long arrayNext() throws IOException {
advance(Symbol.ITEM_END);
return doArrayNext();
}
private long doArrayNext() throws IOException {
if (in.getCurrentToken() == JsonToken.END_ARRAY) {
parser.advance(Symbol.ARRAY_END);
in.nextToken();
return 0;
} else {
return 1;
}
}
@Override
public long skipArray() throws IOException {
advance(Symbol.ARRAY_START);
if (in.getCurrentToken() == JsonToken.START_ARRAY) {
in.skipChildren();
in.nextToken();
advance(Symbol.ARRAY_END);
} else {
throw error("array-start");
}
return 0;
}
@Override
public long readMapStart() throws IOException {
advance(Symbol.MAP_START);
if (in.getCurrentToken() == JsonToken.START_OBJECT) {
in.nextToken();
return doMapNext();
} else {
throw error("map-start");
}
}
@Override
public long mapNext() throws IOException {
advance(Symbol.ITEM_END);
return doMapNext();
}
private long doMapNext() throws IOException {
if (in.getCurrentToken() == JsonToken.END_OBJECT) {
in.nextToken();
advance(Symbol.MAP_END);
return 0;
} else {
return 1;
}
}
@Override
public long skipMap() throws IOException {
advance(Symbol.MAP_START);
if (in.getCurrentToken() == JsonToken.START_OBJECT) {
in.skipChildren();
in.nextToken();
advance(Symbol.MAP_END);
} else {
throw error("map-start");
}
return 0;
}
@Override
public int readIndex() throws IOException {
advance(Symbol.UNION);
Symbol.Alternative a = (Symbol.Alternative) parser.popSymbol();
String label;
final JsonToken currentToken = in.getCurrentToken();
if (currentToken == JsonToken.VALUE_NULL) {
label = "null";
} else if (currentToken == JsonToken.START_OBJECT
&& in.nextToken() == JsonToken.FIELD_NAME) {
label = in.getText();
in.nextToken();
parser.pushSymbol(Symbol.UNION_END);
} else if (a.size() == 2 &&
("null".equals(a.getLabel(0)) || "null".equals(a.getLabel(1)))) {
label = ("null".equals(a.getLabel(0)) ? a.getLabel(1) : a.getLabel(0));
} else {
throw error("start-union");
}
int n = a.findLabel(label);
if (n < 0) {
throw new AvroTypeException("Unknown union branch " + label);
}
parser.pushSymbol(a.getSymbol(n));
return n;
}
@Override
public Symbol doAction(Symbol input, Symbol top) throws IOException {
if (top instanceof Symbol.FieldAdjustAction) {
Symbol.FieldAdjustAction fa = (Symbol.FieldAdjustAction) top;
String name = fa.fname;
if (currentReorderBuffer != null) {
List<JsonElement> node = currentReorderBuffer.savedFields.get(name);
if (node != null) {
currentReorderBuffer.savedFields.remove(name);
currentReorderBuffer.origParser = in;
in = makeParser(node);
return null;
}
}
if (in.getCurrentToken() == JsonToken.FIELD_NAME) {
do {
String fn = in.getText();
in.nextToken();
if (name.equals(fn)) {
return null;
} else {
if (currentReorderBuffer == null) {
currentReorderBuffer = new ReorderBuffer();
}
currentReorderBuffer.savedFields.put(fn, getValueAsTree(in));
}
} while (in.getCurrentToken() == JsonToken.FIELD_NAME);
injectDefaultValueIfAvailable(in, fa.fname);
} else {
injectDefaultValueIfAvailable(in, fa.fname);
}
} else if (top == Symbol.FIELD_END) {
if (currentReorderBuffer != null && currentReorderBuffer.origParser != null) {
in = currentReorderBuffer.origParser;
currentReorderBuffer.origParser = null;
}
} else if (top == Symbol.RECORD_START) {
if (in.getCurrentToken() == JsonToken.START_OBJECT) {
in.nextToken();
reorderBuffers.push(currentReorderBuffer);
currentReorderBuffer = null;
} else {
throw error("record-start");
}
} else if (top == Symbol.RECORD_END || top == Symbol.UNION_END) {
if (in.getCurrentToken() == JsonToken.END_OBJECT) {
in.nextToken();
if (top == Symbol.RECORD_END) {
if (currentReorderBuffer != null && !currentReorderBuffer.savedFields.isEmpty()) {
throw error("Unknown fields: " + currentReorderBuffer.savedFields.keySet());
}
currentReorderBuffer = reorderBuffers.pop();
}
} else {
throw error(top == Symbol.RECORD_END ? "record-end" : "union-end");
}
} else {
throw new AvroTypeException("Unknown action symbol " + top);
}
return null;
}
private JsonParser makeParser(final List<JsonElement> elements) {
return new JsonParser() {
int pos = 0;
@Override
public ObjectCodec getCodec() {
throw new UnsupportedOperationException();
}
@Override
public void setCodec(ObjectCodec c) {
throw new UnsupportedOperationException();
}
@Override
public Version version() {
throw new UnsupportedOperationException();
}
@Override
public void close() {
throw new UnsupportedOperationException();
}
@Override
public JsonToken nextToken() {
pos++;
return elements.get(pos).token;
}
@Override
public JsonToken nextValue() {
return null;
}
@Override
public JsonParser skipChildren() {
JsonToken tkn = elements.get(pos).token;
int level = (tkn == JsonToken.START_ARRAY || tkn == JsonToken.END_ARRAY) ? 1 : 0;
while (level > 0) {
switch (elements.get(++pos).token) {
case START_ARRAY:
case START_OBJECT:
level++;
break;
case END_ARRAY:
case END_OBJECT:
level--;
break;
default:
}
}
return this;
}
@Override
public boolean isClosed() {
throw new UnsupportedOperationException();
}
@Override
public String getCurrentName() {
throw new UnsupportedOperationException();
}
@Override
public JsonStreamContext getParsingContext() {
throw new UnsupportedOperationException();
}
@Override
public JsonLocation getTokenLocation() {
throw new UnsupportedOperationException();
}
@Override
public JsonLocation getCurrentLocation() {
throw new UnsupportedOperationException();
}
@Override
public String getText() {
return elements.get(pos).value;
}
@Override
public char[] getTextCharacters() {
throw new UnsupportedOperationException();
}
@Override
public int getTextLength() {
throw new UnsupportedOperationException();
}
@Override
public int getTextOffset() {
throw new UnsupportedOperationException();
}
@Override
public boolean hasTextCharacters() {
return false;
}
@Override
public Number getNumberValue() {
throw new UnsupportedOperationException();
}
@Override
public NumberType getNumberType() {
throw new UnsupportedOperationException();
}
@Override
public int getIntValue() {
return Integer.parseInt(getText());
}
@Override
public long getLongValue() {
return Long.parseLong(getText());
}
@Override
public BigInteger getBigIntegerValue() {
throw new UnsupportedOperationException();
}
@Override
public float getFloatValue() {
return Float.parseFloat(getText());
}
@Override
public double getDoubleValue() {
return Double.parseDouble(getText());
}
@Override
public BigDecimal getDecimalValue() {
throw new UnsupportedOperationException();
}
@Override
public byte[] getBinaryValue(Base64Variant b64variant) {
throw new UnsupportedOperationException();
}
@Override
public String getValueAsString(String def) {
throw new UnsupportedOperationException();
}
@Override
public JsonToken getCurrentToken() {
return elements.get(pos).token;
}
@Override
public int getCurrentTokenId() {
return getCurrentToken().id();
}
@Override
public boolean hasCurrentToken() {
return getCurrentToken() != null;
}
@Override
public boolean hasTokenId(int id) {
return getCurrentTokenId() == id;
}
@Override
public boolean hasToken(JsonToken t) {
throw new UnsupportedOperationException();
}
@Override
public void clearCurrentToken() {
throw new UnsupportedOperationException();
}
@Override
public JsonToken getLastClearedToken() {
throw new UnsupportedOperationException();
}
@Override
public void overrideCurrentName(String name) {
throw new UnsupportedOperationException();
}
};
}
private AvroTypeException error(String type) {
return new AvroTypeException("Expected " + type +
". Got " + in.getCurrentToken());
}
private void injectDefaultValueIfAvailable(final JsonParser in, String fieldName) throws IOException {
Field field = findField(schema, fieldName);
if (field == null) {
throw new AvroTypeException("Expected field name not found: " + fieldName);
}
if (!field.hasDefaultValue()) {
throw new AvroTypeException("Expected field name not found: " + fieldName);
}
Object defVal = field.defaultVal();
List<JsonElement> result = new ArrayList<>(2);
JsonNode defaultValJsonNode = defVal instanceof JsonProperties.Null ? NullNode.getInstance() : mapper.convertValue(defVal, JsonNode.class);
JsonParser traverse = defaultValJsonNode.traverse();
while (traverse.nextToken() != null) {
final JsonToken currentToken = traverse.getCurrentToken();
if (currentToken.isScalarValue() || (!currentToken.isStructStart() && !currentToken.isStructEnd())) {
result.add(new JsonElement(currentToken, traverse.getText()));
} else {
result.add(new JsonElement(currentToken));
}
}
result.add(NULL_JSON_ELEMENT);
if (currentReorderBuffer == null) {
currentReorderBuffer = new ReorderBuffer();
}
currentReorderBuffer.origParser = in;
this.in = makeParser(result);
}
private static class ReorderBuffer {
public Map<String, List<JsonElement>> savedFields = new HashMap<>();
public JsonParser origParser = null;
}
private static class JsonElement {
public final JsonToken token;
public final String value;
public JsonElement(JsonToken t, String value) {
this.token = t;
this.value = value;
}
public JsonElement(JsonToken t) {
this(t, null);
}
}
}
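To make the default-inference behaviour concrete, here is a small standalone sketch (not part of the gist; the class name is made up and ExamplePayload is assumed to be the Avro-generated class for the schema above). It decodes a JSON value in which "age" is omitted: the stock Avro JsonDecoder would fail, whereas ExtendedJsonDecoder falls back to the declared default null.
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.specific.SpecificDatumReader;

public class ExtendedJsonDecoderDefaultsSketch {
    public static void main(String[] args) throws Exception {
        // "age" is deliberately missing: the ExamplePayload schema declares "default": null for it,
        // so ExtendedJsonDecoder injects that default instead of failing.
        String json = "{\"name\":\"Example\",\"favoriteProgrammingLanguage\":{\"ProgrammingLanguage\":\"Java\"}}";
        ExamplePayload prototype = new ExamplePayload();
        DatumReader<ExamplePayload> reader = new SpecificDatumReader<>(prototype.getSchema());
        Decoder decoder = new ExtendedJsonDecoder(prototype.getSchema(), json);
        ExamplePayload decoded = reader.read(null, decoder);
        System.out.println(decoded); // age is expected to come back as null
    }
}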