Created
April 16, 2021 19:36
-
-
Save qqibrow/f297babadb0bb662ee398b9088870785 to your computer and use it in GitHub Desktop.
Reproduce Iterator Operator Checkpoint Issue in 1.11
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example.demo; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.Random; | |
import org.apache.flink.api.common.functions.MapFunction; | |
import org.apache.flink.api.java.tuple.Tuple2; | |
import org.apache.flink.api.java.tuple.Tuple5; | |
import org.apache.flink.api.java.utils.ParameterTool; | |
import org.apache.flink.streaming.api.CheckpointingMode; | |
import org.apache.flink.streaming.api.collector.selector.OutputSelector; | |
import org.apache.flink.streaming.api.datastream.DataStream; | |
import org.apache.flink.streaming.api.datastream.IterativeStream; | |
import org.apache.flink.streaming.api.datastream.SplitStream; | |
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; | |
import org.apache.flink.streaming.api.functions.source.SourceFunction; | |
public class IteratorOperatorExample { | |
private static final int BOUND = 100; | |
// ************************************************************************* | |
// PROGRAM | |
// ************************************************************************* | |
public static void main(String[] args) throws Exception { | |
// Checking input parameters | |
final ParameterTool params = ParameterTool.fromArgs(args); | |
// set up input for the stream of integer pairs | |
// obtain execution environment and set setBufferTimeout to 1 to enable | |
// continuous flushing of the output buffers (lowest latency) | |
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); | |
env.enableCheckpointing(1000 * 60, CheckpointingMode.AT_LEAST_ONCE, true); | |
// make parameters available in the web interface | |
env.getConfig().setGlobalJobParameters(params); | |
// create input stream of integer pairs | |
DataStream<Tuple2<Integer, Integer>> inputStream; | |
if (params.has("input")) { | |
inputStream = env.readTextFile(params.get("input")).map(new FibonacciInputMap()); | |
} else { | |
System.out.println("Executing Iterate example with default input data set."); | |
System.out.println("Use --input to specify file input."); | |
inputStream = env.addSource(new RandomFibonacciSource()); | |
} | |
// create an iterative data stream from the input with 5 second timeout | |
// maxWaitTime is set to 1 day | |
IterativeStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> | |
it = inputStream.map(new InputMap()) | |
.iterate(24 * 60 * 60 * 1000); | |
// apply the step function to get the next Fibonacci number | |
// increment the counter and split the output with the output selector | |
SplitStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> step = it.map(new Step()) | |
.split(new MySelector()); | |
// close the iteration by selecting the tuples that were directed to the | |
// 'iterate' channel in the output selector | |
it.closeWith(step.select("iterate")); | |
// to produce the final output select the tuples directed to the | |
// 'output' channel then get the input pairs that have the greatest iteration counter | |
// on a 1 second sliding window | |
DataStream<Tuple2<Tuple2<Integer, Integer>, Integer>> numbers = step.select("output") | |
.map(new OutputMap()); | |
// emit results | |
if (params.has("output")) { | |
numbers.writeAsText(params.get("output")); | |
} else { | |
System.out.println("Printing result to stdout. Use --output to specify output path."); | |
numbers.print(); | |
} | |
// execute the program | |
env.execute("Streaming Iteration Example"); | |
} | |
// ************************************************************************* | |
// USER FUNCTIONS | |
// ************************************************************************* | |
/** | |
* Generate BOUND number of random integer pairs from the range from 1 to BOUND/2. | |
*/ | |
private static class RandomFibonacciSource implements SourceFunction<Tuple2<Integer, Integer>> { | |
private static final long serialVersionUID = 1L; | |
private Random rnd = new Random(); | |
private volatile boolean isRunning = true; | |
private int counter = 0; | |
@Override | |
public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception { | |
while (isRunning) { | |
int first = rnd.nextInt(BOUND / 2 - 1) + 1; | |
int second = rnd.nextInt(BOUND / 2 - 1) + 1; | |
ctx.collect(new Tuple2<>(first, second)); | |
counter++; | |
Thread.sleep(50L); | |
} | |
} | |
@Override | |
public void cancel() { | |
isRunning = false; | |
} | |
} | |
/** | |
* Generate random integer pairs from the range from 0 to BOUND/2. | |
*/ | |
private static class FibonacciInputMap implements | |
MapFunction<String, Tuple2<Integer, Integer>> { | |
private static final long serialVersionUID = 1L; | |
@Override | |
public Tuple2<Integer, Integer> map(String value) throws Exception { | |
String record = value.substring(1, value.length() - 1); | |
String[] splitted = record.split(","); | |
return new Tuple2<>(Integer.parseInt(splitted[0]), Integer.parseInt(splitted[1])); | |
} | |
} | |
/** | |
* Map the inputs so that the next Fibonacci numbers can be calculated while preserving the original input tuple. | |
* A counter is attached to the tuple and incremented in every iteration step. | |
*/ | |
public static class InputMap implements MapFunction<Tuple2<Integer, Integer>, Tuple5<Integer, Integer, Integer, | |
Integer, Integer>> { | |
private static final long serialVersionUID = 1L; | |
@Override | |
public Tuple5<Integer, Integer, Integer, Integer, Integer> map(Tuple2<Integer, Integer> value) throws | |
Exception { | |
return new Tuple5<>(value.f0, value.f1, value.f0, value.f1, 0); | |
} | |
} | |
/** | |
* Iteration step function that calculates the next Fibonacci number. | |
*/ | |
public static class Step implements | |
MapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>, Tuple5<Integer, Integer, Integer, | |
Integer, Integer>> { | |
private static final long serialVersionUID = 1L; | |
@Override | |
public Tuple5<Integer, Integer, Integer, Integer, Integer> map(Tuple5<Integer, Integer, Integer, Integer, | |
Integer> value) throws Exception { | |
return new Tuple5<>(value.f0, value.f1, value.f3, value.f2 + value.f3, ++value.f4); | |
} | |
} | |
/** | |
* OutputSelector testing which tuple needs to be iterated again. | |
*/ | |
public static class MySelector implements | |
OutputSelector<Tuple5<Integer, Integer, Integer, Integer, Integer>> { | |
private static final long serialVersionUID = 1L; | |
@Override | |
public Iterable<String> select(Tuple5<Integer, Integer, Integer, Integer, Integer> value) { | |
List<String> output = new ArrayList<>(); | |
if (value.f2 < BOUND && value.f3 < BOUND) { | |
// No output back to iterate operator | |
// output.add("iterate"); | |
} else { | |
output.add("output"); | |
} | |
return output; | |
} | |
} | |
/** | |
* Giving back the input pair and the counter. | |
*/ | |
public static class OutputMap implements MapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>, | |
Tuple2<Tuple2<Integer, Integer>, Integer>> { | |
private static final long serialVersionUID = 1L; | |
@Override | |
public Tuple2<Tuple2<Integer, Integer>, Integer> map(Tuple5<Integer, Integer, Integer, Integer, Integer> | |
value) throws | |
Exception { | |
return new Tuple2<>(new Tuple2<>(value.f0, value.f1), value.f4); | |
} | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment