Skip to content

Instantly share code, notes, and snippets.

@mkscrg
Created July 20, 2012 20:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mkscrg/3153121 to your computer and use it in GitHub Desktop.
Save mkscrg/3153121 to your computer and use it in GitHub Desktop.
Test topology for Kryo serializer issue

storm-kryo-test

The custom Kryo serializer works when the topology runs locally, but on the remote cluster it does not appear to be found when tuples are deserialized.

The test case shows that the serializer works:

mvn test

Run locally to see that the AModels are properly serialized and deserialized:

mvn package exec:java

Submit it to a remote cluster to see the worker that runs PrinterBolt die:

storm jar target/storm-kryo-test-1.0-jar-with-dependencies.jar com.mkscrg.sandbox.StormKryoTestTopology kryo-test

Here's the stack trace I'm seeing on the cluster:

2012-07-20 16:34:06 util [ERROR] Async loop died!
java.lang.RuntimeException: com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): com.mkscrg.sandbox.AModel
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:79)
        at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:53)
        at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:45)
        at backtype.storm.disruptor$consume_loop_STAR_$fn__1613.invoke(disruptor.clj:56)
        at backtype.storm.util$async_loop$fn__465.invoke(util.clj:377)
        at clojure.lang.AFn.run(AFn.java:24)
        at java.lang.Thread.run(Thread.java:662)
Caused by: com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): com.mkscrg.sandbox.AModel
        at com.esotericsoftware.kryo.Kryo.newInstantiator(Unknown Source)
        at com.esotericsoftware.kryo.Kryo.newInstance(Unknown Source)
        at com.esotericsoftware.kryo.Serializer.create(Unknown Source)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Unknown Source)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(Unknown Source)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(Unknown Source)
        at com.esotericsoftware.kryo.Kryo.readObject(Unknown Source)
        at backtype.storm.serialization.KryoValuesDeserializer.deserializeFrom(KryoValuesDeserializer.java:20)
        at backtype.storm.serialization.KryoTupleDeserializer.deserialize(KryoTupleDeserializer.java:36)
        at backtype.storm.daemon.executor$mk_task_receiver$fn__3955.invoke(executor.clj:303)
        at backtype.storm.disruptor$clojure_handler$reify__1601.onEvent(disruptor.clj:32)
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:76)
        ... 6 more
2012-07-20 16:34:06 executor [ERROR]
java.lang.RuntimeException: com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): com.mkscrg.sandbox.AModel
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:79)
        at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:53)
        at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:45)
        at backtype.storm.disruptor$consume_loop_STAR_$fn__1613.invoke(disruptor.clj:56)
        at backtype.storm.util$async_loop$fn__465.invoke(util.clj:377)
        at clojure.lang.AFn.run(AFn.java:24)
        at java.lang.Thread.run(Thread.java:662)
Caused by: com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): com.mkscrg.sandbox.AModel
        at com.esotericsoftware.kryo.Kryo.newInstantiator(Unknown Source)
        at com.esotericsoftware.kryo.Kryo.newInstance(Unknown Source)
        at com.esotericsoftware.kryo.Serializer.create(Unknown Source)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Unknown Source)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(Unknown Source)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(Unknown Source)
        at com.esotericsoftware.kryo.Kryo.readObject(Unknown Source)
        at backtype.storm.serialization.KryoValuesDeserializer.deserializeFrom(KryoValuesDeserializer.java:20)
        at backtype.storm.serialization.KryoTupleDeserializer.deserialize(KryoTupleDeserializer.java:36)
        at backtype.storm.daemon.executor$mk_task_receiver$fn__3955.invoke(executor.clj:303)
        at backtype.storm.disruptor$clojure_handler$reify__1601.onEvent(disruptor.clj:32)
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:76)
        ... 6 more
package com.mkscrg.sandbox;
import java.util.ArrayList;
import java.util.List;
public final class AModel {
private final int anInt;
private final List<String> aList;
public AModel(int anInt, List<String> aList) {
this.anInt = anInt;
this.aList = aList == null ? new ArrayList<String>() : new ArrayList<String>(aList);
}
public int getAnInt() {
return anInt;
}
public List<String> getAList() {
return new ArrayList<String>(aList);
}
@Override
public String toString() {
return "(" + anInt + "," + aList + ")";
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AModel aModel = (AModel) o;
if (anInt != aModel.anInt) return false;
if (!aList.equals(aModel.aList)) return false;
return true;
}
@Override
public int hashCode() {
int result = anInt;
result = 31 * result + aList.hashCode();
return result;
}
}
package com.mkscrg.sandbox;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
/**
 * Kryo serializer for {@link AModel}.
 *
 * <p>{@code AModel} has no no-arg constructor, so Kryo cannot instantiate it on its own (the
 * "missing no-arg constructor" failure). This serializer writes the int component followed by
 * the string list, and rebuilds the object through the public constructor on read.
 */
public final class AModelKryoSerializer extends Serializer<AModel> {
    /**
     * Writes {@code aModel} as its int followed by its list of strings.
     *
     * <p>The list is written with the serializer registered for {@code ArrayList}, which is the
     * same registration {@link #read} uses. Keeping both sides on one registration means the
     * bytes are always decoded by the serializer that produced them.
     */
    @Override
    public void write(Kryo kryo, Output output, AModel aModel) {
        output.writeInt(aModel.getAnInt());
        kryo.writeObject(output, aModel.getAList(), kryo.getSerializer(ArrayList.class));
    }

    /**
     * Reads an {@link AModel} in the format produced by {@link #write}.
     *
     * @param aModelClass unused; the concrete type is always {@code AModel}
     */
    @Override
    public AModel read(Kryo kryo, Input input, Class<AModel> aModelClass) {
        int anInt = input.readInt();
        // Safe: write() only ever stores the List<String> returned by AModel.getAList().
        @SuppressWarnings("unchecked")
        List<String> aList = kryo.readObject(input, ArrayList.class);
        return new AModel(anInt, aList);
    }
}
package com.mkscrg;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.mkscrg.sandbox.AModel;
import com.mkscrg.sandbox.AModelKryoSerializer;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import static junit.framework.Assert.assertEquals;
/**
 * Round-trip test for {@link AModelKryoSerializer} using a bare {@link Kryo} instance.
 *
 * <p>An extra int is written after the model so the test also checks that the serializer reads
 * back exactly the bytes it wrote, leaving the stream positioned at the next value.
 */
public final class AModelKryoSerializerTest {
    @Test
    public void testSerialization() {
        // Arrange: the model under test plus a sentinel that follows it in the stream.
        List<String> words = new ArrayList<String>();
        words.add("Hello");
        words.add("world!");
        AModel original = new AModel(5, words);
        int sentinel = -123456;

        Kryo kryo = new Kryo();
        kryo.register(AModel.class, new AModelKryoSerializer());

        // Act: serialize into a fixed 10 KiB buffer, then read it back from the same bytes.
        byte[] bytes = new byte[10 * 1024];
        Output out = new Output(bytes);
        kryo.writeObject(out, original);
        out.writeInt(sentinel);

        Input in = new Input(bytes);
        AModel roundTripped = kryo.readObject(in, AModel.class);

        // Assert: same model back, and the stream is left right before the sentinel.
        assertEquals(original, roundTripped);
        assertEquals(sentinel, in.readInt());
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Build for the storm-kryo-test topology.
  `mvn package` produces target/storm-kryo-test-1.0-jar-with-dependencies.jar for `storm jar`;
  `mvn exec:java` runs the topology in local mode.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.mkscrg.sandbox</groupId>
    <artifactId>storm-kryo-test</artifactId>
    <version>1.0</version>
    <repositories>
        <!-- Presumably hosts the `storm` artifact below. HTTPS because Maven 3.8.1+ blocks
             plain-HTTP repositories by default. -->
        <repository>
            <id>clojars.org</id>
            <url>https://clojars.org/repo</url>
        </repository>
    </repositories>
    <build>
        <plugins>
            <!-- Bundles compile/runtime dependencies into a single jar for `storm jar`. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                        <configuration>
                            <descriptorRefs>
                                <descriptorRef>jar-with-dependencies</descriptorRef>
                            </descriptorRefs>
                            <archive>
                                <manifest>
                                    <mainClass>com.mkscrg.sandbox.StormKryoTestTopology</mainClass>
                                </manifest>
                            </archive>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <!-- `mvn exec:java` runs the topology in-process; the compile classpath includes the
                 provided-scope storm dependency. -->
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>1.2.1</version>
                <configuration>
                    <mainClass>com.mkscrg.sandbox.StormKryoTestTopology</mainClass>
                    <classpathScope>compile</classpathScope>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <!-- Provided: the cluster supplies Storm, so it must not be packaged into the jar. -->
        <dependency>
            <groupId>storm</groupId>
            <artifactId>storm</artifactId>
            <version>0.8.0-SNAPSHOT</version>
            <scope>provided</scope>
        </dependency>
        <!-- Test scope keeps JUnit off the compile classpath and out of the
             jar-with-dependencies artifact submitted to the cluster. -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.10</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
package com.mkscrg.sandbox;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import org.apache.log4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
/**
 * Test topology that sends {@link AModel} tuples from {@link AModelSpout} to {@link PrinterBolt},
 * relying on the Kryo serializer registered in the topology config to move them between tasks.
 *
 * <p>With no arguments the topology runs on an in-process {@link LocalCluster} for a fixed
 * period; with a topology name as the first argument it is submitted to a remote cluster.
 */
public final class StormKryoTestTopology {
    /** Topology name used when running in local mode. */
    private static final String LOCAL_TOPOLOGY_NAME = "storm-kryo-test_local";

    /** How long the local-mode run lasts before the {@link LocalCluster} is shut down. */
    private static final long LOCAL_RUN_MILLIS = 20000L;

    /**
     * Builds the topology, then either runs it locally or submits it to a remote cluster.
     *
     * @param args optional; when non-empty, {@code args[0]} is the remote topology name
     * @throws AlreadyAliveException propagated from topology submission
     * @throws InvalidTopologyException propagated from topology submission
     */
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        boolean isRemote = false;
        String topologyName = LOCAL_TOPOLOGY_NAME;
        if (args != null && args.length > 0) {
            isRemote = true;
            topologyName = args[0];
        }

        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("aModel-spout", new AModelSpout());
        topologyBuilder.setBolt("printer-bolt", new PrinterBolt())
                .shuffleGrouping("aModel-spout");
        StormTopology stormTopology = topologyBuilder.createTopology();

        Config config = new Config();
        // AModel has no no-arg constructor, so it needs its custom Kryo serializer.
        config.registerSerialization(AModel.class, AModelKryoSerializer.class);
        // Presumably needed for the list inside AModel -- confirm against AModelKryoSerializer.
        config.registerSerialization(Collection.class);
        // Disable the Java-serialization fallback so only the Kryo registrations are used.
        config.setFallBackOnJavaSerialization(false);
        // Only effective because AModelSpout tags every tuple with a message id.
        config.setMaxSpoutPending(1);
        // Two workers, presumably so tuples cross a worker boundary and actually get serialized.
        config.setNumWorkers(2);

        if (isRemote) {
            StormSubmitter.submitTopology(topologyName, config, stormTopology);
        } else {
            LocalCluster localCluster = new LocalCluster();
            try {
                localCluster.submitTopology(topologyName, config, stormTopology);
                Utils.sleep(LOCAL_RUN_MILLIS);
            } finally {
                // Always stop the in-process cluster, even if submission fails.
                localCluster.shutdown();
            }
        }
    }

    /** Logs every {@link AModel} it receives; declares no output stream. */
    public static final class PrinterBolt extends BaseBasicBolt {
        private static final Logger LOGGER = Logger.getLogger(PrinterBolt.class);

        @Override
        public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
            AModel aModel = (AModel) tuple.getValueByField(AModelSpout.AMODEL_FIELD);
            LOGGER.info("Received " + aModel);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            // No output
        }
    }

    /**
     * Reliable spout that emits {@code AModel(5, [Hello, world!])} on each call to
     * {@link #nextTuple()}, tagging every tuple with a fresh message id.
     */
    public static final class AModelSpout extends BaseRichSpout {
        public static final String AMODEL_FIELD = "aModel";
        private static final Logger LOGGER = Logger.getLogger(AModelSpout.class);
        private SpoutOutputCollector spoutOutputCollector;
        /**
         * Message id for the next emitted tuple. Tagged tuples are tracked by Storm, which is what
         * makes max-spout-pending throttle this spout and makes {@link #fail(Object)} reachable.
         */
        private long nextMsgId;

        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields(AMODEL_FIELD));
        }

        @Override
        public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
            this.spoutOutputCollector = spoutOutputCollector;
        }

        @Override
        public void nextTuple() {
            List<String> aList = new ArrayList<String>();
            aList.add("Hello");
            aList.add("world!");
            AModel aModel = new AModel(5, aList);
            LOGGER.info("Emitting " + aModel);
            // Without a message id the tuple is untracked: ack/fail never fire and
            // max-spout-pending has no effect, so the spout would emit on every poll.
            spoutOutputCollector.emit(new Values(aModel), Long.valueOf(nextMsgId++));
        }

        /** Logs the failed message id and immediately emits a replacement tuple. */
        @Override
        public void fail(Object msgId) {
            LOGGER.info("Fail called with msgId " + msgId);
            nextTuple();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment