Created
July 11, 2012 22:12
-
-
Save brianmartin/3094071 to your computer and use it in GitHub Desktop.
ByteBuffer serialization for Storm
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
From 83c32302339aefe7ac322f8a9b7a47a08173095d Mon Sep 17 00:00:00 2001 | |
From: Brian Martin <brianmartin@gmail.com> | |
Date: Wed, 11 Jul 2012 15:06:02 -0700 | |
Subject: [PATCH] initial ByteBuffer serialization -- not working | |
--- | |
.../storm/serialization/SerializationFactory.java | 2 + | |
.../serialization/types/ByteBufferSerializer.java | 42 ++++++++++++++++++++ | |
test/clj/backtype/storm/serialization_test.clj | 15 +++++++ | |
3 files changed, 59 insertions(+) | |
create mode 100644 src/jvm/backtype/storm/serialization/types/ByteBufferSerializer.java | |
diff --git a/src/jvm/backtype/storm/serialization/SerializationFactory.java b/src/jvm/backtype/storm/serialization/SerializationFactory.java | |
index e35fc90..63e0fea 100644 | |
--- a/src/jvm/backtype/storm/serialization/SerializationFactory.java | |
+++ b/src/jvm/backtype/storm/serialization/SerializationFactory.java | |
@@ -4,6 +4,7 @@ import backtype.storm.Config; | |
import backtype.storm.generated.ComponentCommon; | |
import backtype.storm.generated.StormTopology; | |
import backtype.storm.serialization.types.ArrayListSerializer; | |
+import backtype.storm.serialization.types.ByteBufferSerializer; | |
import backtype.storm.serialization.types.HashMapSerializer; | |
import backtype.storm.serialization.types.HashSetSerializer; | |
import backtype.storm.transactional.TransactionAttempt; | |
@@ -57,6 +58,7 @@ public class SerializationFactory { | |
k.register(BigInteger.class, new BigIntegerSerializer()); | |
k.register(TransactionAttempt.class); | |
k.register(Values.class); | |
+ k.register(ByteBuffer.class, new ByteBufferSerializer()); | |
try { | |
JavaBridge.registerPrimitives(k); | |
JavaBridge.registerCollections(k); | |
diff --git a/src/jvm/backtype/storm/serialization/types/ByteBufferSerializer.java b/src/jvm/backtype/storm/serialization/types/ByteBufferSerializer.java | |
new file mode 100644 | |
index 0000000..7751a05 | |
--- /dev/null | |
+++ b/src/jvm/backtype/storm/serialization/types/ByteBufferSerializer.java | |
@@ -0,0 +1,42 @@ | |
+package backtype.storm.serialization.types; | |
+ | |
+import java.nio.ByteBuffer; | |
+ | |
+import com.esotericsoftware.kryo.Kryo; | |
+import com.esotericsoftware.kryo.Serializer; | |
+import com.esotericsoftware.kryo.io.Input; | |
+import com.esotericsoftware.kryo.io.Output; | |
+ | |
+public class ByteBufferSerializer extends Serializer<ByteBuffer> { | |
+ { | |
+ setAcceptsNull(true); | |
+ } | |
+ | |
+ public void write (Kryo kryo, Output output, ByteBuffer object) { | |
+ if (object == null) { | |
+ output.writeByte(Kryo.NULL); | |
+ return; | |
+ } | |
+ | |
+ byte[] array; | |
+ if (object.hasArray()) { | |
+ array = object.array(); | |
+ } | |
+ else { | |
+ object.clear(); | |
+ array = new byte[object.capacity()]; | |
+ object.get(array, 0, array.length); | |
+ } | |
+ | |
+ output.writeInt(array.length + 1, true); | |
+ output.writeBytes(array); | |
+ } | |
+ | |
+ public ByteBuffer create (Kryo kryo, Input input, Class<ByteBuffer> type) { | |
+ int length = input.readInt(true); | |
+ if (length == Kryo.NULL) return null; | |
+ byte[] array = input.readBytes(length - 1); | |
+ return ByteBuffer.wrap(array); | |
+ } | |
+ | |
+} | |
\ No newline at end of file | |
diff --git a/test/clj/backtype/storm/serialization_test.clj b/test/clj/backtype/storm/serialization_test.clj | |
index 5bc419a..527dc0c 100644 | |
--- a/test/clj/backtype/storm/serialization_test.clj | |
+++ b/test/clj/backtype/storm/serialization_test.clj | |
@@ -3,6 +3,7 @@ | |
(:import [backtype.storm.serialization KryoTupleSerializer KryoTupleDeserializer | |
KryoValuesSerializer KryoValuesDeserializer]) | |
(:import [backtype.storm.testing TestSerObject]) | |
+ (:import [java.nio ByteBuffer]) | |
(:use [backtype.storm util config]) | |
) | |
@@ -56,3 +57,17 @@ | |
(is-roundtrip [#{:a :b :c}]) | |
(is-roundtrip [#{:a :b} 1 2 ["a" 3 5 #{5 6}]]) | |
(is-roundtrip [{:a [1 2 #{:a :b 1}] :b 3}])) | |
+ | |
+(defn serialize-object [val conf] | |
+ (let [serializer (KryoValuesSerializer. (mk-conf conf))] | |
+ (.serializeObject serializer val) | |
+ )) | |
+ | |
+(deftest test-bytebuffer-serialization | |
+ (let [nums [0 1 2 3 4] | |
+ input (map byte [0 1 2 3 4]) | |
+ conf {} | |
+ buffer-in (ByteBuffer/wrap (byte-array input)) | |
+ buffer-out (deserialize (serialize-object buffer-in conf))] | |
+ (is (= input (.array buffer-out))) | |
+ )) | |
-- | |
1.7.10.1 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment