Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
diff --git a/ksql-core/pom.xml b/ksql-core/pom.xml
index afa072a..0a889b0 100644
--- a/ksql-core/pom.xml
+++ b/ksql-core/pom.xml
@@ -44,6 +44,11 @@
</dependency>
<dependency>
+ <groupId>org.msgpack</groupId>
+ <artifactId>msgpack-core</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${kafka.scala.version}</artifactId>
</dependency>
diff --git a/ksql-core/src/main/java/io/confluent/ksql/analyzer/Analyzer.java b/ksql-core/src/main/java/io/confluent/ksql/analyzer/Analyzer.java
index 3f0ee56..111450a 100644
--- a/ksql-core/src/main/java/io/confluent/ksql/analyzer/Analyzer.java
+++ b/ksql-core/src/main/java/io/confluent/ksql/analyzer/Analyzer.java
@@ -50,6 +50,7 @@ import io.confluent.ksql.serde.KsqlTopicSerDe;
import io.confluent.ksql.serde.avro.KsqlAvroTopicSerDe;
import io.confluent.ksql.serde.delimited.KsqlDelimitedTopicSerDe;
import io.confluent.ksql.serde.json.KsqlJsonTopicSerDe;
+import io.confluent.ksql.serde.msgpack.KsqlMsgpackTopicSerDe;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.Pair;
@@ -132,6 +133,9 @@ public class Analyzer extends DefaultTraversalVisitor<Node, AnalysisContext> {
case DataSource.JSON_SERDE_NAME:
intoTopicSerde = new KsqlJsonTopicSerDe(null);
break;
+ case DataSource.MSGPACK_SERDE_NAME:
+ intoTopicSerde = new KsqlMsgpackTopicSerDe(null);
+ break;
case DataSource.DELIMITED_SERDE_NAME:
intoTopicSerde = new KsqlDelimitedTopicSerDe();
break;
@@ -563,4 +567,4 @@ public class Analyzer extends DefaultTraversalVisitor<Node, AnalysisContext> {
}
}
}
-}
\ No newline at end of file
+}
diff --git a/ksql-core/src/main/java/io/confluent/ksql/ddl/commands/RegisterTopicCommand.java b/ksql-core/src/main/java/io/confluent/ksql/ddl/commands/RegisterTopicCommand.java
index a4fc530..17eb83a 100644
--- a/ksql-core/src/main/java/io/confluent/ksql/ddl/commands/RegisterTopicCommand.java
+++ b/ksql-core/src/main/java/io/confluent/ksql/ddl/commands/RegisterTopicCommand.java
@@ -26,6 +26,7 @@ import io.confluent.ksql.serde.KsqlTopicSerDe;
import io.confluent.ksql.serde.avro.KsqlAvroTopicSerDe;
import io.confluent.ksql.serde.delimited.KsqlDelimitedTopicSerDe;
import io.confluent.ksql.serde.json.KsqlJsonTopicSerDe;
+import io.confluent.ksql.serde.msgpack.KsqlMsgpackTopicSerDe;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.StringUtil;
@@ -78,6 +79,8 @@ public class RegisterTopicCommand implements DDLCommand {
return new KsqlAvroTopicSerDe(avroSchema);
case DataSource.JSON_SERDE_NAME:
return new KsqlJsonTopicSerDe(null);
+ case DataSource.MSGPACK_SERDE_NAME:
+ return new KsqlMsgpackTopicSerDe(null);
case DataSource.DELIMITED_SERDE_NAME:
return new KsqlDelimitedTopicSerDe();
default:
diff --git a/ksql-core/src/main/java/io/confluent/ksql/metastore/DataSource.java b/ksql-core/src/main/java/io/confluent/ksql/metastore/DataSource.java
index c4f97b6..187dc31 100644
--- a/ksql-core/src/main/java/io/confluent/ksql/metastore/DataSource.java
+++ b/ksql-core/src/main/java/io/confluent/ksql/metastore/DataSource.java
@@ -20,10 +20,11 @@ public interface DataSource {
public static enum DataSourceType { KTOPIC, KSTREAM, KTABLE }
- public static enum DataSourceSerDe { JSON, AVRO, DELIMITED }
+ public static enum DataSourceSerDe { MSGPACK, JSON, AVRO, DELIMITED }
public static final String AVRO_SERDE_NAME = "AVRO";
public static final String JSON_SERDE_NAME = "JSON";
+ public static final String MSGPACK_SERDE_NAME = "MSGPACK";
public static final String DELIMITED_SERDE_NAME = "DELIMITED";
public String getName();
diff --git a/ksql-core/src/main/java/io/confluent/ksql/serde/msgpack/FieldKey.java b/ksql-core/src/main/java/io/confluent/ksql/serde/msgpack/FieldKey.java
new file mode 100644
index 0000000..2973dfe
--- /dev/null
+++ b/ksql-core/src/main/java/io/confluent/ksql/serde/msgpack/FieldKey.java
@@ -0,0 +1,106 @@
+/**
+ * Copyright 2017 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package io.confluent.ksql.serde.msgpack;
+
+import org.msgpack.core.buffer.MessageBuffer;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/**
+ * Hash-map key over the UTF-8 bytes of a field name.
+ *
+ * <p>This package-private class is used by the deserializer to look up a field
+ * without decoding UTF-8. Equality and hash code cover exactly the {@code length}
+ * bytes starting at {@code offset}, so a key copied from a {@link String} equals a
+ * key that references a slice of a larger array whenever the bytes match.
+ */
+final class FieldKey {
+
+  private final byte[] utf8;
+  private final int offset;
+  private final int length;
+  // Computed once in the constructor from the bytes visible at that moment.
+  private final int hashCode;
+
+  private FieldKey(byte[] utf8, int offset, int length) {
+    this.utf8 = utf8;
+    this.offset = offset;
+    this.length = length;
+    this.hashCode = hashByteArray(utf8, offset, length);
+  }
+
+  /**
+   * Creates a key that owns a fresh UTF-8 encoding of {@code string}.
+   */
+  public static FieldKey copyOf(String string) {
+    byte[] utf8 = string.getBytes(UTF_8);
+    return new FieldKey(utf8, 0, utf8.length);
+  }
+
+  /**
+   * Creates a key over the bytes of {@code ref}.
+   *
+   * <p>When the buffer exposes a backing array, the key points into that array without
+   * copying, so it is only meaningful while those bytes stay unchanged.
+   * {@code MessageBuffer.array()} returns {@code null} for a buffer that has no backing
+   * byte array; in that case the content is copied out instead of failing later with a
+   * {@link NullPointerException}.
+   */
+  public static FieldKey reference(MessageBuffer ref) {
+    byte[] backing = ref.array();
+    if (backing == null) {
+      byte[] copy = ref.toByteArray();
+      return new FieldKey(copy, 0, copy.length);
+    }
+    return new FieldKey(backing, ref.arrayOffset(), ref.size());
+  }
+
+  // Same recurrence as Arrays.hashCode(byte[]), limited to [offset, offset + length).
+  private static int hashByteArray(byte[] utf8, int offset, int length) {
+    int result = 1;
+    for (int i = offset; i < offset + length; i++) {
+      result = 31 * result + utf8[i];
+    }
+    return result;
+  }
+
+  @Override
+  public int hashCode() {
+    return hashCode;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof FieldKey)) {
+      return false;
+    }
+    return equalTo((FieldKey) o);
+  }
+
+  // Byte-wise comparison of the two key ranges; lengths must match first.
+  private boolean equalTo(FieldKey o) {
+    if (length != o.length) {
+      return false;
+    }
+    for (int i = offset + length - 1, j = o.offset + length - 1; i >= offset; i--, j--) {
+      if (utf8[i] != o.utf8[j]) {
+        return false;
+      }
+    }
+    return true;
+  }
+}
diff --git a/ksql-core/src/main/java/io/confluent/ksql/serde/msgpack/KsqlMsgpackDeserializer.java b/ksql-core/src/main/java/io/confluent/ksql/serde/msgpack/KsqlMsgpackDeserializer.java
new file mode 100644
index 0000000..7009607
--- /dev/null
+++ b/ksql-core/src/main/java/io/confluent/ksql/serde/msgpack/KsqlMsgpackDeserializer.java
@@ -0,0 +1,313 @@
+/**
+ * Copyright 2017 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package io.confluent.ksql.serde.msgpack;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.confluent.ksql.physical.GenericRow;
+import io.confluent.ksql.util.KsqlException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.msgpack.core.MessageFormat;
+import org.msgpack.core.MessageInsufficientBufferException;
+import org.msgpack.core.MessageIntegerOverflowException;
+import org.msgpack.core.MessagePack;
+import org.msgpack.core.MessageUnpacker;
+import org.msgpack.core.buffer.MessageBuffer;
+import org.msgpack.value.IntegerValue;
+import org.msgpack.value.Value;
+import org.msgpack.value.ValueType;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Locale.ENGLISH;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/**
+ * Kafka {@link Deserializer} that turns a MessagePack-encoded map into a {@link GenericRow}.
+ *
+ * <p>Each top-level map entry whose key is a string matching a schema field name (exact
+ * bytes first, then case-insensitively) is converted by a reader chosen from that field's
+ * schema type. Entries with unknown or non-string keys are skipped, and columns that never
+ * appear in the payload stay {@code null}.
+ */
+public class KsqlMsgpackDeserializer implements Deserializer<GenericRow> {
+
+  // Used to coerce values whose MessagePack type differs from the column type.
+  private final ObjectMapper objectMapper = new ObjectMapper();
+
+  // Exact-match lookup keyed by the raw UTF-8 bytes of the field name.
+  private final Map<FieldKey, FieldEntry> fields;
+  // Fallback lookup keyed by the upper-cased field name.
+  private final Map<String, FieldEntry> caseInsensitiveFields;
+  // Width of every produced row. Taken from the schema rather than from the lookup maps,
+  // because two fields that share a name after prefix stripping collapse into one map entry.
+  private final int columnCount;
+
+  /**
+   * Builds the lookup tables and per-field readers for {@code schema}.
+   *
+   * <p>Everything up to and including the first {@code '.'} of a field name is dropped
+   * before the name is registered.
+   *
+   * @param schema row schema whose fields define the columns of the produced rows
+   * @throws KsqlException if a field, array element, map key or map value type is unsupported
+   */
+  public KsqlMsgpackDeserializer(Schema schema) {
+    this.fields = new HashMap<>();
+    this.caseInsensitiveFields = new HashMap<>();
+    this.columnCount = schema.fields().size();
+    for (Field field : schema.fields()) {
+      String name = field.name().substring(field.name().indexOf('.') + 1);
+      FieldEntry entry = new FieldEntry(field.index(), fieldReader(field.schema()));
+      fields.put(FieldKey.copyOf(name), entry);
+      caseInsensitiveFields.put(name.toUpperCase(ENGLISH), entry);
+    }
+  }
+
+  @Override
+  public void configure(Map<String, ?> map, boolean b) {
+    // Nothing to configure; the schema is supplied through the constructor.
+  }
+
+  /**
+   * Decodes one record value.
+   *
+   * @param topic not used
+   * @param bytes MessagePack payload, may be {@code null}
+   * @return the decoded row, or {@code null} when {@code bytes} is {@code null}
+   * @throws SerializationException wrapping any exception raised while decoding
+   */
+  @Override
+  public GenericRow deserialize(final String topic, final byte[] bytes) {
+    if (bytes == null) {
+      return null;
+    }
+
+    try (MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(bytes)) {
+      return readGenericRow(unpacker);
+    } catch (Exception e) {
+      throw new SerializationException(e);
+    }
+  }
+
+  // Reads the top-level value. Anything other than a map produces a row of nulls.
+  private GenericRow readGenericRow(MessageUnpacker unpacker) throws IOException {
+    Object[] columns = new Object[columnCount];
+
+    if (unpacker.getNextFormat().getValueType() == ValueType.MAP) {
+      int length = unpacker.unpackMapHeader();
+      for (int i = 0; i < length; i++) {
+        MessageBuffer fieldNameRef = readFieldNameRef(unpacker);
+        FieldEntry entry = findFieldEntry(fieldNameRef);
+        if (entry == null) {
+          // Unknown (or non-string) field name: skip its value to stay aligned.
+          unpacker.skipValue();
+        } else {
+          columns[entry.index] = entry.reader.readFrom(unpacker);
+        }
+      }
+    }
+
+    // Columns that were not present in the payload remain null.
+    return new GenericRow(Arrays.asList(columns));
+  }
+
+  // Returns the raw bytes of the next map key. A key that is not a string is skipped
+  // and reported as null.
+  private MessageBuffer readFieldNameRef(MessageUnpacker unpacker) throws IOException {
+    if (unpacker.getNextFormat().getValueType() == ValueType.STRING) {
+      int length = unpacker.unpackRawStringHeader();
+      return unpacker.readPayloadAsReference(length);
+    } else {
+      unpacker.skipValue();
+      return null;
+    }
+  }
+
+  // Resolves a key to its field entry; returns null when the key matches no schema field.
+  private FieldEntry findFieldEntry(MessageBuffer fieldNameRef) {
+    if (fieldNameRef == null) {
+      return null;
+    }
+    // Find by case sensitive match first
+    FieldEntry entry = fields.get(FieldKey.reference(fieldNameRef));
+    if (entry != null) {
+      return entry;
+    }
+    // If not found, find by case insensitive match
+    String fieldName = new String(fieldNameRef.toByteArray(), UTF_8);
+    return caseInsensitiveFields.get(fieldName.toUpperCase(ENGLISH));
+  }
+
+  @Override
+  public void close() {
+    // No resources to release.
+  }
+
+  // Pairs a column position with the reader that decodes that column.
+  private static class FieldEntry {
+    final int index;
+    final FieldReader reader;
+
+    FieldEntry(int index, FieldReader reader) {
+      this.index = index;
+      this.reader = reader;
+    }
+  }
+
+  // Decodes one value of a fixed column type; readFrom handles nil before delegating.
+  @FunctionalInterface
+  private interface FieldReader {
+    Object readValueFrom(MessageUnpacker unpacker, ValueType nextType) throws IOException;
+
+    default Object readFrom(MessageUnpacker unpacker) throws IOException {
+      ValueType nextType = unpacker.getNextFormat().getValueType();
+      if (nextType == ValueType.NIL) {
+        unpacker.unpackNil();
+        return null;
+      } else {
+        return readValueFrom(unpacker, nextType);
+      }
+    }
+  }
+
+  /**
+   * Chooses the reader for a column (or nested element) of the given schema type.
+   *
+   * <p>Scalar readers unpack the value directly when its MessagePack type matches and
+   * otherwise coerce it through its JSON rendering. Array and map readers return
+   * {@code null}, after skipping the value, when the payload holds a different type.
+   *
+   * @throws KsqlException if {@code schema.type()} is not handled
+   */
+  private FieldReader fieldReader(Schema schema) {
+    switch (schema.type()) {
+      case BOOLEAN:
+        return (unpacker, nextType) -> {
+          if (nextType == ValueType.BOOLEAN) {
+            return unpacker.unpackBoolean();
+          } else {
+            return readAsJsonNode(unpacker).asBoolean();
+          }
+        };
+
+      case INT32:
+        return (unpacker, nextType) -> {
+          if (nextType == ValueType.INTEGER) {
+            try {
+              return unpacker.unpackInt();
+            } catch (MessageIntegerOverflowException overflow) {
+              // Out-of-range value: keep the low-order 32 bits.
+              return overflow.getBigInteger().intValue();
+            }
+          } else {
+            return readAsJsonNode(unpacker).asInt();
+          }
+        };
+
+      case INT64:
+        return (unpacker, nextType) -> {
+          if (nextType == ValueType.INTEGER) {
+            try {
+              return unpacker.unpackLong();
+            } catch (MessageIntegerOverflowException overflow) {
+              // Out-of-range value: keep the low-order 64 bits.
+              return overflow.getBigInteger().longValue();
+            }
+          } else {
+            return readAsJsonNode(unpacker).asLong();
+          }
+        };
+
+      case FLOAT64:
+        return (unpacker, nextType) -> {
+          if (nextType == ValueType.FLOAT) {
+            return unpacker.unpackDouble();
+          } else {
+            return readAsJsonNode(unpacker).asDouble();
+          }
+        };
+
+      case STRING:
+        return (unpacker, nextType) -> {
+          if (nextType == ValueType.STRING) {
+            return unpacker.unpackString();
+          } else {
+            return unpacker.unpackValue().toJson();
+          }
+        };
+
+      case ARRAY: {
+        final FieldReader elementReader = fieldReader(schema.valueSchema());
+        return (unpacker, nextType) -> {
+          if (nextType == ValueType.ARRAY) {
+            int length = unpacker.unpackArrayHeader();
+            Object[] array = new Object[length];
+            // Start at 0: every element must be consumed, otherwise the array stays empty
+            // and the unread elements are misparsed as the following map entries.
+            for (int i = 0; i < length; i++) {
+              array[i] = elementReader.readFrom(unpacker);
+            }
+            return array;
+          } else {
+            unpacker.skipValue();
+            return null;
+          }
+        };
+      }
+
+      case MAP: {
+        final FieldReader keyReader = fieldReader(schema.keySchema());
+        final FieldReader valueReader = fieldReader(schema.valueSchema());
+        return (unpacker, nextType) -> {
+          if (nextType == ValueType.MAP) {
+            int length = unpacker.unpackMapHeader();
+            Map<Object, Object> map = new HashMap<>();
+            for (int i = 0; i < length; i++) {
+              Object key = keyReader.readFrom(unpacker);
+              Object value = valueReader.readFrom(unpacker);
+              // Entries whose key or value decodes to null are dropped.
+              if (key != null && value != null) {
+                map.put(key, value);
+              }
+            }
+            return map;
+          } else {
+            unpacker.skipValue();
+            return null;
+          }
+        };
+      }
+
+      default:
+        throw new KsqlException("Type is not supported: " + schema.type());
+    }
+  }
+
+  // Unpacks the next value and re-parses its JSON rendering so Jackson can coerce it.
+  private JsonNode readAsJsonNode(MessageUnpacker unpacker) throws IOException {
+    return objectMapper.readTree(unpacker.unpackValue().toJson());
+  }
+}
diff --git a/ksql-core/src/main/java/io/confluent/ksql/serde/msgpack/KsqlMsgpackSerializer.java b/ksql-core/src/main/java/io/confluent/ksql/serde/msgpack/KsqlMsgpackSerializer.java
new file mode 100644
index 0000000..e8d1448
--- /dev/null
+++ b/ksql-core/src/main/java/io/confluent/ksql/serde/msgpack/KsqlMsgpackSerializer.java
@@ -0,0 +1,167 @@
+/**
+ * Copyright 2017 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package io.confluent.ksql.serde.msgpack;
+
+import io.confluent.ksql.physical.GenericRow;
+import io.confluent.ksql.util.KsqlException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.connect.data.Schema;
+import org.msgpack.core.MessageBufferPacker;
+import org.msgpack.core.MessagePack;
+import org.msgpack.core.MessagePacker;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Kafka {@link Serializer} that writes a {@link GenericRow} as a MessagePack map keyed by
+ * field name, with one entry per schema field in schema order.
+ */
+public class KsqlMsgpackSerializer implements Serializer<GenericRow> {
+
+  // Field names with everything up to and including the first '.' removed.
+  private final String[] fieldNames;
+  // Writer for the column at the same position in fieldNames.
+  private final FieldWriter[] fieldWriters;
+
+  /**
+   * Precomputes the map keys and the per-column writers for {@code schema}.
+   *
+   * @param schema row schema describing the columns of the rows to serialize
+   * @throws KsqlException if a field, array element, map key or map value type is unsupported
+   */
+  public KsqlMsgpackSerializer(Schema schema) {
+    this.fieldNames = schema.fields().stream()
+        .map(field -> field.name().substring(field.name().indexOf('.') + 1))
+        .toArray(String[]::new);
+    this.fieldWriters = schema.fields().stream()
+        .map(field -> fieldWriter(field.schema()))
+        .toArray(FieldWriter[]::new);
+  }
+
+  @Override
+  public void configure(final Map<String, ?> props, final boolean isKey) {
+    // Nothing to configure; the schema is supplied through the constructor.
+  }
+
+  /**
+   * Encodes one row.
+   *
+   * @param topic not used
+   * @param data row to encode, may be {@code null}
+   * @return the MessagePack bytes, or {@code null} when {@code data} is {@code null}
+   * @throws SerializationException if packing fails with an {@link IOException}
+   */
+  @Override
+  public byte[] serialize(final String topic, final GenericRow data) {
+    if (data == null) {
+      return null;
+    }
+
+    try (MessageBufferPacker packer = MessagePack.newDefaultBufferPacker()) {
+      packer.packMapHeader(fieldNames.length);
+      for (int i = 0; i < fieldNames.length; i++) {
+        packer.packString(fieldNames[i]);
+        fieldWriters[i].writeTo(packer, data.getColumns().get(i));
+      }
+      return packer.toByteArray();
+    } catch (IOException e) {
+      throw new SerializationException("Error serializing MessagePack message", e);
+    }
+  }
+
+  @Override
+  public void close() {
+    // No resources to release.
+  }
+
+  // Writes one value of a fixed column type; writeTo emits nil for null before delegating.
+  @FunctionalInterface
+  private interface FieldWriter {
+    void writeValueTo(MessagePacker packer, Object value) throws IOException;
+
+    default void writeTo(final MessagePacker packer, final Object value) throws IOException {
+      if (value == null) {
+        packer.packNil();
+      } else {
+        writeValueTo(packer, value);
+      }
+    }
+  }
+
+  /**
+   * Chooses the writer for a column (or nested element) of the given schema type.
+   *
+   * <p>Each writer casts the value to the Java type matching the schema type, so a value of
+   * any other runtime class fails with a {@link ClassCastException}.
+   *
+   * @throws KsqlException if {@code schema.type()} is not handled
+   */
+  private static FieldWriter fieldWriter(final Schema schema) {
+    switch (schema.type()) {
+      case BOOLEAN:
+        return (packer, value) -> packer.packBoolean((boolean) value);
+      case INT8:
+        return (packer, value) -> packer.packByte((byte) value);
+      case INT16:
+        return (packer, value) -> packer.packShort((short) value);
+      case INT32:
+        return (packer, value) -> packer.packInt((int) value);
+      case INT64:
+        return (packer, value) -> packer.packLong((long) value);
+      case FLOAT32:
+        return (packer, value) -> packer.packFloat((float) value);
+      case FLOAT64:
+        return (packer, value) -> packer.packDouble((double) value);
+      case STRING:
+        return (packer, value) -> packer.packString((String) value);
+      case BYTES:
+        return (packer, value) -> {
+          byte[] bytes = (byte[]) value;
+          packer.packBinaryHeader(bytes.length);
+          packer.writePayload(bytes);
+        };
+      case ARRAY: {
+        final FieldWriter elementWriter = fieldWriter(schema.valueSchema());
+        return (packer, value) -> {
+          Object[] array = (Object[]) value;
+          packer.packArrayHeader(array.length);
+          for (Object element : array) {
+            elementWriter.writeTo(packer, element);
+          }
+        };
+      }
+      case MAP: {
+        final FieldWriter keyWriter = fieldWriter(schema.keySchema());
+        final FieldWriter valueWriter = fieldWriter(schema.valueSchema());
+        return (packer, value) -> {
+          // Wildcard types: entries are only read, so no unchecked cast is needed.
+          Map<?, ?> map = (Map<?, ?>) value;
+          packer.packMapHeader(map.size());
+          for (Map.Entry<?, ?> pair : map.entrySet()) {
+            keyWriter.writeTo(packer, pair.getKey());
+            valueWriter.writeTo(packer, pair.getValue());
+          }
+        };
+      }
+      default:
+        throw new KsqlException("Type is not supported: " + schema.type());
+    }
+  }
+
+}
diff --git a/ksql-core/src/main/java/io/confluent/ksql/serde/msgpack/KsqlMsgpackTopicSerDe.java b/ksql-core/src/main/java/io/confluent/ksql/serde/msgpack/KsqlMsgpackTopicSerDe.java
new file mode 100644
index 0000000..104b183
--- /dev/null
+++ b/ksql-core/src/main/java/io/confluent/ksql/serde/msgpack/KsqlMsgpackTopicSerDe.java
@@ -0,0 +1,46 @@
+/**
+ * Copyright 2017 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package io.confluent.ksql.serde.msgpack;
+
+import io.confluent.ksql.metastore.StructuredDataSource;
+import io.confluent.ksql.serde.KsqlTopicSerDe;
+import org.apache.kafka.connect.data.Schema;
+
+/**
+ * Topic SerDe descriptor for the MessagePack format. It passes
+ * {@code DataSourceSerDe.MSGPACK} to the {@link KsqlTopicSerDe} constructor and keeps the
+ * row schema it was created with.
+ */
+public class KsqlMsgpackTopicSerDe extends KsqlTopicSerDe {
+
+  // Stored exactly as passed in; may be null.
+  private final Schema rowSchema;
+
+  /**
+   * @param rowSchema schema of the rows in the topic, stored as given
+   */
+  public KsqlMsgpackTopicSerDe(Schema rowSchema) {
+    super(StructuredDataSource.DataSourceSerDe.MSGPACK);
+    this.rowSchema = rowSchema;
+  }
+
+  /**
+   * @return the schema passed to the constructor, possibly {@code null}
+   */
+  public Schema getRowSchema() {
+    return rowSchema;
+  }
+}
diff --git a/ksql-core/src/test/java/io/confluent/ksql/serde/msgpack/KsqlMsgpackDeserializerTest.java b/ksql-core/src/test/java/io/confluent/ksql/serde/msgpack/KsqlMsgpackDeserializerTest.java
new file mode 100644
index 0000000..fc4d72e
--- /dev/null
+++ b/ksql-core/src/test/java/io/confluent/ksql/serde/msgpack/KsqlMsgpackDeserializerTest.java
@@ -0,0 +1,127 @@
+/**
+ * Copyright 2017 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package io.confluent.ksql.serde.msgpack;
+
+import io.confluent.ksql.physical.GenericRow;
+import io.confluent.ksql.serde.json.KsqlJsonDeserializer;
+import io.confluent.ksql.serde.json.KsqlJsonSerializer;
+import io.confluent.ksql.util.KsqlTestUtil;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Objects;
+import org.apache.kafka.connect.data.Schema;
+import java.io.IOException;
+import org.msgpack.core.MessageBufferPacker;
+import org.msgpack.value.Value;
+import org.msgpack.value.MapValue;
+import org.msgpack.value.ValueFactory;
+import org.msgpack.core.MessagePack;
+import org.msgpack.core.MessagePacker;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import static java.util.Arrays.asList;
+import static org.apache.kafka.connect.data.SchemaBuilder.BOOLEAN_SCHEMA;
+import static org.apache.kafka.connect.data.SchemaBuilder.FLOAT64_SCHEMA;
+import static org.apache.kafka.connect.data.SchemaBuilder.INT32_SCHEMA;
+import static org.apache.kafka.connect.data.SchemaBuilder.INT64_SCHEMA;
+import static org.apache.kafka.connect.data.SchemaBuilder.STRING_SCHEMA;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/**
+ * Checks how {@link KsqlMsgpackDeserializer} coerces values whose MessagePack type differs
+ * from the column type, and that the outcome matches {@link KsqlJsonDeserializer} for the
+ * JSON rendering of the same record.
+ */
+public class KsqlMsgpackDeserializerTest {
+
+  private Schema schema;
+  private KsqlMsgpackDeserializer deser;
+  private KsqlJsonDeserializer jsonDeser;
+
+  // Installs the schema under test and builds both deserializers for it.
+  private void prepare(Schema schema) {
+    this.schema = schema;
+    this.deser = new KsqlMsgpackDeserializer(schema);
+    this.jsonDeser = new KsqlJsonDeserializer(schema);
+  }
+
+  // Builds a record map pairing the i-th schema field name (anything up to the first '.'
+  // removed) with columns[i].
+  private MapValue rowMap(Value... columns) {
+    Map<Value, Value> entries = new HashMap<>();
+    int position = 0;
+    while (position < schema.fields().size()) {
+      String qualified = schema.fields().get(position).name();
+      String bare = qualified.substring(qualified.indexOf('.') + 1);
+      entries.put(ValueFactory.newString(bare), columns[position]);
+      position++;
+    }
+    return ValueFactory.newMap(entries);
+  }
+
+  // Deserializes the record from MessagePack and from JSON, asserts that both agree, and
+  // returns the MessagePack result.
+  private List<Object> checkDeserialize(Value... columns) {
+    final MapValue record = rowMap(columns);
+
+    byte[] packed;
+    try (MessageBufferPacker packer = MessagePack.newDefaultBufferPacker()) {
+      packer.packValue(record);
+      packed = packer.toByteArray();
+    } catch (IOException ex) {
+      throw new RuntimeException(ex);
+    }
+    final List<Object> fromMsgpack = deser.deserialize("t", packed).getColumns();
+
+    // verify that behavior is same with json
+    final List<Object> fromJson =
+        jsonDeser.deserialize("t", record.toJson().getBytes(UTF_8)).getColumns();
+    Assert.assertEquals(fromJson, fromMsgpack);
+
+    return fromMsgpack;
+  }
+
+  // Asserts that a single-column record holding input deserializes to [expected].
+  private void assertDeserializesTo(Object expected, Value input) {
+    Assert.assertEquals(asList(expected), checkDeserialize(input));
+  }
+
+  @Test
+  public void testEnforceToBoolean() {
+    prepare(SchemaBuilder.struct().field("BOOLEAN", SchemaBuilder.BOOLEAN_SCHEMA).build());
+    assertDeserializesTo(true, ValueFactory.newString("true"));
+    assertDeserializesTo(false, ValueFactory.newString("false"));
+    assertDeserializesTo(true, ValueFactory.newInteger(1));
+    assertDeserializesTo(false, ValueFactory.newInteger(0));
+    assertDeserializesTo(false, ValueFactory.newString("something"));
+    assertDeserializesTo(false, ValueFactory.newFloat(91.32));
+  }
+
+  @Test
+  public void testEnforceToInt32() {
+    prepare(SchemaBuilder.struct().field("INT32", SchemaBuilder.INT32_SCHEMA).build());
+    assertDeserializesTo(1, ValueFactory.newString("1"));
+    assertDeserializesTo(2, ValueFactory.newString("2"));
+    assertDeserializesTo(-321, ValueFactory.newString("-321"));
+    assertDeserializesTo(91, ValueFactory.newFloat(91.32));
+  }
+}
diff --git a/ksql-core/src/test/java/io/confluent/ksql/serde/msgpack/KsqlMsgpackSerializerTest.java b/ksql-core/src/test/java/io/confluent/ksql/serde/msgpack/KsqlMsgpackSerializerTest.java
new file mode 100644
index 0000000..0a12dc9
--- /dev/null
+++ b/ksql-core/src/test/java/io/confluent/ksql/serde/msgpack/KsqlMsgpackSerializerTest.java
@@ -0,0 +1,129 @@
+/**
+ * Copyright 2017 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package io.confluent.ksql.serde.msgpack;
+
+import io.confluent.ksql.physical.GenericRow;
+import io.confluent.ksql.serde.json.KsqlJsonDeserializer;
+import io.confluent.ksql.serde.json.KsqlJsonSerializer;
+import java.util.List;
+import java.util.Objects;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import static java.util.Arrays.asList;
+
+/**
+ * Round-trips single-column rows through the MessagePack serializer and deserializer and
+ * checks that the result matches the same round trip through the JSON serde.
+ */
+public class KsqlMsgpackSerializerTest {
+
+  private Schema schema;
+  private KsqlMsgpackSerializer ser;
+  private KsqlMsgpackDeserializer deser;
+  private KsqlJsonSerializer jsonSer;
+  private KsqlJsonDeserializer jsonDeser;
+
+  // Installs the schema under test and builds all four serde instances for it.
+  private void prepare(Schema schema) {
+    this.schema = schema;
+    this.ser = new KsqlMsgpackSerializer(schema);
+    this.deser = new KsqlMsgpackDeserializer(schema);
+    this.jsonSer = new KsqlJsonSerializer(schema);
+    this.jsonDeser = new KsqlJsonDeserializer(schema);
+  }
+
+  // Wraps the given column values in a row.
+  private static GenericRow row(Object... values) {
+    return new GenericRow(asList(values));
+  }
+
+  // Round-trips the values through MessagePack and through JSON, asserts that both agree,
+  // and returns the MessagePack result.
+  private List<Object> checkRoundTrip(Object... values) {
+    final List<Object> viaMsgpack =
+        deser.deserialize("t", ser.serialize("t", row(values))).getColumns();
+
+    // verify that behavior is same with json
+    final List<Object> viaJson =
+        jsonDeser.deserialize("t", jsonSer.serialize("t", row(values))).getColumns();
+    Assert.assertEquals(viaJson, viaMsgpack);
+
+    return viaMsgpack;
+  }
+
+  // Asserts that a single-column row holding value survives the round trip unchanged.
+  private void assertRoundTrips(Object value) {
+    Assert.assertEquals(asList(value), checkRoundTrip(value));
+  }
+
+  @Test
+  public void testBoolean() {
+    prepare(SchemaBuilder.struct().field("BOOLEAN", SchemaBuilder.BOOLEAN_SCHEMA).build());
+    assertRoundTrips(true);
+    assertRoundTrips(false);
+  }
+
+  @Test
+  public void testInt32() {
+    prepare(SchemaBuilder.struct().field("INT32", SchemaBuilder.INT32_SCHEMA).build());
+    assertRoundTrips(0);
+    assertRoundTrips(1);
+    assertRoundTrips(-321398);
+    assertRoundTrips(Integer.MAX_VALUE);
+    assertRoundTrips(Integer.MIN_VALUE);
+  }
+
+  @Test
+  public void testInt64() {
+    prepare(SchemaBuilder.struct().field("INT64", SchemaBuilder.INT64_SCHEMA).build());
+    assertRoundTrips(0L);
+    assertRoundTrips(1L);
+    assertRoundTrips(-321398L);
+    assertRoundTrips((long) Integer.MAX_VALUE);
+    assertRoundTrips((long) Integer.MIN_VALUE);
+    assertRoundTrips(Long.MAX_VALUE);
+    assertRoundTrips(Long.MIN_VALUE);
+  }
+
+  @Test
+  public void testFloat64() {
+    prepare(SchemaBuilder.struct().field("FLOAT64", SchemaBuilder.FLOAT64_SCHEMA).build());
+    assertRoundTrips(0.1);
+    assertRoundTrips(0.3);
+    assertRoundTrips(9321.45);
+  }
+
+  @Test
+  public void testString() {
+    prepare(SchemaBuilder.struct().field("STRING", SchemaBuilder.STRING_SCHEMA).build());
+    assertRoundTrips("");
+    assertRoundTrips("a");
+    assertRoundTrips("\343\201\202");
+    assertRoundTrips("test0123456789KafkaAndMessagePackTestCase");
+  }
+
+  // TODO test array<string>
+  // TODO test array<int64>
+  // TODO test map<string, string>
+
+  // Coercion of mismatched value types (e.g. the string "true" into a boolean) is
+  // exercised by KsqlMsgpackDeserializerTest.
+}
diff --git a/pom.xml b/pom.xml
index e37f26b..3327114 100644
--- a/pom.xml
+++ b/pom.xml
@@ -45,6 +45,7 @@
<jline.version>3.3.1</jline.version>
<jna.version>4.4.0</jna.version>
<jsr305.version>3.0.2</jsr305.version>
+ <msgpack.version>0.8.13</msgpack.version>
<really.executable.jar.version>1.5.0</really.executable.jar.version>
<licenses.version>3.3.0-SNAPSHOT</licenses.version>
<exec-maven-plugin.version>1.2.1</exec-maven-plugin.version>
@@ -163,6 +164,12 @@
</dependency>
<dependency>
+ <groupId>org.msgpack</groupId>
+ <artifactId>msgpack-core</artifactId>
+ <version>${msgpack.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
<version>${janino.version}</version>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment