Skip to content

Instantly share code, notes, and snippets.

@ibodrov
Last active January 29, 2016 09:00
Show Gist options
  • Save ibodrov/53f59a44aca097593b92 to your computer and use it in GitHub Desktop.
Save ibodrov/53f59a44aca097593b92 to your computer and use it in GitHub Desktop.
import java.util.Properties;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.junit.Test;
/**
 * Reproduction case for failure handling in a checkpointed Flink streaming job
 * that reads from Kafka.
 *
 * <p>The job keys incoming strings by their own value, folds each key's
 * two-second time window into one concatenated string, and then maps the
 * result. The map step throws on purpose when the folded value equals
 * {@code "zz"}, so that the behaviour of the job after a failure can be
 * observed.
 *
 * <p>The user functions are static nested classes rather than anonymous inner
 * classes, so they do not carry a hidden reference to the (non-serializable)
 * enclosing test instance when they are handed to the Flink operators.
 */
public class FailureTest {

    /** Interval passed to {@code enableCheckpointing}. */
    private static final long CHECKPOINT_INTERVAL = 1000;

    /** Length of the time window, in seconds. */
    private static final long WINDOW_SECONDS = 2;

    /** Folded window value that makes the map step fail. */
    private static final String POISON_VALUE = "zz";

    /** Name passed to the Kafka consumer as the topic to read. */
    private static final String TOPIC = "test";

    /**
     * Builds the pipeline (Kafka source, keyBy, time window, fold, map, print)
     * and executes it.
     *
     * @throws Exception whatever {@code env.execute()} or the pipeline setup throws
     */
    @Test
    public void test() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(CHECKPOINT_INTERVAL);

        env.addSource(createKafkaSource())
                .keyBy(new IdentityKeySelector())
                .timeWindow(Time.seconds(WINDOW_SECONDS))
                .fold("", new ConcatenatingFold())
                .map(new FailingMapper())
                .print();

        env.execute();
    }

    /**
     * Creates the Kafka source used by the job.
     *
     * @return a consumer for {@link #TOPIC} that deserializes records with
     *         {@link SimpleStringSchema}, configured for a broker at
     *         {@code localhost:9092}, consumer group {@code test} and
     *         ZooKeeper at {@code localhost:2181/kafka}
     */
    private static SourceFunction<String> createKafkaSource() {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
        props.setProperty("zookeeper.connect", "localhost:2181/kafka");
        return new FlinkKafkaConsumer082<>(TOPIC, new SimpleStringSchema(), props);
    }

    /** Uses each record's own value as its key. */
    private static final class IdentityKeySelector implements KeySelector<String, String> {
        private static final long serialVersionUID = 1L;

        @Override
        public String getKey(String value) throws Exception {
            return value;
        }
    }

    /** Appends every value of a window to the accumulator. */
    private static final class ConcatenatingFold implements FoldFunction<String, String> {
        private static final long serialVersionUID = 1L;

        @Override
        public String fold(String accumulator, String value) throws Exception {
            return accumulator + value;
        }
    }

    /**
     * Prefixes each folded value with {@code "mapped "}, except for
     * {@link #POISON_VALUE}, which triggers a deliberate failure.
     */
    private static final class FailingMapper extends RichMapFunction<String, String> {
        private static final long serialVersionUID = 1L;

        @Override
        public String map(String value) throws Exception {
            if (value.equals(POISON_VALUE)) {
                // I expect that failed messages will be reprocessed
                // and Kafka's offset won't change.
                // The message makes the deliberate failure recognisable in the job logs.
                throw new RuntimeException("Simulated failure for window value: " + value);
            }
            return "mapped " + value;
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment