Skip to content

Instantly share code, notes, and snippets.

@visualskyrim
Created June 13, 2017 13:20
Show Gist options
  • Save visualskyrim/66148758af9e993d7ac79a4c1206f458 to your computer and use it in GitHub Desktop.
Simple Flink job streaming data from Kafka to local files.
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import java.util.Properties;
/**
 * Simple Flink streaming job that reads string records from a Kafka topic and writes them into
 * time-bucketed files under a base directory.
 *
 * <p>Command-line options (parsed with {@link ParameterTool}):
 *
 * <ul>
 *   <li>{@code --output <path>} (required): base directory for the bucketed output files.
 *   <li>{@code --topic <name>}: Kafka topic to consume (default {@code flink-test}).
 *   <li>{@code --bootstrap.servers <hosts>}: Kafka brokers (default {@code localhost:9092}).
 *   <li>{@code --zookeeper.connect <hosts>}: ZooKeeper quorum (default {@code localhost:2181}).
 *   <li>{@code --group.id <id>}: Kafka consumer group (default {@code flink-test}).
 * </ul>
 */
public class DataTransfer {

  private static final String DEFAULT_TOPIC = "flink-test";
  private static final String DEFAULT_BOOTSTRAP_SERVERS = "localhost:9092";
  private static final String DEFAULT_ZOOKEEPER_CONNECT = "localhost:2181";
  private static final String DEFAULT_GROUP_ID = "flink-test";

  /** Format of the per-bucket sub-directory names: one bucket per minute. */
  private static final String BUCKET_FORMAT = "yyyy-MM-dd--HH-mm";

  private static final int RESTART_ATTEMPTS = 4;
  private static final long RESTART_DELAY_MS = 10_000L;

  private static final long CHECKPOINT_INTERVAL_MS = 30_000L;
  private static final long MIN_PAUSE_BETWEEN_CHECKPOINTS_MS = 10_000L;
  // Checkpoints that do not complete within this time are aborted.
  private static final long CHECKPOINT_TIMEOUT_MS = 10_000L;

  /**
   * Builds and runs the Kafka-to-files pipeline.
   *
   * @param args command-line options as described on the class
   * @throws RuntimeException if the required {@code --output} option is missing
   * @throws Exception if the Flink job fails
   */
  public static void main(String[] args) throws Exception {
    ParameterTool params = ParameterTool.fromArgs(args);
    String topic = params.get("topic", DEFAULT_TOPIC);
    // Fail fast instead of silently writing into a placeholder directory.
    String basePath = params.getRequired("output");

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().setGlobalJobParameters(params);
    env.getConfig().disableSysoutLogging();
    env.getConfig()
        .setRestartStrategy(RestartStrategies.fixedDelayRestart(RESTART_ATTEMPTS, RESTART_DELAY_MS));
    configureCheckpointing(env);

    DataStream<String> stream =
        env.addSource(
            new FlinkKafkaConsumer08<String>(
                topic, new SimpleStringSchema(), kafkaProperties(params)));

    // Echo every record to stdout for debugging; remove for high-volume topics.
    stream.print();
    stream.addSink(fileSink(basePath));

    env.execute("Kafka topic " + topic + " to " + basePath);
  }

  /**
   * Builds the Kafka consumer properties from the parameters, falling back to local defaults.
   * The topic name is not a Kafka client property; it is passed to the consumer directly.
   */
  private static Properties kafkaProperties(ParameterTool params) {
    Properties kafkaProps = new Properties();
    kafkaProps.setProperty(
        "bootstrap.servers", params.get("bootstrap.servers", DEFAULT_BOOTSTRAP_SERVERS));
    kafkaProps.setProperty(
        "zookeeper.connect", params.get("zookeeper.connect", DEFAULT_ZOOKEEPER_CONNECT));
    kafkaProps.setProperty("group.id", params.get("group.id", DEFAULT_GROUP_ID));
    return kafkaProps;
  }

  /**
   * Enables exactly-once checkpointing, one checkpoint at a time. Checkpoints are retained when
   * the job is cancelled.
   */
  private static void configureCheckpointing(StreamExecutionEnvironment env) {
    env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
    CheckpointConfig checkpointConfig = env.getCheckpointConfig();
    checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    checkpointConfig.setMinPauseBetweenCheckpoints(MIN_PAUSE_BETWEEN_CHECKPOINTS_MS);
    checkpointConfig.setCheckpointTimeout(CHECKPOINT_TIMEOUT_MS);
    checkpointConfig.setMaxConcurrentCheckpoints(1);
    checkpointConfig.enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
  }

  /** Creates a bucketing file sink rooted at {@code basePath} with one bucket per minute. */
  private static BucketingSink<String> fileSink(String basePath) {
    BucketingSink<String> sink = new BucketingSink<>(basePath);
    sink.setBucketer(new DateTimeBucketer<String>(BUCKET_FORMAT));
    return sink;
  }
}
@dmazzer
Copy link

dmazzer commented Oct 6, 2017

Thanks!! Your gist helped me a lot!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment