Skip to content

Instantly share code, notes, and snippets.

@tianshizz
Last active February 7, 2021 03:18
Show Gist options
  • Save tianshizz/fd1bdbcc4676783b7300984c0bf532c1 to your computer and use it in GitHub Desktop.
Save tianshizz/fd1bdbcc4676783b7300984c0bf532c1 to your computer and use it in GitHub Desktop.
/**
 * Unions a stream with itself and compares the collected output, position by position,
 * against the source collection concatenated with itself.
 *
 * <p>As the method name states, this comparison is expected to fail: the recorded output was
 * {@code [1, 1, 2, 2, 3, 3, 4, 4]} rather than {@code [1, 2, 3, 4, 1, 2, 3, 4]}.
 *
 * @throws Exception if thrown while the pipeline is executed and its results are collected
 */
@Test
public void testUnionSelfWhichWillFail() throws Exception {
    List<Integer> l = Arrays.asList(1, 2, 3, 4);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    DataStreamSource<Integer> s1 = env.fromCollection(l);
    // Both inputs of the union are the very same stream instance.
    DataStream<Integer> s = s1.union(s1);

    List<Integer> expected = new ArrayList<>(l);
    expected.addAll(l);
    // Collect exactly as many elements as we intend to compare.
    List<Integer> res = s.executeAndCollect(expected.size());

    // expected = [1, 2, 3, 4, 1, 2, 3, 4]
    // res = [1, 1, 2, 2, 3, 3, 4, 4]
    // Check the length first so a short result is reported as an assertion failure
    // instead of an IndexOutOfBoundsException from res.get(i).
    Assertions.assertEquals(expected.size(), res.size(), "Unexpected number of collected elements");
    for (int i = 0; i < expected.size(); i++) {
        Assertions.assertEquals(expected.get(i), res.get(i), "Mismatch at index " + i);
    }
}
/**
 * Unions two separate streams built from the same collection and compares the collected
 * output, position by position, against the collection concatenated with itself.
 *
 * <p>The recorded output for this short input was {@code [1, 2, 3, 4, 1, 2, 3, 4]}, so the
 * comparison passed. NOTE(review): this relies on the observed emission order; confirm
 * whether union actually guarantees it before depending on this test.
 *
 * @throws Exception if thrown while the pipeline is executed and its results are collected
 */
@Test
public void testUnionIdenticalWhichWillPass() throws Exception {
    List<Integer> l = Arrays.asList(1, 2, 3, 4);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    // Two distinct sources over the same data, unlike the self-union case.
    DataStreamSource<Integer> s1 = env.fromCollection(l);
    DataStreamSource<Integer> s2 = env.fromCollection(l);
    DataStream<Integer> s = s1.union(s2);

    List<Integer> expected = new ArrayList<>(l);
    expected.addAll(l);
    // Collect exactly as many elements as we intend to compare.
    List<Integer> res = s.executeAndCollect(expected.size());

    // expected = [1, 2, 3, 4, 1, 2, 3, 4]
    // res = [1, 2, 3, 4, 1, 2, 3, 4]
    // Check the length first so a short result is reported as an assertion failure
    // instead of an IndexOutOfBoundsException from res.get(i).
    Assertions.assertEquals(expected.size(), res.size(), "Unexpected number of collected elements");
    for (int i = 0; i < expected.size(); i++) {
        Assertions.assertEquals(expected.get(i), res.get(i), "Mismatch at index " + i);
    }
}
/**
 * Same scenario as the short two-source union, but with 10000 elements per source.
 *
 * <p>The collected output is compared, position by position, against the source collection
 * concatenated with itself. As the method name states, this comparison is expected to fail.
 * NOTE(review): presumably the two inputs get interleaved once they are long enough;
 * confirm against the Flink union documentation.
 *
 * @throws Exception if thrown while the pipeline is executed and its results are collected
 */
@Test
public void testUnionIdenticalWithALongListWillFail() throws Exception {
    final int elementCount = 10000;
    List<Integer> l = new ArrayList<>(elementCount);
    for (int i = 0; i < elementCount; i++) {
        l.add(i);
    }
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    DataStreamSource<Integer> s1 = env.fromCollection(l);
    DataStreamSource<Integer> s2 = env.fromCollection(l);
    DataStream<Integer> s = s1.union(s2);

    List<Integer> expected = new ArrayList<>(l);
    expected.addAll(l);
    // Collect exactly as many elements as we intend to compare (2 * elementCount).
    List<Integer> res = s.executeAndCollect(expected.size());

    // Check the length first so a short result is reported as an assertion failure
    // instead of an IndexOutOfBoundsException from res.get(i).
    Assertions.assertEquals(expected.size(), res.size(), "Unexpected number of collected elements");
    for (int i = 0; i < expected.size(); i++) {
        Assertions.assertEquals(expected.get(i), res.get(i), "Mismatch at index " + i);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment