This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Define a separate stream for Players | |
final OutputTag<Tuple2<String,String>> playerTag | |
= new OutputTag<Tuple2<String,String>>("player"){}; | |
// Define a separate stream for Singers | |
final OutputTag<Tuple2<String,Integer>> singerTag | |
= new OutputTag<Tuple2<String,Integer>>("singer"){}; | |
// Convert each record to an InputData object and split the main stream into two side streams. | |
SingleOutputStreamOperator<InputData> inputDataMain | |
= inputStream | |
.process(new ProcessFunction<String, InputData>() { | |
@Override | |
public void processElement( | |
String inputStr, | |
Context ctx, | |
Collector<InputData> collInputData) { | |
Utils.print(Utils.COLOR_CYAN, "Received record : " + inputStr); | |
// Convert a String to an InputData Object | |
InputData inputData = InputData.getDataObject(inputStr); | |
switch (inputData.getType()) | |
{ | |
case "Singer": | |
//Create output tuple with name and count | |
ctx.output(singerTag, | |
new Tuple2<String,Integer> | |
(inputData.getName(), inputData.getScore())); | |
break; | |
case "Player": | |
// Create output tuple with name and type; | |
// if the newly created tuple doesn't match the playerTag type then a compilation error is raised ("method output cannot be applied to given types") | |
ctx.output(playerTag, | |
new Tuple2<String, String> | |
(inputData.getName(), inputData.getType())); | |
break; | |
default: | |
// Collect main output as InputData objects | |
collInputData.collect(inputData); | |
break; | |
} | |
} | |
}); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment