@s1ck
Created October 10, 2016 11:26
TypeProblem when using RichFlatMapFunction on GenericArray Types
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.RichFlatJoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class TypeProblem {

  /**
   * Tuple with a generic array in the first field and a single value in the second.
   *
   * Note that both cases work with {@code extends Tuple2<K, K[]>}.
   *
   * @param <K> type of the array elements and of the second field
   */
  public static class Foo<K> extends Tuple2<K[], K> {

    public Foo() {
    }

    public Foo(K[] value0, K value1) {
      super(value0, value1);
    }
  }

  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    Foo<Long> myFoo = new Foo<>(new Long[] { 0L, 1L, 2L }, 0L);
    works(env, myFoo, 1).print();
    worksNot(env, myFoo, 1).print();
  }

  /** Self-join using a plain FlatJoinFunction: runs without error. */
  private static <T> DataSet<Foo<T>> works(ExecutionEnvironment env, Foo<T> foo, int field) {
    DataSource<Foo<T>> fooDataSource = env.fromElements(foo);
    return fooDataSource.join(fooDataSource)
      .where(field).equalTo(field)
      .with(new FlatJoinFunction<Foo<T>, Foo<T>, Foo<T>>() {
        @Override
        public void join(Foo<T> first, Foo<T> second,
          Collector<Foo<T>> out) throws Exception {
          out.collect(first);
        }
      });
  }

  /** Same self-join using a RichFlatJoinFunction: fails at runtime with a ClassCastException. */
  private static <T> DataSet<Foo<T>> worksNot(ExecutionEnvironment env, Foo<T> foo, int field) {
    DataSource<Foo<T>> fooDataSource = env.fromElements(foo);
    return fooDataSource.join(fooDataSource)
      .where(field).equalTo(field)
      .with(new RichFlatJoinFunction<Foo<T>, Foo<T>, Foo<T>>() {
        @Override
        public void join(Foo<T> first, Foo<T> second,
          Collector<Foo<T>> out) throws Exception {
          out.collect(first);
        }
      });
  }
}

s1ck commented Oct 10, 2016

Running worksNot leads to:

10/10/2016 14:00:44 Job execution switched to status FAILING.
java.lang.ClassCastException: java.lang.Long cannot be cast to [Ljava.lang.Object;
    at org.apache.flink.api.common.typeutils.base.GenericArraySerializer.serialize(GenericArraySerializer.java:36)
    at org.apache.flink.api.common.typeutils.base.GenericArraySerializer.serialize(GenericArraySerializer.java:112)
    at org.apache.flink.api.common.typeutils.base.GenericArraySerializer.serialize(GenericArraySerializer.java:36)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
    at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
    at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:83)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:85)
    at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
    at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
    at org.gradoop.examples.TypeProblem$2.join(TypeProblem.java:61)
    at org.gradoop.examples.TypeProblem$2.join(TypeProblem.java:57)
    at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashJoinIterator.callWithNextKey(NonReusingBuildFirstHashJoinIterator.java:149)
    at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:222)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
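
One possible reading of the trace: TupleSerializer calls a GenericArraySerializer whose component serializer is itself a GenericArraySerializer, so the first field seems to be treated as an array of arrays and each Long element is cast to Object[]. The plain-Java sketch below models only that nesting. The Serializer interface and the helper names are hypothetical stand-ins, not Flink classes, and the sketch does not explain why the rich function variant ends up with the nested serializer.

```java
public class NestedArraySerializerSketch {

  /** Hypothetical stand-in for a serializer; this is not a Flink interface. */
  interface Serializer {
    String serialize(Object value);
  }

  /** Serializes a single element such as a Long. */
  static final Serializer ELEMENT = value -> String.valueOf(value);

  /** Serializes an array by casting to Object[] and delegating every element. */
  static Serializer arrayOf(Serializer component) {
    return value -> {
      Object[] elements = (Object[]) value; // throws ClassCastException if value is not an array
      StringBuilder sb = new StringBuilder("[");
      for (int i = 0; i < elements.length; i++) {
        if (i > 0) {
          sb.append(", ");
        }
        sb.append(component.serialize(elements[i]));
      }
      return sb.append("]").toString();
    };
  }

  /** Returns the serialized form, or a description of the cast failure. */
  static String trySerialize(Serializer serializer, Object value) {
    try {
      return serializer.serialize(value);
    } catch (ClassCastException e) {
      return "ClassCastException: " + e.getMessage();
    }
  }

  public static void main(String[] args) {
    Long[] field0 = { 0L, 1L, 2L };

    // K resolved to Long: K[] is an array of plain elements.
    System.out.println(trySerialize(arrayOf(ELEMENT), field0));

    // K resolved to Long[] (assumed): K[] becomes an array of arrays,
    // so every Long element is cast to Object[] and the cast fails.
    System.out.println(trySerialize(arrayOf(arrayOf(ELEMENT)), field0));
  }
}
```

The first call prints [0, 1, 2]. The second reports a ClassCastException for java.lang.Long; the exact message text depends on the JVM version.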


s1ck commented Oct 11, 2016

Same exception for RichFlatMapFunction:

  private static <T> DataSet<Foo<T>> worksNot(ExecutionEnvironment env, Foo<T> foo) {
    DataSource<Foo<T>> fooDataSource = env.fromElements(foo);

    return fooDataSource.flatMap(new RichFlatMapFunction<Foo<T>, Foo<T>>() {
      @Override
      public void flatMap(Foo<T> foo, Collector<Foo<T>> out) throws Exception {
        out.collect(foo);
      }
    });
  }
