Created
February 15, 2018 04:46
-
-
Save frsyuki/e0a193341e9a8a3a283c7974b9c2cf67 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/ksql-core/pom.xml b/ksql-core/pom.xml | |
index afa072a..0a889b0 100644 | |
--- a/ksql-core/pom.xml | |
+++ b/ksql-core/pom.xml | |
@@ -44,6 +44,11 @@ | |
</dependency> | |
<dependency> | |
+ <groupId>org.msgpack</groupId> | |
+ <artifactId>msgpack-core</artifactId> | |
+ </dependency> | |
+ | |
+ <dependency> | |
<groupId>org.apache.kafka</groupId> | |
<artifactId>kafka_${kafka.scala.version}</artifactId> | |
</dependency> | |
diff --git a/ksql-core/src/main/java/io/confluent/ksql/analyzer/Analyzer.java b/ksql-core/src/main/java/io/confluent/ksql/analyzer/Analyzer.java | |
index 3f0ee56..111450a 100644 | |
--- a/ksql-core/src/main/java/io/confluent/ksql/analyzer/Analyzer.java | |
+++ b/ksql-core/src/main/java/io/confluent/ksql/analyzer/Analyzer.java | |
@@ -50,6 +50,7 @@ import io.confluent.ksql.serde.KsqlTopicSerDe; | |
import io.confluent.ksql.serde.avro.KsqlAvroTopicSerDe; | |
import io.confluent.ksql.serde.delimited.KsqlDelimitedTopicSerDe; | |
import io.confluent.ksql.serde.json.KsqlJsonTopicSerDe; | |
+import io.confluent.ksql.serde.msgpack.KsqlMsgpackTopicSerDe; | |
import io.confluent.ksql.util.KsqlConfig; | |
import io.confluent.ksql.util.KsqlException; | |
import io.confluent.ksql.util.Pair; | |
@@ -132,6 +133,9 @@ public class Analyzer extends DefaultTraversalVisitor<Node, AnalysisContext> { | |
case DataSource.JSON_SERDE_NAME: | |
intoTopicSerde = new KsqlJsonTopicSerDe(null); | |
break; | |
+ case DataSource.MSGPACK_SERDE_NAME: | |
+ intoTopicSerde = new KsqlMsgpackTopicSerDe(null); | |
+ break; | |
case DataSource.DELIMITED_SERDE_NAME: | |
intoTopicSerde = new KsqlDelimitedTopicSerDe(); | |
break; | |
@@ -563,4 +567,4 @@ public class Analyzer extends DefaultTraversalVisitor<Node, AnalysisContext> { | |
} | |
} | |
} | |
-} | |
\ No newline at end of file | |
+} | |
diff --git a/ksql-core/src/main/java/io/confluent/ksql/ddl/commands/RegisterTopicCommand.java b/ksql-core/src/main/java/io/confluent/ksql/ddl/commands/RegisterTopicCommand.java | |
index a4fc530..17eb83a 100644 | |
--- a/ksql-core/src/main/java/io/confluent/ksql/ddl/commands/RegisterTopicCommand.java | |
+++ b/ksql-core/src/main/java/io/confluent/ksql/ddl/commands/RegisterTopicCommand.java | |
@@ -26,6 +26,7 @@ import io.confluent.ksql.serde.KsqlTopicSerDe; | |
import io.confluent.ksql.serde.avro.KsqlAvroTopicSerDe; | |
import io.confluent.ksql.serde.delimited.KsqlDelimitedTopicSerDe; | |
import io.confluent.ksql.serde.json.KsqlJsonTopicSerDe; | |
+import io.confluent.ksql.serde.msgpack.KsqlMsgpackTopicSerDe; | |
import io.confluent.ksql.util.KsqlException; | |
import io.confluent.ksql.util.StringUtil; | |
@@ -78,6 +79,8 @@ public class RegisterTopicCommand implements DDLCommand { | |
return new KsqlAvroTopicSerDe(avroSchema); | |
case DataSource.JSON_SERDE_NAME: | |
return new KsqlJsonTopicSerDe(null); | |
+ case DataSource.MSGPACK_SERDE_NAME: | |
+ return new KsqlMsgpackTopicSerDe(null); | |
case DataSource.DELIMITED_SERDE_NAME: | |
return new KsqlDelimitedTopicSerDe(); | |
default: | |
diff --git a/ksql-core/src/main/java/io/confluent/ksql/metastore/DataSource.java b/ksql-core/src/main/java/io/confluent/ksql/metastore/DataSource.java | |
index c4f97b6..187dc31 100644 | |
--- a/ksql-core/src/main/java/io/confluent/ksql/metastore/DataSource.java | |
+++ b/ksql-core/src/main/java/io/confluent/ksql/metastore/DataSource.java | |
@@ -20,10 +20,11 @@ public interface DataSource { | |
public static enum DataSourceType { KTOPIC, KSTREAM, KTABLE } | |
- public static enum DataSourceSerDe { JSON, AVRO, DELIMITED } | |
+ public static enum DataSourceSerDe { MSGPACK, JSON, AVRO, DELIMITED } | |
public static final String AVRO_SERDE_NAME = "AVRO"; | |
public static final String JSON_SERDE_NAME = "JSON"; | |
+ public static final String MSGPACK_SERDE_NAME = "MSGPACK"; | |
public static final String DELIMITED_SERDE_NAME = "DELIMITED"; | |
public String getName(); | |
diff --git a/ksql-core/src/main/java/io/confluent/ksql/serde/msgpack/FieldKey.java b/ksql-core/src/main/java/io/confluent/ksql/serde/msgpack/FieldKey.java | |
new file mode 100644 | |
index 0000000..2973dfe | |
--- /dev/null | |
+++ b/ksql-core/src/main/java/io/confluent/ksql/serde/msgpack/FieldKey.java | |
@@ -0,0 +1,81 @@ | |
+/** | |
+ * Copyright 2017 Confluent Inc. | |
+ * | |
+ * Licensed under the Apache License, Version 2.0 (the "License"); | |
+ * you may not use this file except in compliance with the License. | |
+ * You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+ * See the License for the specific language governing permissions and | |
+ * limitations under the License. | |
+ **/ | |
+ | |
+package io.confluent.ksql.serde.msgpack; | |
+ | |
+import org.msgpack.core.buffer.MessageBuffer; | |
+ | |
+import static java.nio.charset.StandardCharsets.UTF_8; | |
+ | |
+/** | |
+ * This package-private class is used by the deserializer to look up a field | |
+ * without decoding its UTF-8 name into a String. | |
+ */ | |
+class FieldKey { | |
+ public static FieldKey copyOf(String string) { | |
+ byte[] utf8 = string.getBytes(UTF_8); | |
+ return new FieldKey(utf8, 0, utf8.length); | |
+ } | |
+ | |
+ public static FieldKey reference(MessageBuffer ref) { | |
+ return new FieldKey(ref.array(), ref.arrayOffset(), ref.size()); | |
+ } | |
+ | |
+ private static int hashByteArray(byte[] utf8, int offset, int length) { | |
+ int result = 1; | |
+ for (int i = offset; i < offset + length; i++) { | |
+ result = 31 * result + utf8[i]; | |
+ } | |
+ return result; | |
+ } | |
+ | |
+ private final byte[] utf8; | |
+ private final int offset; | |
+ private final int length; | |
+ private final int hashCode; | |
+ | |
+ private FieldKey(byte[] utf8, int offset, int length) { | |
+ this.utf8 = utf8; | |
+ this.offset = offset; | |
+ this.length = length; | |
+ this.hashCode = hashByteArray(utf8, offset, length); | |
+ } | |
+ | |
+ @Override | |
+ public int hashCode() { | |
+ return hashCode; | |
+ } | |
+ | |
+ @Override | |
+ public boolean equals(Object o) { | |
+ if (!(o instanceof FieldKey)) { | |
+ return false; | |
+ } | |
+ return equalTo((FieldKey) o); | |
+ } | |
+ | |
+ private boolean equalTo(FieldKey o) { | |
+ if (length != o.length) { | |
+ return false; | |
+ } | |
+ for (int i = offset + length - 1, j = o.offset + length - 1; i >= offset; i--, j--) { | |
+ if (utf8[i] != o.utf8[j]) { | |
+ return false; | |
+ } | |
+ } | |
+ return true; | |
+ } | |
+} | |
diff --git a/ksql-core/src/main/java/io/confluent/ksql/serde/msgpack/KsqlMsgpackDeserializer.java b/ksql-core/src/main/java/io/confluent/ksql/serde/msgpack/KsqlMsgpackDeserializer.java | |
new file mode 100644 | |
index 0000000..7009607 | |
--- /dev/null | |
+++ b/ksql-core/src/main/java/io/confluent/ksql/serde/msgpack/KsqlMsgpackDeserializer.java | |
@@ -0,0 +1,270 @@ | |
+/** | |
+ * Copyright 2017 Confluent Inc. | |
+ * | |
+ * Licensed under the Apache License, Version 2.0 (the "License"); | |
+ * you may not use this file except in compliance with the License. | |
+ * You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+ * See the License for the specific language governing permissions and | |
+ * limitations under the License. | |
+ **/ | |
+ | |
+package io.confluent.ksql.serde.msgpack; | |
+ | |
+import com.fasterxml.jackson.databind.JsonNode; | |
+import com.fasterxml.jackson.databind.ObjectMapper; | |
+import io.confluent.ksql.physical.GenericRow; | |
+import io.confluent.ksql.util.KsqlException; | |
+import org.apache.kafka.common.errors.SerializationException; | |
+import org.apache.kafka.common.serialization.Deserializer; | |
+import org.apache.kafka.connect.data.Field; | |
+import org.apache.kafka.connect.data.Schema; | |
+import org.msgpack.core.MessageFormat; | |
+import org.msgpack.core.MessageInsufficientBufferException; | |
+import org.msgpack.core.MessageIntegerOverflowException; | |
+import org.msgpack.core.MessagePack; | |
+import org.msgpack.core.MessageUnpacker; | |
+import org.msgpack.core.buffer.MessageBuffer; | |
+import org.msgpack.value.IntegerValue; | |
+import org.msgpack.value.Value; | |
+import org.msgpack.value.ValueType; | |
+ | |
+import java.io.IOException; | |
+import java.util.Arrays; | |
+import java.util.HashMap; | |
+import java.util.Iterator; | |
+import java.util.List; | |
+import java.util.Map; | |
+ | |
+import static java.util.Locale.ENGLISH; | |
+import static java.nio.charset.StandardCharsets.UTF_8; | |
+ | |
+public class KsqlMsgpackDeserializer implements Deserializer<GenericRow> { | |
+ | |
+ // Used to cast values to column types | |
+ private ObjectMapper objectMapper = new ObjectMapper(); | |
+ | |
+ private final Map<FieldKey, FieldEntry> fields; | |
+ private final Map<String, FieldEntry> caseInsensitiveFields; | |
+ | |
+ /** | |
+ * Creates a deserializer for the given row schema; each field is keyed by its name after the first '.' (if any). | |
+ */ | |
+ public KsqlMsgpackDeserializer(Schema schema) { | |
+ this.fields = new HashMap<>(); | |
+ this.caseInsensitiveFields = new HashMap<>(); | |
+ for (Field field : schema.fields()) { | |
+ String name = field.name().substring(field.name().indexOf('.') + 1); | |
+ FieldEntry entry = new FieldEntry(field.index(), fieldReader(field.schema())); | |
+ fields.put(FieldKey.copyOf(name), entry); | |
+ caseInsensitiveFields.put(name.toUpperCase(ENGLISH), entry); | |
+ } | |
+ } | |
+ | |
+ @Override | |
+ public void configure(Map<String, ?> map, boolean b) { | |
+ } | |
+ | |
+ @Override | |
+ public GenericRow deserialize(final String topic, final byte[] bytes) { | |
+ if (bytes == null) { | |
+ return null; | |
+ } | |
+ | |
+ try (MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(bytes)) { | |
+ return readGenericRow(unpacker); | |
+ } catch (Exception e) { | |
+ throw new SerializationException(e); | |
+ } | |
+ } | |
+ | |
+ private GenericRow readGenericRow(MessageUnpacker unpacker) throws IOException { | |
+ Object[] columns = new Object[fields.size()]; | |
+ | |
+ if (unpacker.getNextFormat().getValueType() == ValueType.MAP) { | |
+ int length = unpacker.unpackMapHeader(); | |
+ for (int i = 0; i < length; i++) { | |
+ MessageBuffer fieldNameRef = readFieldNameRef(unpacker); | |
+ FieldEntry entry = findFieldEntry(fieldNameRef); | |
+ if (entry == null) { | |
+ // Found an unknown field name. Skip its value. | |
+ unpacker.skipValue(); | |
+ } else { | |
+ columns[entry.index] = entry.reader.readFrom(unpacker); | |
+ } | |
+ } | |
+ } else { | |
+ // Stored record is not a map. Leave all columns null. | |
+ } | |
+ | |
+ // TODO exception handlers? | |
+ | |
+ // Some columns may remain unfilled when the source bytes do not contain every | |
+ // field. Leave them as null values. | |
+ | |
+ return new GenericRow(Arrays.asList(columns)); | |
+ } | |
+ | |
+ private MessageBuffer readFieldNameRef(MessageUnpacker unpacker) throws IOException { | |
+ if (unpacker.getNextFormat().getValueType() == ValueType.STRING) { | |
+ int length = unpacker.unpackRawStringHeader(); | |
+ return unpacker.readPayloadAsReference(length); | |
+ } else { | |
+ unpacker.skipValue(); | |
+ return null; | |
+ } | |
+ } | |
+ | |
+ private FieldEntry findFieldEntry(MessageBuffer fieldNameRef) { | |
+ if (fieldNameRef == null) { | |
+ return null; | |
+ } | |
+ // Find by case sensitive match first | |
+ FieldEntry entry = fields.get(FieldKey.reference(fieldNameRef)); | |
+ if (entry != null) { | |
+ return entry; | |
+ } | |
+ // If not found, find by case insensitive match | |
+ String fieldName = new String(fieldNameRef.array(), fieldNameRef.arrayOffset(), fieldNameRef.size(), UTF_8); | |
+ return caseInsensitiveFields.get(fieldName.toUpperCase(ENGLISH)); | |
+ } | |
+ | |
+ @Override | |
+ public void close() { | |
+ } | |
+ | |
+ private static class FieldEntry { | |
+ final int index; | |
+ final FieldReader reader; | |
+ | |
+ FieldEntry(int index, FieldReader reader) { | |
+ this.index = index; | |
+ this.reader = reader; | |
+ } | |
+ } | |
+ | |
+ private static interface FieldReader { | |
+ Object readValueFrom(MessageUnpacker unpacker, ValueType nextType) throws IOException; | |
+ | |
+ default Object readFrom(MessageUnpacker unpacker) throws IOException { | |
+ ValueType nextType = unpacker.getNextFormat().getValueType(); | |
+ if (nextType == ValueType.NIL) { | |
+ unpacker.unpackNil(); | |
+ return null; | |
+ } else { | |
+ return readValueFrom(unpacker, nextType); | |
+ } | |
+ } | |
+ } | |
+ | |
+ // Builds the reader for one column type. A value of the matching msgpack type is read | |
+ // directly; otherwise scalars are converted via their JSON form and ARRAY/MAP columns are | |
+ // skipped (null). Throws KsqlException for schema types that are not supported. | |
+ private FieldReader fieldReader(Schema schema) { | |
+ switch (schema.type()) { | |
+ case BOOLEAN: | |
+ return (unpacker, nextType) -> { | |
+ if (nextType == ValueType.BOOLEAN) { | |
+ return unpacker.unpackBoolean(); | |
+ } else { | |
+ return readAsJsonNode(unpacker).asBoolean(); | |
+ } | |
+ }; | |
+ | |
+ case INT32: | |
+ return (unpacker, nextType) -> { | |
+ if (nextType == ValueType.INTEGER) { | |
+ try { | |
+ return unpacker.unpackInt(); | |
+ } catch (MessageIntegerOverflowException overflow) { | |
+ return overflow.getBigInteger().intValue(); | |
+ } | |
+ } else { | |
+ return readAsJsonNode(unpacker).asInt(); | |
+ } | |
+ }; | |
+ | |
+ case INT64: | |
+ return (unpacker, nextType) -> { | |
+ if (nextType == ValueType.INTEGER) { | |
+ try { | |
+ return unpacker.unpackLong(); | |
+ } catch (MessageIntegerOverflowException overflow) { | |
+ return overflow.getBigInteger().longValue(); | |
+ } | |
+ } else { | |
+ return readAsJsonNode(unpacker).asLong(); | |
+ } | |
+ }; | |
+ | |
+ case FLOAT64: | |
+ return (unpacker, nextType) -> { | |
+ if (nextType == ValueType.FLOAT) { | |
+ return unpacker.unpackDouble(); | |
+ } else { | |
+ return readAsJsonNode(unpacker).asDouble(); | |
+ } | |
+ }; | |
+ | |
+ case STRING: | |
+ return (unpacker, nextType) -> { | |
+ if (nextType == ValueType.STRING) { | |
+ return unpacker.unpackString(); | |
+ } else { | |
+ return unpacker.unpackValue().toJson(); | |
+ } | |
+ }; | |
+ | |
+ case ARRAY: { | |
+ final FieldReader elementReader = fieldReader(schema.valueSchema()); | |
+ return (unpacker, nextType) -> { | |
+ if (nextType == ValueType.ARRAY) { | |
+ int length = unpacker.unpackArrayHeader(); | |
+ Object[] array = new Object[length]; | |
+ for (int i = 0; i < length; i++) { // read all `length` elements after the header | |
+ array[i] = elementReader.readFrom(unpacker); | |
+ } | |
+ return array; | |
+ } else { | |
+ unpacker.skipValue(); | |
+ return null; | |
+ } | |
+ }; | |
+ } | |
+ | |
+ case MAP: { | |
+ final FieldReader keyReader = fieldReader(schema.keySchema()); | |
+ final FieldReader valueReader = fieldReader(schema.valueSchema()); | |
+ return (unpacker, nextType) -> { | |
+ if (nextType == ValueType.MAP) { | |
+ int length = unpacker.unpackMapHeader(); | |
+ Map<Object, Object> map = new HashMap<>(); | |
+ for (int i = 0; i < length; i++) { // read all `length` key/value pairs after the header | |
+ Object key = keyReader.readFrom(unpacker); | |
+ Object value = valueReader.readFrom(unpacker); | |
+ if (key != null && value != null) { | |
+ map.put(key, value); | |
+ } | |
+ } | |
+ return map; | |
+ } else { | |
+ unpacker.skipValue(); | |
+ return null; | |
+ } | |
+ }; | |
+ } | |
+ | |
+ default: | |
+ throw new KsqlException("Type is not supported: " + schema.type()); | |
+ } | |
+ } | |
+ | |
+ private JsonNode readAsJsonNode(MessageUnpacker unpacker) throws IOException { | |
+ return objectMapper.readTree(unpacker.unpackValue().toJson()); | |
+ } | |
+} | |
diff --git a/ksql-core/src/main/java/io/confluent/ksql/serde/msgpack/KsqlMsgpackSerializer.java b/ksql-core/src/main/java/io/confluent/ksql/serde/msgpack/KsqlMsgpackSerializer.java | |
new file mode 100644 | |
index 0000000..e8d1448 | |
--- /dev/null | |
+++ b/ksql-core/src/main/java/io/confluent/ksql/serde/msgpack/KsqlMsgpackSerializer.java | |
@@ -0,0 +1,138 @@ | |
+/** | |
+ * Copyright 2017 Confluent Inc. | |
+ * | |
+ * Licensed under the Apache License, Version 2.0 (the "License"); | |
+ * you may not use this file except in compliance with the License. | |
+ * You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+ * See the License for the specific language governing permissions and | |
+ * limitations under the License. | |
+ **/ | |
+ | |
+package io.confluent.ksql.serde.msgpack; | |
+ | |
+import io.confluent.ksql.physical.GenericRow; | |
+import io.confluent.ksql.util.KsqlException; | |
+import org.apache.kafka.common.errors.SerializationException; | |
+import org.apache.kafka.common.serialization.Serializer; | |
+import org.apache.kafka.connect.data.Schema; | |
+import org.msgpack.core.MessageBufferPacker; | |
+import org.msgpack.core.MessagePack; | |
+import org.msgpack.core.MessagePacker; | |
+ | |
+import java.io.IOException; | |
+import java.util.Map; | |
+ | |
+public class KsqlMsgpackSerializer implements Serializer<GenericRow> { | |
+ | |
+ private final String[] fieldNames; | |
+ private final FieldWriter[] fieldWriters; | |
+ | |
+ /** | |
+ * Creates a serializer that writes each row as a msgpack map keyed by the schema's field names. | |
+ */ | |
+ public KsqlMsgpackSerializer(Schema schema) { | |
+ this.fieldNames = schema.fields().stream() | |
+ .map(field -> field.name().substring(field.name().indexOf('.') + 1)) | |
+ .toArray(String[]::new); | |
+ this.fieldWriters = schema.fields().stream() | |
+ .map(field -> fieldWriter(field.schema())) | |
+ .toArray(FieldWriter[]::new); | |
+ } | |
+ | |
+ @SuppressWarnings("unchecked") | |
+ @Override | |
+ public void configure(final Map<String, ?> props, final boolean isKey) { | |
+ } | |
+ | |
+ @Override | |
+ public byte[] serialize(final String topic, final GenericRow data) { | |
+ if (data == null) { | |
+ return null; | |
+ } | |
+ | |
+ try (MessageBufferPacker packer = MessagePack.newDefaultBufferPacker()) { | |
+ packer.packMapHeader(fieldNames.length); | |
+ for (int i = 0; i < fieldNames.length; i++) { | |
+ packer.packString(fieldNames[i]); | |
+ fieldWriters[i].writeTo(packer, data.getColumns().get(i)); | |
+ } | |
+ return packer.toByteArray(); | |
+ } catch (IOException e) { | |
+ throw new SerializationException("Error serializing MessagePack message", e); | |
+ } | |
+ } | |
+ | |
+ @Override | |
+ public void close() { | |
+ } | |
+ | |
+ private static interface FieldWriter { | |
+ void writeValueTo(MessagePacker packer, Object value) throws IOException; | |
+ | |
+ default void writeTo(final MessagePacker packer, final Object value) throws IOException { | |
+ if (value == null) { | |
+ packer.packNil(); | |
+ } else { | |
+ writeValueTo(packer, value); | |
+ } | |
+ } | |
+ } | |
+ | |
+ private static FieldWriter fieldWriter(final Schema schema) { | |
+ switch (schema.type()) { | |
+ case BOOLEAN: | |
+ return (packer, value) -> packer.packBoolean((boolean) value); | |
+ case INT8: | |
+ return (packer, value) -> packer.packByte((byte) value); | |
+ case INT16: | |
+ return (packer, value) -> packer.packShort((short) value); | |
+ case INT32: | |
+ return (packer, value) -> packer.packInt((int) value); | |
+ case INT64: | |
+ return (packer, value) -> packer.packLong((long) value); | |
+ case FLOAT32: | |
+ return (packer, value) -> packer.packFloat((float) value); | |
+ case FLOAT64: | |
+ return (packer, value) -> packer.packDouble((double) value); | |
+ case STRING: | |
+ return (packer, value) -> packer.packString((String) value); | |
+ case BYTES: | |
+ return (packer, value) -> { | |
+ byte[] bytes = (byte[]) value; | |
+ packer.packBinaryHeader(bytes.length); | |
+ packer.writePayload(bytes); | |
+ }; | |
+ case ARRAY: { | |
+ final FieldWriter elementWriter = fieldWriter(schema.valueSchema()); | |
+ return (packer, value) -> { | |
+ Object[] array = (Object[]) value; | |
+ packer.packArrayHeader(array.length); | |
+ for (Object element : array) { | |
+ elementWriter.writeTo(packer, element); | |
+ } | |
+ }; | |
+ } | |
+ case MAP: { | |
+ final FieldWriter keyWriter = fieldWriter(schema.keySchema()); | |
+ final FieldWriter valueWriter = fieldWriter(schema.valueSchema()); | |
+ return (packer, value) -> { | |
+ Map<Object, Object> map = (Map<Object, Object>) value; | |
+ packer.packMapHeader(map.size()); | |
+ for (Map.Entry pair : map.entrySet()) { | |
+ keyWriter.writeTo(packer, pair.getKey()); | |
+ valueWriter.writeTo(packer, pair.getValue()); | |
+ } | |
+ }; | |
+ } | |
+ default: | |
+ throw new KsqlException("Type is not supported: " + schema.type()); | |
+ } | |
+ } | |
+ | |
+} | |
diff --git a/ksql-core/src/main/java/io/confluent/ksql/serde/msgpack/KsqlMsgpackTopicSerDe.java b/ksql-core/src/main/java/io/confluent/ksql/serde/msgpack/KsqlMsgpackTopicSerDe.java | |
new file mode 100644 | |
index 0000000..104b183 | |
--- /dev/null | |
+++ b/ksql-core/src/main/java/io/confluent/ksql/serde/msgpack/KsqlMsgpackTopicSerDe.java | |
@@ -0,0 +1,34 @@ | |
+/** | |
+ * Copyright 2017 Confluent Inc. | |
+ * | |
+ * Licensed under the Apache License, Version 2.0 (the "License"); | |
+ * you may not use this file except in compliance with the License. | |
+ * You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+ * See the License for the specific language governing permissions and | |
+ * limitations under the License. | |
+ **/ | |
+ | |
+package io.confluent.ksql.serde.msgpack; | |
+ | |
+import io.confluent.ksql.metastore.StructuredDataSource; | |
+import io.confluent.ksql.serde.KsqlTopicSerDe; | |
+import org.apache.kafka.connect.data.Schema; | |
+ | |
+public class KsqlMsgpackTopicSerDe extends KsqlTopicSerDe { | |
+ | |
+ Schema rowSchema; | |
+ public KsqlMsgpackTopicSerDe(Schema rowSchema) { | |
+ super(StructuredDataSource.DataSourceSerDe.MSGPACK); | |
+ this.rowSchema = rowSchema; | |
+ } | |
+ | |
+ public Schema getRowSchema() { | |
+ return rowSchema; | |
+ } | |
+} | |
diff --git a/ksql-core/src/test/java/io/confluent/ksql/serde/msgpack/KsqlMsgpackDeserializerTest.java b/ksql-core/src/test/java/io/confluent/ksql/serde/msgpack/KsqlMsgpackDeserializerTest.java | |
new file mode 100644 | |
index 0000000..fc4d72e | |
--- /dev/null | |
+++ b/ksql-core/src/test/java/io/confluent/ksql/serde/msgpack/KsqlMsgpackDeserializerTest.java | |
@@ -0,0 +1,109 @@ | |
+/** | |
+ * Copyright 2017 Confluent Inc. | |
+ * | |
+ * Licensed under the Apache License, Version 2.0 (the "License"); | |
+ * you may not use this file except in compliance with the License. | |
+ * You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+ * See the License for the specific language governing permissions and | |
+ * limitations under the License. | |
+ **/ | |
+ | |
+package io.confluent.ksql.serde.msgpack; | |
+ | |
+import io.confluent.ksql.physical.GenericRow; | |
+import io.confluent.ksql.serde.json.KsqlJsonDeserializer; | |
+import io.confluent.ksql.serde.json.KsqlJsonSerializer; | |
+import io.confluent.ksql.util.KsqlTestUtil; | |
+import java.util.Map; | |
+import java.util.HashMap; | |
+import java.util.List; | |
+import java.util.Objects; | |
+import org.apache.kafka.connect.data.Schema; | |
+import java.io.IOException; | |
+import org.msgpack.core.MessageBufferPacker; | |
+import org.msgpack.value.Value; | |
+import org.msgpack.value.MapValue; | |
+import org.msgpack.value.ValueFactory; | |
+import org.msgpack.core.MessagePack; | |
+import org.msgpack.core.MessagePacker; | |
+import org.apache.kafka.connect.data.SchemaBuilder; | |
+import org.junit.Assert; | |
+import org.junit.Before; | |
+import org.junit.Ignore; | |
+import org.junit.Test; | |
+import static java.util.Arrays.asList; | |
+import static org.apache.kafka.connect.data.SchemaBuilder.BOOLEAN_SCHEMA; | |
+import static org.apache.kafka.connect.data.SchemaBuilder.FLOAT64_SCHEMA; | |
+import static org.apache.kafka.connect.data.SchemaBuilder.INT32_SCHEMA; | |
+import static org.apache.kafka.connect.data.SchemaBuilder.INT64_SCHEMA; | |
+import static org.apache.kafka.connect.data.SchemaBuilder.STRING_SCHEMA; | |
+import static java.nio.charset.StandardCharsets.UTF_8; | |
+ | |
+public class KsqlMsgpackDeserializerTest { | |
+ | |
+ private Schema schema; | |
+ private KsqlMsgpackDeserializer deser; | |
+ private KsqlJsonDeserializer jsonDeser; | |
+ | |
+ private void prepare(Schema schema) { | |
+ this.schema = schema; | |
+ this.deser = new KsqlMsgpackDeserializer(schema); | |
+ this.jsonDeser = new KsqlJsonDeserializer(schema); | |
+ } | |
+ | |
+ private MapValue rowMap(Value... columns) { | |
+ Map<Value, Value> map = new HashMap<>(); | |
+ for (int i = 0; i < schema.fields().size(); i++) { | |
+ String field = schema.fields().get(i).name(); | |
+ map.put(ValueFactory.newString(field.substring(field.indexOf('.') + 1)), columns[i]); | |
+ } | |
+ return ValueFactory.newMap(map); | |
+ } | |
+ | |
+ private List<Object> checkDeserialize(Value... columns) { | |
+ MapValue map = rowMap(columns); | |
+ | |
+ byte[] msgpack; | |
+ try (MessageBufferPacker packer = MessagePack.newDefaultBufferPacker()) { | |
+ packer.packValue(map); | |
+ msgpack = packer.toByteArray(); | |
+ } catch (IOException ex) { | |
+ throw new RuntimeException(ex); | |
+ } | |
+ | |
+ List<Object> objs = deser.deserialize("t", msgpack).getColumns(); | |
+ | |
+ // verify that behavior is same with json | |
+ byte[] json = map.toJson().getBytes(UTF_8); | |
+ List<Object> jsonObjs = jsonDeser.deserialize("t", json).getColumns(); | |
+ Assert.assertEquals(jsonObjs, objs); | |
+ | |
+ return objs; | |
+ } | |
+ | |
+ @Test | |
+ public void testEnforceToBoolean() { | |
+ prepare(SchemaBuilder.struct().field("BOOLEAN", SchemaBuilder.BOOLEAN_SCHEMA).build()); | |
+ Assert.assertEquals(asList(true), checkDeserialize(ValueFactory.newString("true"))); | |
+ Assert.assertEquals(asList(false), checkDeserialize(ValueFactory.newString("false"))); | |
+ Assert.assertEquals(asList(true), checkDeserialize(ValueFactory.newInteger(1))); | |
+ Assert.assertEquals(asList(false), checkDeserialize(ValueFactory.newInteger(0))); | |
+ Assert.assertEquals(asList(false), checkDeserialize(ValueFactory.newString("something"))); | |
+ Assert.assertEquals(asList(false), checkDeserialize(ValueFactory.newFloat(91.32))); | |
+ } | |
+ | |
+ @Test | |
+ public void testEnforceToInt32() { | |
+ prepare(SchemaBuilder.struct().field("INT32", SchemaBuilder.INT32_SCHEMA).build()); | |
+ Assert.assertEquals(asList(1), checkDeserialize(ValueFactory.newString("1"))); | |
+ Assert.assertEquals(asList(2), checkDeserialize(ValueFactory.newString("2"))); | |
+ Assert.assertEquals(asList(-321), checkDeserialize(ValueFactory.newString("-321"))); | |
+ Assert.assertEquals(asList(91), checkDeserialize(ValueFactory.newFloat(91.32))); | |
+ } | |
+} | |
diff --git a/ksql-core/src/test/java/io/confluent/ksql/serde/msgpack/KsqlMsgpackSerializerTest.java b/ksql-core/src/test/java/io/confluent/ksql/serde/msgpack/KsqlMsgpackSerializerTest.java | |
new file mode 100644 | |
index 0000000..0a12dc9 | |
--- /dev/null | |
+++ b/ksql-core/src/test/java/io/confluent/ksql/serde/msgpack/KsqlMsgpackSerializerTest.java | |
@@ -0,0 +1,122 @@ | |
+/** | |
+ * Copyright 2017 Confluent Inc. | |
+ * | |
+ * Licensed under the Apache License, Version 2.0 (the "License"); | |
+ * you may not use this file except in compliance with the License. | |
+ * You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+ * See the License for the specific language governing permissions and | |
+ * limitations under the License. | |
+ **/ | |
+ | |
+package io.confluent.ksql.serde.msgpack; | |
+ | |
+import io.confluent.ksql.physical.GenericRow; | |
+import io.confluent.ksql.serde.json.KsqlJsonDeserializer; | |
+import io.confluent.ksql.serde.json.KsqlJsonSerializer; | |
+import java.util.List; | |
+import java.util.Objects; | |
+import org.apache.kafka.connect.data.Schema; | |
+import org.apache.kafka.connect.data.SchemaBuilder; | |
+import org.junit.Assert; | |
+import org.junit.Before; | |
+import org.junit.Ignore; | |
+import org.junit.Test; | |
+import static java.util.Arrays.asList; | |
+ | |
+public class KsqlMsgpackSerializerTest { | |
+ | |
+ private Schema schema; | |
+ private KsqlMsgpackSerializer ser; | |
+ private KsqlMsgpackDeserializer deser; | |
+ private KsqlJsonSerializer jsonSer; | |
+ private KsqlJsonDeserializer jsonDeser; | |
+ | |
+ private void prepare(Schema schema) { | |
+ this.schema = schema; | |
+ this.ser = new KsqlMsgpackSerializer(schema); | |
+ this.deser = new KsqlMsgpackDeserializer(schema); | |
+ this.jsonSer = new KsqlJsonSerializer(schema); | |
+ this.jsonDeser = new KsqlJsonDeserializer(schema); | |
+ } | |
+ | |
+ private List<Object> checkRoundTrip(Object... values) { | |
+ byte[] msgpack = ser.serialize("t", row(values)); | |
+ List<Object> roundTrip = deser.deserialize("t", msgpack).getColumns(); | |
+ | |
+ // verify that behavior is same with json | |
+ byte[] json = jsonSer.serialize("t", row(values)); | |
+ List<Object> jsonRoundTrip = jsonDeser.deserialize("t", json).getColumns(); | |
+ Assert.assertEquals(jsonRoundTrip, roundTrip); | |
+ | |
+ return roundTrip; | |
+ } | |
+ | |
+ private static GenericRow row(Object... values) { | |
+ return new GenericRow(asList(values)); | |
+ } | |
+ | |
+ @Test | |
+ public void testBoolean() { | |
+ prepare(SchemaBuilder.struct().field("BOOLEAN", SchemaBuilder.BOOLEAN_SCHEMA).build()); | |
+ Assert.assertEquals(asList(true), checkRoundTrip(true)); | |
+ Assert.assertEquals(asList(false), checkRoundTrip(false)); | |
+ } | |
+ | |
+ @Test | |
+ public void testInt32() { | |
+ prepare(SchemaBuilder.struct().field("INT32", SchemaBuilder.INT32_SCHEMA).build()); | |
+ Assert.assertEquals(asList(0), checkRoundTrip(0)); | |
+ Assert.assertEquals(asList(1), checkRoundTrip(1)); | |
+ Assert.assertEquals(asList(-321398), checkRoundTrip(-321398)); | |
+ Assert.assertEquals(asList(Integer.MAX_VALUE), checkRoundTrip(Integer.MAX_VALUE)); | |
+ Assert.assertEquals(asList(Integer.MIN_VALUE), checkRoundTrip(Integer.MIN_VALUE)); | |
+ } | |
+ | |
+ @Test | |
+ public void testInt64() { | |
+ prepare(SchemaBuilder.struct().field("INT64", SchemaBuilder.INT64_SCHEMA).build()); | |
+ Assert.assertEquals(asList(0L), checkRoundTrip(0L)); | |
+ Assert.assertEquals(asList(1L), checkRoundTrip(1L)); | |
+ Assert.assertEquals(asList(-321398L), checkRoundTrip(-321398L)); | |
+ Assert.assertEquals(asList((long) Integer.MAX_VALUE), checkRoundTrip((long) Integer.MAX_VALUE)); | |
+ Assert.assertEquals(asList((long) Integer.MIN_VALUE), checkRoundTrip((long) Integer.MIN_VALUE)); | |
+ Assert.assertEquals(asList(Long.MAX_VALUE), checkRoundTrip(Long.MAX_VALUE)); | |
+ Assert.assertEquals(asList(Long.MIN_VALUE), checkRoundTrip(Long.MIN_VALUE)); | |
+ } | |
+ | |
+ @Test | |
+ public void testFloat64() { | |
+ prepare(SchemaBuilder.struct().field("FLOAT64", SchemaBuilder.FLOAT64_SCHEMA).build()); | |
+ Assert.assertEquals(asList(0.1), checkRoundTrip(0.1)); | |
+ Assert.assertEquals(asList(0.3), checkRoundTrip(0.3)); | |
+ Assert.assertEquals(asList(9321.45), checkRoundTrip(9321.45)); | |
+ } | |
+ | |
+ @Test | |
+ public void testString() { | |
+ prepare(SchemaBuilder.struct().field("STRING", SchemaBuilder.STRING_SCHEMA).build()); | |
+ Assert.assertEquals(asList(""), checkRoundTrip("")); | |
+ Assert.assertEquals(asList("a"), checkRoundTrip("a")); | |
+ Assert.assertEquals(asList("\u3042"), checkRoundTrip("\u3042")); // HIRAGANA LETTER A: three bytes in UTF-8 | |
+ Assert.assertEquals(asList("test0123456789KafkaAndMessagePackTestCase"), checkRoundTrip("test0123456789KafkaAndMessagePackTestCase")); | |
+ } | |
+ | |
+ // TODO test array<string> | |
+ // TODO test array<int64> | |
+ // TODO test map<string, string> | |
+ | |
+ //// TODO do this in deser test | |
+ //@Test | |
+ //public void testEnforceToBoolean() { | |
+ // Assert.assertTrue(checkRoundTrip("true").equals(asList(true))); | |
+ // Assert.assertTrue(checkRoundTrip("false").equals(asList(false))); | |
+ // Assert.assertTrue(checkRoundTrip(1).equals(asList(true))); | |
+ // Assert.assertTrue(checkRoundTrip(0).equals(asList(false))); | |
+ //} | |
+} | |
diff --git a/pom.xml b/pom.xml | |
index e37f26b..3327114 100644 | |
--- a/pom.xml | |
+++ b/pom.xml | |
@@ -45,6 +45,7 @@ | |
<jline.version>3.3.1</jline.version> | |
<jna.version>4.4.0</jna.version> | |
<jsr305.version>3.0.2</jsr305.version> | |
+ <msgpack.version>0.8.13</msgpack.version> | |
<really.executable.jar.version>1.5.0</really.executable.jar.version> | |
<licenses.version>3.3.0-SNAPSHOT</licenses.version> | |
<exec-maven-plugin.version>1.2.1</exec-maven-plugin.version> | |
@@ -163,6 +164,12 @@ | |
</dependency> | |
<dependency> | |
+ <groupId>org.msgpack</groupId> | |
+ <artifactId>msgpack-core</artifactId> | |
+ <version>${msgpack.version}</version> | |
+ </dependency> | |
+ | |
+ <dependency> | |
<groupId>org.codehaus.janino</groupId> | |
<artifactId>janino</artifactId> | |
<version>${janino.version}</version> |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment