Skip to content

Instantly share code, notes, and snippets.

@brianmartin
Created July 11, 2012 22:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save brianmartin/3094071 to your computer and use it in GitHub Desktop.
Save brianmartin/3094071 to your computer and use it in GitHub Desktop.
ByteBuffer serialization for Storm
From 83c32302339aefe7ac322f8a9b7a47a08173095d Mon Sep 17 00:00:00 2001
From: Brian Martin <brianmartin@gmail.com>
Date: Wed, 11 Jul 2012 15:06:02 -0700
Subject: [PATCH] initial ByteBuffer serialization -- not working
---
.../storm/serialization/SerializationFactory.java | 2 +
.../serialization/types/ByteBufferSerializer.java | 42 ++++++++++++++++++++
test/clj/backtype/storm/serialization_test.clj | 15 +++++++
3 files changed, 59 insertions(+)
create mode 100644 src/jvm/backtype/storm/serialization/types/ByteBufferSerializer.java
diff --git a/src/jvm/backtype/storm/serialization/SerializationFactory.java b/src/jvm/backtype/storm/serialization/SerializationFactory.java
index e35fc90..63e0fea 100644
--- a/src/jvm/backtype/storm/serialization/SerializationFactory.java
+++ b/src/jvm/backtype/storm/serialization/SerializationFactory.java
@@ -4,6 +4,7 @@ import backtype.storm.Config;
import backtype.storm.generated.ComponentCommon;
import backtype.storm.generated.StormTopology;
import backtype.storm.serialization.types.ArrayListSerializer;
+import backtype.storm.serialization.types.ByteBufferSerializer;
import backtype.storm.serialization.types.HashMapSerializer;
import backtype.storm.serialization.types.HashSetSerializer;
import backtype.storm.transactional.TransactionAttempt;
@@ -57,6 +58,7 @@ public class SerializationFactory {
k.register(BigInteger.class, new BigIntegerSerializer());
k.register(TransactionAttempt.class);
k.register(Values.class);
+ k.register(ByteBuffer.class, new ByteBufferSerializer());
try {
JavaBridge.registerPrimitives(k);
JavaBridge.registerCollections(k);
diff --git a/src/jvm/backtype/storm/serialization/types/ByteBufferSerializer.java b/src/jvm/backtype/storm/serialization/types/ByteBufferSerializer.java
new file mode 100644
index 0000000..7751a05
--- /dev/null
+++ b/src/jvm/backtype/storm/serialization/types/ByteBufferSerializer.java
@@ -0,0 +1,42 @@
+package backtype.storm.serialization.types;
+
+import java.nio.ByteBuffer;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/** Kryo serializer for {@link ByteBuffer}: writes the bytes between position and limit. */
+public class ByteBufferSerializer extends Serializer<ByteBuffer> {
+    {
+        // Null buffers are encoded by this serializer itself (see write/create).
+        setAcceptsNull(true);
+    }
+
+    /**
+     * Writes {@code remaining() + 1} as a varint followed by the remaining bytes;
+     * a null buffer is written as the single marker {@code Kryo.NULL}.
+     */
+    public void write (Kryo kryo, Output output, ByteBuffer object) {
+        if (object == null) {
+            output.writeByte(Kryo.NULL);
+            return;
+        }
+        // Read through a duplicate so the caller's position/limit are untouched, and
+        // copy only position..limit rather than the whole (possibly shared) backing array.
+        ByteBuffer view = object.duplicate();
+        byte[] array = new byte[view.remaining()];
+        view.get(array);
+        output.writeInt(array.length + 1, true);
+        output.writeBytes(array);
+    }
+
+    /** Reads the length marker; {@code Kryo.NULL} yields null, else wraps {@code length - 1} bytes. */
+    public ByteBuffer create (Kryo kryo, Input input, Class<ByteBuffer> type) {
+        int length = input.readInt(true);
+        if (length == Kryo.NULL) return null;
+        byte[] array = input.readBytes(length - 1);
+        return ByteBuffer.wrap(array);
+    }
+}
\ No newline at end of file
diff --git a/test/clj/backtype/storm/serialization_test.clj b/test/clj/backtype/storm/serialization_test.clj
index 5bc419a..527dc0c 100644
--- a/test/clj/backtype/storm/serialization_test.clj
+++ b/test/clj/backtype/storm/serialization_test.clj
@@ -3,6 +3,7 @@
(:import [backtype.storm.serialization KryoTupleSerializer KryoTupleDeserializer
KryoValuesSerializer KryoValuesDeserializer])
(:import [backtype.storm.testing TestSerObject])
+ (:import [java.nio ByteBuffer])
(:use [backtype.storm util config])
)
@@ -56,3 +57,17 @@
(is-roundtrip [#{:a :b :c}])
(is-roundtrip [#{:a :b} 1 2 ["a" 3 5 #{5 6}]])
(is-roundtrip [{:a [1 2 #{:a :b 1}] :b 3}]))
+
+(defn serialize-object
+  "Returns the result of .serializeObject on `val`, using a KryoValuesSerializer built from (mk-conf conf)."
+  [val conf]
+  (.serializeObject (KryoValuesSerializer. (mk-conf conf)) val))
+
+(deftest test-bytebuffer-serialization
+  ;; Round-trips a wrapped byte array and checks the bytes that come back.
+  (let [input (map byte [0 1 2 3 4])
+        conf {}
+        buffer-in (ByteBuffer/wrap (byte-array input))
+        buffer-out (deserialize (serialize-object buffer-in conf))]
+    ;; A Java byte[] is never = to a seq, so compare element-wise via seq.
+    (is (= input (seq (.array buffer-out))))))
--
1.7.10.1
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment