Skip to content

Instantly share code, notes, and snippets.

@s1ck
Created October 19, 2016 06:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save s1ck/caf9f3f46e7a5afe6f6a73c479948fec to your computer and use it in GitHub Desktop.
Save s1ck/caf9f3f46e7a5afe6f6a73c479948fec to your computer and use it in GitHub Desktop.
Type erasure problem solely on cluster execution
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
import java.lang.reflect.Array;
/**
 * Minimal Flink batch program reproducing a type-erasure problem that, per the
 * author's notes in {@link #main(String[])}, only shows up on cluster execution.
 *
 * <p>Two jobs are defined that differ only in the element type flowing through
 * the generic {@link UDF}: {@link #withLong()} uses {@code Long} and
 * {@link #withPojo()} uses the user-defined {@link Pojo}.
 */
public class Problem {

    /** Empty user-defined type used as the tuple payload in {@link #withPojo()}. */
    public static class Pojo {
    }

    /**
     * One-field tuple whose field {@code f0} holds a single {@code T}.
     *
     * @param <T> type of the wrapped value
     */
    public static class Foo<T> extends Tuple1<T> {
    }

    /**
     * One-field tuple whose field {@code f0} holds an array of {@code T}.
     *
     * @param <T> component type of the wrapped array
     */
    public static class Bar<T> extends Tuple1<T[]> {
    }

    /**
     * Maps every incoming {@link Foo} to a {@link Bar} holding a freshly
     * allocated array of length 10. The content of the incoming tuple is not read.
     *
     * @param <T> element type of the input tuple and component type of the
     *            produced array
     */
    public static class UDF<T> implements MapFunction<Foo<T>, Bar<T>> {

        /** Class token used to allocate the array, since {@code T} is erased at runtime. */
        private final Class<T> clazz;

        /**
         * @param clazz runtime class used as component type for the arrays
         *              created in {@link #map(Foo)}
         */
        public UDF(Class<T> clazz) {
            this.clazz = clazz;
        }

        /**
         * Creates the output tuple for one input record.
         *
         * @param value incoming tuple (ignored)
         * @return a new {@link Bar} whose {@code f0} is a new array of length 10
         *         with component type {@code clazz}
         */
        @Override
        public Bar<T> map(Foo<T> value) throws Exception {
            // Array.newInstance returns Object; the cast is safe because the
            // array is allocated with clazz as its component type.
            @SuppressWarnings("unchecked")
            T[] elements = (T[]) Array.newInstance(clazz, 10);

            Bar<T> result = new Bar<>();
            result.f0 = elements;
            return result;
        }
    }

    /**
     * Runs both jobs one after the other.
     *
     * @param args unused
     */
    public static void main(String[] args) throws Exception {
        // Author's observation: succeeds in local, collection and cluster execution.
        withLong();
        // Author's observation: succeeds in local and collection execution,
        // but fails on cluster execution.
        withPojo();
    }

    /**
     * Builds a one-element data set containing a {@code Foo<Long>} with value 42,
     * maps it through {@link UDF} and prints the result.
     */
    public static void withLong() throws Exception {
        Foo<Long> input = new Foo<>();
        input.f0 = 42L;

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Bar<Long>> mapped = env.fromElements(input).map(new UDF<>(Long.class));
        mapped.print();
    }

    /**
     * Builds a one-element data set containing a {@code Foo<Pojo>}, maps it
     * through {@link UDF} and prints the result.
     */
    public static void withPojo() throws Exception {
        Foo<Pojo> input = new Foo<>();
        input.f0 = new Pojo();

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Bar<Pojo>> mapped = env.fromElements(input).map(new UDF<>(Pojo.class));
        mapped.print();
    }
}
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
/**
 * Parameterized JUnit wrapper that runs the two {@code Problem} jobs once per
 * {@code TestExecutionMode} handed to the constructor.
 *
 * <p>NOTE(review): the set of execution modes and the environment setup are
 * presumably provided by {@code MultipleProgramsTestBase}; that class is not
 * visible here, so confirm against its documentation.
 */
@RunWith(Parameterized.class)
public class ProblemTest extends MultipleProgramsTestBase {

    /**
     * @param mode execution mode for this test instance, forwarded unchanged
     *             to the base class
     */
    public ProblemTest(final TestExecutionMode mode) {
        super(mode);
    }

    /** Runs the job whose element type is {@code Long}. */
    @Test
    public void testWithLong() throws Exception {
        Problem.withLong();
    }

    /** Runs the job whose element type is the user-defined {@code Problem.Pojo}. */
    @Test
    public void testWithPOJO() throws Exception {
        Problem.withPojo();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment