Skip to content

Instantly share code, notes, and snippets.

@s1ck
Created October 10, 2016 11:26
Show Gist options
  • Save s1ck/37aefb19198cd01a8b998fab354c2cfd to your computer and use it in GitHub Desktop.
Save s1ck/37aefb19198cd01a8b998fab354c2cfd to your computer and use it in GitHub Desktop.
TypeProblem when using RichFlatMapFunction on GenericArray Types
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.RichFlatJoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class TypeProblem {
/**
* Note that both cases work with {@code extends Tuple2<K, K[]>}
*
* @param <K>
*/
public static class Foo<K> extends Tuple2<K[], K> {
public Foo() {
}
public Foo(K[] value0, K value1) {
super(value0, value1);
}
}
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Foo<Long> myFoo = new Foo<>(new Long[] { 0L, 1L, 2L }, 0L);
works(env, myFoo, 1).print();
worksNot(env, myFoo, 1).print();
}
private static <T> DataSet<Foo<T>> works(ExecutionEnvironment env, Foo<T> foo, int field) {
DataSource<Foo<T>> fooDataSource = env.fromElements(foo);
return fooDataSource.join(fooDataSource)
.where(field).equalTo(field)
.with(new FlatJoinFunction<Foo<T>, Foo<T>, Foo<T>>() {
@Override
public void join(Foo<T> first, Foo<T> second,
Collector<Foo<T>> out) throws Exception {
out.collect(first);
}
});
}
private static <T> DataSet<Foo<T>> worksNot(ExecutionEnvironment env, Foo<T> foo, int field) {
DataSource<Foo<T>> fooDataSource = env.fromElements(foo);
return fooDataSource.join(fooDataSource)
.where(field).equalTo(field)
.with(new RichFlatJoinFunction<Foo<T>, Foo<T>, Foo<T>>() {
@Override
public void join(Foo<T> first, Foo<T> second,
Collector<Foo<T>> out) throws Exception {
out.collect(first);
}
});
}
}
@s1ck
Copy link
Author

s1ck commented Oct 11, 2016

Same exception for RichFlatMapFunction:

private static <T> DataSet<Foo<T>> worksNot(ExecutionEnvironment env, Foo<T> foo) {
    DataSource<Foo<T>> fooDataSource = env.fromElements(foo);

    return fooDataSource.flatMap(new RichFlatMapFunction<Foo<T>, Foo<T>>() {
      @Override
      public void flatMap(Foo<T> foo, Collector<Foo<T>> out) throws Exception {
        out.collect(foo);
      }
    });
  }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment