@liorksh
Last active Sep 15, 2020
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

// connect() keeps the element types of both input streams in the returned type
ConnectedStreams<Tuple2<String, Integer>, Tuple2<String, String>> mergedStream
        = singerStream
                .connect(playerStream);

// The CoMapFunction maps each input type to one common output type
DataStream<Tuple4<String, String, String, Integer>> combinedStream
        = mergedStream.map(new CoMapFunction<
                Tuple2<String, Integer>,                 // Stream 1
                Tuple2<String, String>,                  // Stream 2
                Tuple4<String, String, String, Integer>  // Output
                >() {

            // Process Stream 1: no String detail is available, so field 3 is left empty
            @Override
            public Tuple4<String, String, String, Integer>
            map1(Tuple2<String, Integer> singer) throws Exception {
                return new Tuple4<>("Source: singer stream", singer.f0, "", singer.f1);
            }

            // Process Stream 2: no Integer value is available, so field 4 defaults to 0
            @Override
            public Tuple4<String, String, String, Integer>
            map2(Tuple2<String, String> player) throws Exception {
                return new Tuple4<>("Source: player stream", player.f0, player.f1, 0);
            }
        });
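
To see the shape of the records that `map1` and `map2` emit without running a Flink job, the per-record logic can be sketched in plain Java. This is an illustration only, not part of the original snippet. `CoMapSketch`, `fromSinger`, `fromPlayer`, and the parameter names are hypothetical, the sample values are made up, and a `List<Object>` stands in for Flink's `Tuple4`.

```java
import java.util.Arrays;
import java.util.List;

public class CoMapSketch {

    // Mirrors map1: a (String, Integer) singer record becomes a 4-field record
    // with an empty third field.
    static List<Object> fromSinger(String name, int value) {
        return Arrays.asList("Source: singer stream", name, "", value);
    }

    // Mirrors map2: a (String, String) player record becomes a 4-field record
    // with 0 as the fourth field.
    static List<Object> fromPlayer(String name, String detail) {
        return Arrays.asList("Source: player stream", name, detail, 0);
    }

    public static void main(String[] args) {
        // Made-up sample records, one per input stream.
        System.out.println(fromSinger("singer-a", 3));
        System.out.println(fromPlayer("player-b", "team-x"));
    }
}
```

Both helpers return the same four-field layout, which is what lets the connected stream collapse into a single `DataStream`. In a real job Flink invokes `map1` or `map2` for each element as it arrives on either input, so the interleaving of the two sources in the output should not be relied on.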