Last active
September 28, 2022 14:20
-
-
Save M3lkior/aa4f2b21a46f2d45c84b09b5b0331930 to your computer and use it in GitHub Desktop.
Zerocode AVRO support
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.fasterxml.jackson.databind.ObjectMapper; | |
import com.google.common.io.Resources; | |
import com.google.gson.Gson; | |
import com.google.inject.Inject; | |
import com.google.inject.name.Named; | |
import org.apache.avro.io.DatumReader; | |
import org.apache.avro.specific.SpecificRecordBase; | |
import org.apache.commons.lang3.StringUtils; | |
import org.apache.kafka.clients.producer.KafkaProducer; | |
import org.apache.kafka.clients.producer.Producer; | |
import org.apache.kafka.clients.producer.ProducerRecord; | |
import org.apache.kafka.clients.producer.RecordMetadata; | |
import org.apache.kafka.common.header.internals.RecordHeaders; | |
import org.jsmart.zerocode.core.di.provider.GsonSerDeProvider; | |
import org.jsmart.zerocode.core.di.provider.ObjectMapperProvider; | |
import org.jsmart.zerocode.core.engine.preprocessor.ScenarioExecutionState; | |
import org.jsmart.zerocode.core.kafka.client.BasicKafkaClient; | |
import org.jsmart.zerocode.core.kafka.delivery.DeliveryDetails; | |
import org.jsmart.zerocode.core.kafka.receive.ConsumerCommonConfigs; | |
import org.jsmart.zerocode.core.kafka.send.message.ProducerJsonRecord; | |
import org.jsmart.zerocode.core.kafka.send.message.ProducerJsonRecords; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.io.IOException; | |
import java.io.InputStream; | |
import java.nio.charset.StandardCharsets; | |
import java.util.Date; | |
import java.util.List; | |
import java.util.Properties; | |
import java.util.concurrent.ExecutionException; | |
import java.util.concurrent.Future; | |
import java.util.concurrent.atomic.AtomicReference; | |
import static org.jsmart.zerocode.core.constants.ZerocodeConstants.FAILED; | |
import static org.jsmart.zerocode.core.constants.ZerocodeConstants.OK; | |
import static org.jsmart.zerocode.core.kafka.common.CommonConfigs.BOOTSTRAP_SERVERS; | |
import static org.jsmart.zerocode.core.kafka.common.KafkaCommonUtils.resolveValuePlaceHolders; | |
import static org.jsmart.zerocode.core.kafka.helper.KafkaProducerHelper.validateProduceRecord; | |
/** | |
* Provides an abstraction to allow using ZeroCode with AVRO format | |
* which is partially supported by ZeroCode | |
* (see <a href="https://github.com/authorjapps/zerocode/issues/341">Zerocode issue</a>) | |
*/ | |
public abstract class AbstractAvroMessagesSupport<K, V extends SpecificRecordBase> extends BasicKafkaClient { | |
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractAvroMessagesSupport.class); | |
// unfortunately not accessible from BasicKafkaClient | |
protected final ObjectMapper objectMapper = new ObjectMapperProvider().get(); | |
// unfortunately not accessible from BasicKafkaClient | |
protected final Gson gson = new GsonSerDeProvider().get(); | |
// unfortunately not accessible from BasicKafkaClient | |
@Inject(optional = true) | |
@Named("kafka.producer.properties") | |
protected String producerPropertyFile; | |
// unfortunately not accessible from BasicKafkaClient | |
@Inject(optional = true) | |
@Named("kafka.consumer.properties") | |
protected String consumerProperyFile; | |
@Inject | |
protected ConsumerCommonConfigs consumerCommonConfigs; | |
private AvroHelper<K, V> avroUtils = new AvroHelper<K, V>(); | |
/** | |
* @return return a new instance of the {@link K} object in order to allow the {@link DatumReader} deserialization | |
*/ | |
public abstract K getRecordKeyInstance(); | |
/** | |
* @return return a new instance of the {@link V} object in order to allow the {@link DatumReader} deserialization | |
*/ | |
public abstract V getRecordValueInstance(); | |
@Override | |
public String execute(String brokers, String topicName, String operation, String requestJson, ScenarioExecutionState scenarioExecutionState) { | |
LOGGER.info("brokers:{}, topicName:{}, operation:{}, requestJson:{}", brokers, topicName, operation, requestJson); | |
try { | |
switch (operation.toLowerCase()) { | |
case "send": | |
case "load": | |
case "publish": | |
case "produce": | |
// produce Avro message based on JSON description in scenario files | |
return produceCustomAvroMessage(brokers, topicName, requestJson); | |
default: | |
return super.execute(brokers, topicName, operation, requestJson, scenarioExecutionState); | |
} | |
} catch (Throwable exx) { | |
LOGGER.error("Exception during operation:{}, topicName:{}, error:{}", operation, topicName, exx.getMessage()); | |
throw new RuntimeException(exx); | |
} | |
} | |
/** | |
* Zerocode only support producing Avro messages thanks to the REST API (which not support Messages Headers). | |
* This method build an Avro messages thanks to the JSON descriptor. | |
* | |
* @param brokers | |
* @param topicName | |
* @param requestJson | |
* @return | |
*/ | |
public String produceCustomAvroMessage(String brokers, String topicName, String requestJson) { | |
// just create an avro producer, returning your avro java pojo | |
Producer<K, V> producer = createAvroProducer(brokers, producerPropertyFile); | |
AtomicReference<String> error = new AtomicReference<>(""); | |
try { | |
// read the actual zerocode configuration in the json scenario file | |
ProducerJsonRecords jsonRecords = objectMapper.readValue(requestJson, ProducerJsonRecords.class); | |
List<ProducerJsonRecord> records = jsonRecords.getRecords(); | |
validateProduceRecord(records); | |
AtomicReference<RecordMetadata> recordMetadata = new AtomicReference<>(); | |
records.forEach(producerJsonRecord -> { | |
RecordHeaders headers = buildHeaders(producerJsonRecord); | |
try { | |
// construct of the avro record | |
ProducerRecord<K, V> producerRecord = buildProducerRecord(topicName, producerJsonRecord, headers); | |
// and send it | |
RecordMetadata recordMetadataToReturn = sendRecord(producer, producerRecord).get(); | |
// zero code need this metadata | |
recordMetadata.set(recordMetadataToReturn); | |
} catch (InterruptedException | ExecutionException | IOException e) { | |
LOGGER.error("An error occurred during send !", e); | |
error.set(e.getMessage()); | |
} | |
}); | |
// dont forget to return a status for tests assertions | |
return gson.toJson(new DeliveryDetails(OK, recordMetadata.get())); | |
} catch (IOException e) { | |
LOGGER.error("Unable to process json.", e); | |
error.set(e.getMessage()); | |
} | |
return gson.toJson(new DeliveryDetails(FAILED, StringUtils.isNotBlank(error.get()) ? error.get() : "")); | |
} | |
private Future<RecordMetadata> sendRecord(Producer<K, V> producer, ProducerRecord<K, V> producerRecord) { | |
return producer.send(producerRecord, (metadata, e) -> { | |
if (metadata != null) { | |
// Record sent successfully. Exception == null and metadata != null | |
LOGGER.debug("Message {}-{} successfully sent to topic={} part={} off={} at time={}", | |
null, | |
producerRecord, | |
metadata.topic(), | |
metadata.partition(), | |
metadata.offset(), | |
new Date(metadata.timestamp())); | |
} else { | |
// An error occurred. Exception != null and metadata == null | |
// Correctly handle the exception according to your needs | |
// /!\ If you don't process the exception, it is "fire-and-forget" like. Send or not or maybe :-) | |
LOGGER.error("An error occurred during send !", e); | |
} | |
}); | |
} | |
private ProducerRecord<K, V> buildProducerRecord(String topicName, ProducerJsonRecord producerJsonRecord, RecordHeaders headers) throws IOException { | |
ProducerRecord<K, V> producerRecord = | |
new ProducerRecord<>( | |
topicName, | |
null, | |
null, | |
avroUtils.buildRecordKey(getRecordKeyInstance(), producerJsonRecord), | |
avroUtils.buildRecordValue(getRecordValueInstance(), producerJsonRecord), | |
headers); | |
return producerRecord; | |
} | |
private RecordHeaders buildHeaders(ProducerJsonRecord producerJsonRecord) { | |
RecordHeaders headers = new RecordHeaders(); | |
producerJsonRecord.getHeaders().forEach((o, o2) -> headers.add(o.toString(), o2.toString().getBytes(StandardCharsets.UTF_8))); | |
return headers; | |
} | |
private Producer<K, V> createAvroProducer(String bootStrapServers, String producerPropertyFile) { | |
return new KafkaProducer<>(defineKafkaProperties(bootStrapServers, producerPropertyFile)); | |
} | |
protected Properties defineKafkaProperties(String bootStrapServers, String producerPropertyFile) { | |
try (InputStream propsIs = Resources.getResource(producerPropertyFile).openStream()) { | |
Properties properties = new Properties(); | |
properties.load(propsIs); | |
properties.put(BOOTSTRAP_SERVERS, bootStrapServers); | |
resolveValuePlaceHolders(properties); | |
return properties; | |
} catch (IOException e) { | |
throw new RuntimeException("Exception while reading kafka producer properties - " + e); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.avro.io.DatumReader; | |
import org.apache.avro.io.Decoder; | |
import org.apache.avro.io.DecoderFactory; | |
import org.apache.avro.specific.SpecificDatumReader; | |
import org.apache.avro.specific.SpecificRecordBase; | |
import org.jsmart.zerocode.core.kafka.send.message.ProducerJsonRecord; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.io.IOException; | |
public class AvroHelper<K, V extends SpecificRecordBase> { | |
private static final Logger LOGGER = LoggerFactory.getLogger(AvroHelper.class); | |
/** | |
* Use {@link DatumReader} to create a custom {@link SpecificRecordBase} based on the {@link ProducerJsonRecord} | |
* @param clazz the class to deserialize, based on the {@link ProducerJsonRecord} | |
* @param jsonRecord the {@link ProducerJsonRecord} to deserialize | |
* @return a {@link SpecificRecordBase} representing the record | |
* @throws IOException if an error occurs during the deserialization process | |
*/ | |
public V buildRecordValue(V clazz, ProducerJsonRecord jsonRecord) throws IOException { | |
DatumReader<V> valueReader | |
= new SpecificDatumReader(clazz.getClass()); | |
try { | |
Decoder decoder = new ExtendedJsonDecoder( | |
clazz.getSchema(), jsonRecord.getValue().toString()); | |
return valueReader.read(null, decoder); | |
} catch (IOException e) { | |
LOGGER.error("Deserialization error:" + e.getMessage()); | |
throw e; | |
} | |
} | |
/** | |
* Use {@link DatumReader} to create a custom {@link SpecificRecordBase} based on the {@link ProducerJsonRecord} | |
* @param clazz the class to deserialize, based on the {@link ProducerJsonRecord} | |
* @param jsonRecord the {@link ProducerJsonRecord} to deserialize | |
* @return a {@link SpecificRecordBase} representing the record key | |
* @throws IOException if an error occurs during the deserialization process | |
*/ | |
public K buildRecordKey(K clazz, ProducerJsonRecord jsonRecord) throws IOException { | |
if (clazz != null && clazz instanceof SpecificRecordBase) { | |
// use Avro Deserializer to get json record from zerocode configuration and build the AVRO message | |
DatumReader<K> keyReader | |
= new SpecificDatumReader(clazz.getClass()); | |
try { | |
Decoder decoder = DecoderFactory.get().jsonDecoder( | |
((SpecificRecordBase) clazz).getSchema(), jsonRecord.getJsonKey().toString()); | |
return keyReader.read(null, decoder); | |
} catch (IOException e) { | |
LOGGER.error("Deserialization error:" + e.getMessage()); | |
throw e; | |
} | |
} | |
return null; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* This {@link KafkaAvroCostingMessagesSupport} must be used with the Zerocode {@link org.jsmart.zerocode.core.domain.UseKafkaClient} annotation | |
* to allow testing payload, received with an AVRO format | |
* | |
*/ | |
public class KafkaAvroExampleMessagesSupport extends AbstractAvroMessagesSupport<String, ExamplePayload> { | |
@Override | |
public ExamplePayload getRecordValueInstance() { | |
return new ExamplePayload(); | |
} | |
@Override | |
public String getRecordKeyInstance() { | |
return null; | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"scenarioName":"Example scenario", | |
"steps":[ | |
{ | |
"name":"produce_step", | |
"url":"kafka-topic:${SYSTEM.ENV:TOPIC}", | |
"operation":"produce", | |
"request":{ | |
"records":[ | |
{ | |
"key":null, | |
"headers":{ | |
"ANY":"any" | |
}, | |
"value":{ | |
"name":"Example", | |
"age":{ | |
"int":25 | |
}, | |
"favoriteProgrammingLanguage":{ | |
"ProgrammingLanguage":"Java" | |
} | |
} | |
} | |
] | |
}, | |
"assertions":{ | |
"status":"Ok", | |
"recordMetadata":{ | |
"topicPartition":{ | |
"topic":"${SYSTEM.ENV:TOPIC}" | |
} | |
} | |
} | |
} | |
] | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"name": "ExamplePayload", | |
"type": "record", | |
"fields": [ | |
{"name": "name", "type": "string", "example": "Donkey"}, | |
{"name": "age", "type": ["null", "int"], "default": null, "example": 123}, | |
{ | |
"name": "favoriteProgrammingLanguage", | |
"type": ["null", {"name": "ProgrammingLanguage", "type": "enum", "symbols": ["JS", "Java", "Go", "Rust", "C"], "default": "JS"}] | |
} | |
] | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import KafkaAvroCostingMessagesSupport; | |
import org.jsmart.zerocode.core.domain.Scenario; | |
import org.jsmart.zerocode.core.domain.TargetEnv; | |
import org.jsmart.zerocode.core.domain.UseKafkaClient; | |
import org.jsmart.zerocode.core.runner.ZeroCodeUnitRunner; | |
import org.junit.Test; | |
import org.junit.runner.RunWith; | |
@TargetEnv("kafka_servers/kafka_server.properties") | |
@UseKafkaClient(KafkaAvroCostingMessagesSupport.class) | |
@RunWith(ZeroCodeUnitRunner.class) | |
public class ExampleScenarioTest { | |
@Test | |
@Scenario("kafka/example_scenario.json") | |
public void test_example_scenario() { | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.fasterxml.jackson.core.Base64Variant; | |
import com.fasterxml.jackson.core.JsonFactory; | |
import com.fasterxml.jackson.core.JsonLocation; | |
import com.fasterxml.jackson.core.JsonParser; | |
import com.fasterxml.jackson.core.JsonStreamContext; | |
import com.fasterxml.jackson.core.JsonToken; | |
import com.fasterxml.jackson.core.ObjectCodec; | |
import com.fasterxml.jackson.core.Version; | |
import com.fasterxml.jackson.databind.JsonNode; | |
import com.fasterxml.jackson.databind.ObjectMapper; | |
import com.fasterxml.jackson.databind.node.NullNode; | |
import org.apache.avro.AvroTypeException; | |
import org.apache.avro.JsonProperties; | |
import org.apache.avro.Schema; | |
import org.apache.avro.Schema.Field; | |
import org.apache.avro.Schema.Type; | |
import org.apache.avro.io.Decoder; | |
import org.apache.avro.io.DecoderFactory; | |
import org.apache.avro.io.ParsingDecoder; | |
import org.apache.avro.io.parsing.JsonGrammarGenerator; | |
import org.apache.avro.io.parsing.Parser; | |
import org.apache.avro.io.parsing.Symbol; | |
import org.apache.avro.util.Utf8; | |
import java.io.EOFException; | |
import java.io.IOException; | |
import java.math.BigDecimal; | |
import java.math.BigInteger; | |
import java.nio.ByteBuffer; | |
import java.nio.charset.StandardCharsets; | |
import java.util.ArrayList; | |
import java.util.HashMap; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Stack; | |
/** | |
* A {@link Decoder} for Avro's JSON data encoding. | |
* <p> | |
* Construct using {@link DecoderFactory}. | |
* </p> | |
* ExtendedJsonDecoder is not thread-safe. | |
* <p> | |
* Based on {@link org.apache.avro.io.JsonDecoder JsonDecoder} | |
* and <a href="https://github.com/zolyfarkas/avro">ExtendedJsonDecoder</a>. | |
* Infers default arguments, if they are not present. | |
* </p> | |
* | |
* <p>This is a code duplication from <a href="https://github.com/Celos/avro-json-decoder">Celos/avro-json-decoder</a></p> | |
* <p>This ExtendedJsonDecoder allows to create Avro record with the infer of default values due to <a href="https://issues.apache.org/jira/browse/AVRO-1582">this following Jira issue</a></p> | |
**/ | |
public class ExtendedJsonDecoder extends ParsingDecoder | |
implements Parser.ActionHandler { | |
public static final String FIXED = "fixed"; | |
private static final JsonElement NULL_JSON_ELEMENT = new JsonElement(null); | |
private static final JsonFactory jsonFactory = new JsonFactory(); | |
private final Schema schema; | |
Stack<ReorderBuffer> reorderBuffers = new Stack<>(); | |
ReorderBuffer currentReorderBuffer; | |
ObjectMapper mapper = new ObjectMapper(); | |
private JsonParser in; | |
public ExtendedJsonDecoder(Schema schema, String in) throws IOException { | |
super(getSymbol(schema)); | |
configure(in); | |
this.schema = schema; | |
} | |
private static Symbol getSymbol(Schema schema) { | |
if (null == schema) { | |
throw new NullPointerException("Schema cannot be null!"); | |
} | |
return new JsonGrammarGenerator().generate(schema); | |
} | |
private static List<JsonElement> getVaueAsTree(JsonParser in) throws IOException { | |
int level = 0; | |
List<JsonElement> result = new ArrayList<JsonElement>(); | |
do { | |
JsonToken t = in.getCurrentToken(); | |
switch (t) { | |
case START_OBJECT: | |
case START_ARRAY: | |
level++; | |
result.add(new JsonElement(t)); | |
break; | |
case END_OBJECT: | |
case END_ARRAY: | |
level--; | |
result.add(new JsonElement(t)); | |
break; | |
case FIELD_NAME: | |
case VALUE_STRING: | |
case VALUE_NUMBER_INT: | |
case VALUE_NUMBER_FLOAT: | |
case VALUE_TRUE: | |
case VALUE_FALSE: | |
case VALUE_NULL: | |
result.add(new JsonElement(t, in.getText())); | |
break; | |
default: | |
result.add(NULL_JSON_ELEMENT); | |
} | |
in.nextToken(); | |
} while (level != 0); | |
result.add(new JsonElement(null)); | |
return result; | |
} | |
private static Field findField(Schema schema, String name) { | |
if (schema.getField(name) != null) { | |
return schema.getField(name); | |
} | |
Field foundField = null; | |
for (Field field : schema.getFields()) { | |
Schema fieldSchema = field.schema(); | |
if (Type.RECORD.equals(fieldSchema.getType())) { | |
foundField = findField(fieldSchema, name); | |
} else if (Type.ARRAY.equals(fieldSchema.getType())) { | |
foundField = findField(fieldSchema.getElementType(), name); | |
} else if (Type.MAP.equals(fieldSchema.getType())) { | |
foundField = findField(fieldSchema.getValueType(), name); | |
} | |
if (foundField != null) { | |
return foundField; | |
} | |
} | |
return null; | |
} | |
/** | |
* <p>Reconfigures this JsonDecoder to use the String provided for input.</p> | |
* <p>If the String provided is null, a NullPointerException is thrown.</p> | |
* Otherwise, this JsonDecoder will reset its state and then | |
* reconfigure its input. | |
* | |
* @param in The String to read from. Cannot be null. | |
* @return this JsonDecoder | |
* @throws IOException | |
*/ | |
public ExtendedJsonDecoder configure(String in) throws IOException { | |
if (null == in) { | |
throw new NullPointerException("String to read from cannot be null!"); | |
} | |
parser.reset(); | |
this.in = new JsonFactory().createJsonParser(in); | |
this.in.nextToken(); | |
return this; | |
} | |
private void advance(Symbol symbol) throws IOException { | |
this.parser.processTrailingImplicitActions(); | |
if (in.getCurrentToken() == null && this.parser.depth() == 1) | |
throw new EOFException(); | |
parser.advance(symbol); | |
} | |
@Override | |
public void readNull() throws IOException { | |
advance(Symbol.NULL); | |
if (in.getCurrentToken() == JsonToken.VALUE_NULL) { | |
in.nextToken(); | |
} else { | |
throw error("null"); | |
} | |
} | |
@Override | |
public boolean readBoolean() throws IOException { | |
advance(Symbol.BOOLEAN); | |
JsonToken t = in.getCurrentToken(); | |
if (t == JsonToken.VALUE_TRUE || t == JsonToken.VALUE_FALSE) { | |
in.nextToken(); | |
return t == JsonToken.VALUE_TRUE; | |
} else { | |
throw error("boolean"); | |
} | |
} | |
@Override | |
public int readInt() throws IOException { | |
advance(Symbol.INT); | |
if (in.getCurrentToken().isNumeric()) { | |
int result = in.getIntValue(); | |
in.nextToken(); | |
return result; | |
} else { | |
throw error("int"); | |
} | |
} | |
@Override | |
public long readLong() throws IOException { | |
advance(Symbol.LONG); | |
if (in.getCurrentToken().isNumeric()) { | |
long result = in.getLongValue(); | |
in.nextToken(); | |
return result; | |
} else { | |
throw error("long"); | |
} | |
} | |
@Override | |
public float readFloat() throws IOException { | |
advance(Symbol.FLOAT); | |
if (in.getCurrentToken().isNumeric()) { | |
float result = in.getFloatValue(); | |
in.nextToken(); | |
return result; | |
} else { | |
throw error("float"); | |
} | |
} | |
@Override | |
public double readDouble() throws IOException { | |
advance(Symbol.DOUBLE); | |
if (in.getCurrentToken().isNumeric()) { | |
double result = in.getDoubleValue(); | |
in.nextToken(); | |
return result; | |
} else { | |
throw error("double"); | |
} | |
} | |
@Override | |
public Utf8 readString(Utf8 old) throws IOException { | |
return new Utf8(readString()); | |
} | |
@Override | |
public String readString() throws IOException { | |
advance(Symbol.STRING); | |
if (parser.topSymbol() == Symbol.MAP_KEY_MARKER) { | |
parser.advance(Symbol.MAP_KEY_MARKER); | |
if (in.getCurrentToken() != JsonToken.FIELD_NAME) { | |
throw error("map-key"); | |
} | |
} else { | |
if (in.getCurrentToken() != JsonToken.VALUE_STRING) { | |
throw error("string"); | |
} | |
} | |
String result = in.getText(); | |
in.nextToken(); | |
return result; | |
} | |
@Override | |
public void skipString() throws IOException { | |
advance(Symbol.STRING); | |
if (parser.topSymbol() == Symbol.MAP_KEY_MARKER) { | |
parser.advance(Symbol.MAP_KEY_MARKER); | |
if (in.getCurrentToken() != JsonToken.FIELD_NAME) { | |
throw error("map-key"); | |
} | |
} else { | |
if (in.getCurrentToken() != JsonToken.VALUE_STRING) { | |
throw error("string"); | |
} | |
} | |
in.nextToken(); | |
} | |
@Override | |
public ByteBuffer readBytes(ByteBuffer old) throws IOException { | |
advance(Symbol.BYTES); | |
if (in.getCurrentToken() == JsonToken.VALUE_STRING) { | |
byte[] result = readByteArray(); | |
in.nextToken(); | |
return ByteBuffer.wrap(result); | |
} else { | |
throw error("bytes"); | |
} | |
} | |
private byte[] readByteArray() throws IOException { | |
return in.getText().getBytes(StandardCharsets.ISO_8859_1); | |
} | |
@Override | |
public void skipBytes() throws IOException { | |
advance(Symbol.BYTES); | |
if (in.getCurrentToken() == JsonToken.VALUE_STRING) { | |
in.nextToken(); | |
} else { | |
throw error("bytes"); | |
} | |
} | |
private void checkFixed(int size) throws IOException { | |
advance(Symbol.FIXED); | |
Symbol.IntCheckAction top = (Symbol.IntCheckAction) parser.popSymbol(); | |
if (size != top.size) { | |
throw new AvroTypeException( | |
"Incorrect length for fixed binary: expected " + | |
top.size + " but received " + size + " bytes."); | |
} | |
} | |
@Override | |
public void readFixed(byte[] bytes, int start, int len) throws IOException { | |
checkFixed(len); | |
if (in.getCurrentToken() == JsonToken.VALUE_STRING) { | |
byte[] result = readByteArray(); | |
in.nextToken(); | |
if (result.length != len) { | |
throw new AvroTypeException("Expected fixed length " + len | |
+ ", but got" + result.length); | |
} | |
System.arraycopy(result, 0, bytes, start, len); | |
} else { | |
throw error(FIXED); | |
} | |
} | |
@Override | |
public void skipFixed(int length) throws IOException { | |
checkFixed(length); | |
doSkipFixed(length); | |
} | |
private void doSkipFixed(int length) throws IOException { | |
if (in.getCurrentToken() == JsonToken.VALUE_STRING) { | |
byte[] result = readByteArray(); | |
in.nextToken(); | |
if (result.length != length) { | |
throw new AvroTypeException("Expected fixed length " + length | |
+ ", but got" + result.length); | |
} | |
} else { | |
throw error(FIXED); | |
} | |
} | |
@Override | |
protected void skipFixed() throws IOException { | |
advance(Symbol.FIXED); | |
Symbol.IntCheckAction top = (Symbol.IntCheckAction) parser.popSymbol(); | |
doSkipFixed(top.size); | |
} | |
@Override | |
public int readEnum() throws IOException { | |
advance(Symbol.ENUM); | |
Symbol.EnumLabelsAction top = (Symbol.EnumLabelsAction) parser.popSymbol(); | |
if (in.getCurrentToken() == JsonToken.VALUE_STRING) { | |
in.getText(); | |
int n = top.findLabel(in.getText()); | |
if (n >= 0) { | |
in.nextToken(); | |
return n; | |
} | |
throw new AvroTypeException("Unknown symbol in enum " + in.getText()); | |
} else { | |
throw error(FIXED); | |
} | |
} | |
@Override | |
public long readArrayStart() throws IOException { | |
advance(Symbol.ARRAY_START); | |
if (in.getCurrentToken() == JsonToken.START_ARRAY) { | |
in.nextToken(); | |
return doArrayNext(); | |
} else { | |
throw error("array-start"); | |
} | |
} | |
@Override | |
public long arrayNext() throws IOException { | |
advance(Symbol.ITEM_END); | |
return doArrayNext(); | |
} | |
private long doArrayNext() throws IOException { | |
if (in.getCurrentToken() == JsonToken.END_ARRAY) { | |
parser.advance(Symbol.ARRAY_END); | |
in.nextToken(); | |
return 0; | |
} else { | |
return 1; | |
} | |
} | |
@Override | |
public long skipArray() throws IOException { | |
advance(Symbol.ARRAY_START); | |
if (in.getCurrentToken() == JsonToken.START_ARRAY) { | |
in.skipChildren(); | |
in.nextToken(); | |
advance(Symbol.ARRAY_END); | |
} else { | |
throw error("array-start"); | |
} | |
return 0; | |
} | |
@Override | |
public long readMapStart() throws IOException { | |
advance(Symbol.MAP_START); | |
if (in.getCurrentToken() == JsonToken.START_OBJECT) { | |
in.nextToken(); | |
return doMapNext(); | |
} else { | |
throw error("map-start"); | |
} | |
} | |
@Override | |
public long mapNext() throws IOException { | |
advance(Symbol.ITEM_END); | |
return doMapNext(); | |
} | |
private long doMapNext() throws IOException { | |
if (in.getCurrentToken() == JsonToken.END_OBJECT) { | |
in.nextToken(); | |
advance(Symbol.MAP_END); | |
return 0; | |
} else { | |
return 1; | |
} | |
} | |
@Override | |
public long skipMap() throws IOException { | |
advance(Symbol.MAP_START); | |
if (in.getCurrentToken() == JsonToken.START_OBJECT) { | |
in.skipChildren(); | |
in.nextToken(); | |
advance(Symbol.MAP_END); | |
} else { | |
throw error("map-start"); | |
} | |
return 0; | |
} | |
@Override | |
public int readIndex() throws IOException { | |
advance(Symbol.UNION); | |
Symbol.Alternative a = (Symbol.Alternative) parser.popSymbol(); | |
String label; | |
final JsonToken currentToken = in.getCurrentToken(); | |
if (currentToken == JsonToken.VALUE_NULL) { | |
label = "null"; | |
} else if (currentToken == JsonToken.START_OBJECT | |
&& in.nextToken() == JsonToken.FIELD_NAME) { | |
label = in.getText(); | |
in.nextToken(); | |
parser.pushSymbol(Symbol.UNION_END); | |
} else if (a.size() == 2 && | |
("null".equals(a.getLabel(0)) || "null".equals(a.getLabel(1)))) { | |
label = ("null".equals(a.getLabel(0)) ? a.getLabel(1) : a.getLabel(0)); | |
} else { | |
throw error("start-union"); | |
} | |
int n = a.findLabel(label); | |
if (n < 0) { | |
throw new AvroTypeException("Unknown union branch " + label); | |
} | |
parser.pushSymbol(a.getSymbol(n)); | |
return n; | |
} | |
@Override | |
public Symbol doAction(Symbol input, Symbol top) throws IOException { | |
if (top instanceof Symbol.FieldAdjustAction) { | |
Symbol.FieldAdjustAction fa = (Symbol.FieldAdjustAction) top; | |
String name = fa.fname; | |
if (currentReorderBuffer != null) { | |
List<JsonElement> node = currentReorderBuffer.savedFields.get(name); | |
if (node != null) { | |
currentReorderBuffer.savedFields.remove(name); | |
currentReorderBuffer.origParser = in; | |
in = makeParser(node); | |
return null; | |
} | |
} | |
if (in.getCurrentToken() == JsonToken.FIELD_NAME) { | |
do { | |
String fn = in.getText(); | |
in.nextToken(); | |
if (name.equals(fn)) { | |
return null; | |
} else { | |
if (currentReorderBuffer == null) { | |
currentReorderBuffer = new ReorderBuffer(); | |
} | |
currentReorderBuffer.savedFields.put(fn, getVaueAsTree(in)); | |
} | |
} while (in.getCurrentToken() == JsonToken.FIELD_NAME); | |
injectDefaultValueIfAvailable(in, fa.fname); | |
} else { | |
injectDefaultValueIfAvailable(in, fa.fname); | |
} | |
} else if (top == Symbol.FIELD_END) { | |
if (currentReorderBuffer != null && currentReorderBuffer.origParser != null) { | |
in = currentReorderBuffer.origParser; | |
currentReorderBuffer.origParser = null; | |
} | |
} else if (top == Symbol.RECORD_START) { | |
if (in.getCurrentToken() == JsonToken.START_OBJECT) { | |
in.nextToken(); | |
reorderBuffers.push(currentReorderBuffer); | |
currentReorderBuffer = null; | |
} else { | |
throw error("record-start"); | |
} | |
} else if (top == Symbol.RECORD_END || top == Symbol.UNION_END) { | |
if (in.getCurrentToken() == JsonToken.END_OBJECT) { | |
in.nextToken(); | |
if (top == Symbol.RECORD_END) { | |
if (currentReorderBuffer != null && !currentReorderBuffer.savedFields.isEmpty()) { | |
throw error("Unknown fields: " + currentReorderBuffer.savedFields.keySet()); | |
} | |
currentReorderBuffer = reorderBuffers.pop(); | |
} | |
} else { | |
throw error(top == Symbol.RECORD_END ? "record-end" : "union-end"); | |
} | |
} else { | |
throw new AvroTypeException("Unknown action symbol " + top); | |
} | |
return null; | |
} | |
private JsonParser makeParser(final List<JsonElement> elements) { | |
return new JsonParser() { | |
int pos = 0; | |
@Override | |
public ObjectCodec getCodec() { | |
throw new UnsupportedOperationException(); | |
} | |
@Override | |
public void setCodec(ObjectCodec c) { | |
throw new UnsupportedOperationException(); | |
} | |
@Override | |
public Version version() { | |
throw new UnsupportedOperationException(); | |
} | |
@Override | |
public void close() { | |
throw new UnsupportedOperationException(); | |
} | |
@Override | |
public JsonToken nextToken() { | |
pos++; | |
return elements.get(pos).token; | |
} | |
@Override | |
public JsonToken nextValue() { | |
return null; | |
} | |
@Override | |
public JsonParser skipChildren() { | |
JsonToken tkn = elements.get(pos).token; | |
int level = (tkn == JsonToken.START_ARRAY || tkn == JsonToken.END_ARRAY) ? 1 : 0; | |
while (level > 0) { | |
switch (elements.get(++pos).token) { | |
case START_ARRAY: | |
case START_OBJECT: | |
level++; | |
break; | |
case END_ARRAY: | |
case END_OBJECT: | |
level--; | |
break; | |
default: | |
} | |
} | |
return this; | |
} | |
@Override | |
public boolean isClosed() { | |
throw new UnsupportedOperationException(); | |
} | |
@Override | |
public String getCurrentName() { | |
throw new UnsupportedOperationException(); | |
} | |
@Override | |
public JsonStreamContext getParsingContext() { | |
throw new UnsupportedOperationException(); | |
} | |
@Override | |
public JsonLocation getTokenLocation() { | |
throw new UnsupportedOperationException(); | |
} | |
@Override | |
public JsonLocation getCurrentLocation() { | |
throw new UnsupportedOperationException(); | |
} | |
@Override | |
public String getText() { | |
return elements.get(pos).value; | |
} | |
@Override | |
public char[] getTextCharacters() { | |
throw new UnsupportedOperationException(); | |
} | |
@Override | |
public int getTextLength() { | |
throw new UnsupportedOperationException(); | |
} | |
@Override | |
public int getTextOffset() { | |
throw new UnsupportedOperationException(); | |
} | |
@Override | |
public boolean hasTextCharacters() { | |
return false; | |
} | |
@Override | |
public Number getNumberValue() { | |
throw new UnsupportedOperationException(); | |
} | |
@Override | |
public NumberType getNumberType() { | |
throw new UnsupportedOperationException(); | |
} | |
@Override | |
public int getIntValue() { | |
return Integer.parseInt(getText()); | |
} | |
@Override | |
public long getLongValue() { | |
return Long.parseLong(getText()); | |
} | |
@Override | |
public BigInteger getBigIntegerValue() { | |
throw new UnsupportedOperationException(); | |
} | |
@Override | |
public float getFloatValue() { | |
return Float.parseFloat(getText()); | |
} | |
@Override | |
public double getDoubleValue() { | |
return Double.parseDouble(getText()); | |
} | |
@Override | |
public BigDecimal getDecimalValue() { | |
throw new UnsupportedOperationException(); | |
} | |
@Override | |
public byte[] getBinaryValue(Base64Variant b64variant) { | |
throw new UnsupportedOperationException(); | |
} | |
@Override | |
public String getValueAsString(String def) { | |
throw new UnsupportedOperationException(); | |
} | |
@Override | |
public JsonToken getCurrentToken() { | |
return elements.get(pos).token; | |
} | |
@Override | |
public int getCurrentTokenId() { | |
return getCurrentToken().id(); | |
} | |
@Override | |
public boolean hasCurrentToken() { | |
return getCurrentToken() != null; | |
} | |
@Override | |
public boolean hasTokenId(int id) { | |
return getCurrentTokenId() == id; | |
} | |
@Override | |
public boolean hasToken(JsonToken t) { | |
throw new UnsupportedOperationException(); | |
} | |
@Override | |
public void clearCurrentToken() { | |
throw new UnsupportedOperationException(); | |
} | |
@Override | |
public JsonToken getLastClearedToken() { | |
throw new UnsupportedOperationException(); | |
} | |
@Override | |
public void overrideCurrentName(String name) { | |
throw new UnsupportedOperationException(); | |
} | |
}; | |
} | |
private AvroTypeException error(String type) { | |
return new AvroTypeException("Expected " + type + | |
". Got " + in.getCurrentToken()); | |
} | |
private void injectDefaultValueIfAvailable(final JsonParser in, String fieldName) throws IOException { | |
Field field = findField(schema, fieldName); | |
if (field == null) { | |
throw new AvroTypeException("Expected field name not found: " + fieldName); | |
} | |
if (!field.hasDefaultValue()) { | |
throw new AvroTypeException("Expected field name not found: " + fieldName); | |
} | |
Object defVal = field.defaultVal(); | |
List<JsonElement> result = new ArrayList<>(2); | |
JsonNode defaultValJsonNode = defVal instanceof JsonProperties.Null ? NullNode.getInstance() : mapper.convertValue(defVal, JsonNode.class); | |
JsonParser traverse = defaultValJsonNode.traverse(); | |
while (traverse.nextToken() != null) { | |
final JsonToken currentToken = traverse.getCurrentToken(); | |
if (currentToken.isScalarValue() || (!currentToken.isStructStart() && !currentToken.isStructEnd())) { | |
result.add(new JsonElement(currentToken, traverse.getText())); | |
} else { | |
result.add(new JsonElement(currentToken)); | |
} | |
} | |
result.add(NULL_JSON_ELEMENT); | |
if (currentReorderBuffer == null) { | |
currentReorderBuffer = new ReorderBuffer(); | |
} | |
currentReorderBuffer.origParser = in; | |
this.in = makeParser(result); | |
} | |
private static class ReorderBuffer { | |
public Map<String, List<JsonElement>> savedFields = new HashMap<>(); | |
public JsonParser origParser = null; | |
} | |
private static class JsonElement { | |
public final JsonToken token; | |
public final String value; | |
public JsonElement(JsonToken t, String value) { | |
this.token = t; | |
this.value = value; | |
} | |
public JsonElement(JsonToken t) { | |
this(t, null); | |
} | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment