Skip to content

Instantly share code, notes, and snippets.

@charles-tan
Last active March 5, 2024 19:47
Show Gist options
  • Save charles-tan/ba368eec5a2e4d81f23cf844d2510c93 to your computer and use it in GitHub Desktop.
Save charles-tan/ba368eec5a2e4d81f23cf844d2510c93 to your computer and use it in GitHub Desktop.
/**
 * Minimal Flink job that copies string records from the Kafka topic {@code source} to the Kafka
 * topic {@code sink} using an exactly-once (transactional) Kafka sink and periodic checkpoints.
 */
public class FlinkTest {
    /** Kafka bootstrap servers shared by the source and the sink. */
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String SOURCE_TOPIC = "source";
    private static final String SINK_TOPIC = "sink";
    private static final String CONSUMER_GROUP_ID = "my-group";
    private static final String TRANSACTIONAL_ID_PREFIX = "txn-prefix";
    /** Producer transaction timeout in milliseconds (5 minutes). */
    private static final String TRANSACTION_TIMEOUT_MS = "300000";
    private static final String CHECKPOINT_STORAGE = "file:///tmp/checkpoints";

    /**
     * Builds the source -> sink pipeline, configures checkpointing and runs the job.
     *
     * @param args unused
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> sourceStream = env.fromSource(
                        buildSource(), WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source")
                .uid("kafkasourceuid");

        // Give the transactional sink a stable uid as well, consistent with the source: without
        // one the operator ID is auto-generated from the topology, so the sink's checkpointed
        // state may not be matched on restore once the job graph changes.
        sourceStream.sinkTo(buildSink()).uid("kafkasinkuid");

        configureCheckpointing(env);
        env.execute("tester");
    }

    /** Creates the Kafka source reading string values from the source topic, starting at latest. */
    private static KafkaSource<String> buildSource() {
        return KafkaSource.<String>builder()
                .setBootstrapServers(BOOTSTRAP_SERVERS)
                .setTopics(SOURCE_TOPIC)
                .setGroupId(CONSUMER_GROUP_ID)
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    }

    /** Creates the exactly-once Kafka sink writing string values to the sink topic. */
    private static KafkaSink<String> buildSink() {
        KafkaRecordSerializationSchema<String> serializer = KafkaRecordSerializationSchema.builder()
                .setValueSerializationSchema(new SimpleStringSchema())
                .setTopic(SINK_TOPIC)
                .build();

        Properties producerProps = new Properties();
        producerProps.setProperty("transaction.timeout.ms", TRANSACTION_TIMEOUT_MS); // e.g., 5 mins

        return KafkaSink.<String>builder()
                .setBootstrapServers(BOOTSTRAP_SERVERS)
                .setRecordSerializer(serializer)
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setKafkaProducerConfig(producerProps)
                .setTransactionalIdPrefix(TRANSACTIONAL_ID_PREFIX)
                .build();
    }

    /**
     * Enables exactly-once checkpointing: 10 s interval, 60 s timeout, 500 ms minimum pause,
     * one tolerated checkpoint failure, and file-system checkpoint storage.
     */
    private static void configureCheckpointing(StreamExecutionEnvironment env) {
        env.enableCheckpointing(10000L);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500L);
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);
        env.getCheckpointConfig().setCheckpointStorage(CHECKPOINT_STORAGE);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment