@KKcorps
Created September 18, 2019 13:55
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import java.util.Properties;

public class TestCheckpointJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        // Kafka consumer configuration
        Properties kafkaConsumerProperties = new Properties();
        kafkaConsumerProperties.setProperty("bootstrap.servers", "localhost:9092");
        kafkaConsumerProperties.setProperty("group.id", "test_group_id");
        ObjectMapper objectMapper = new ObjectMapper();

        // Source: read JSON strings from the test_topic Kafka topic
        FlinkKafkaConsumer010<String> kafkaConsumer010 =
                new FlinkKafkaConsumer010<>("test_topic", new SimpleStringSchema(), kafkaConsumerProperties);
        DataStream<String> kafkaSource = env.addSource(kafkaConsumer010).name("kafka_source").uid("kafka_source");

        // Deserialize, key by TestData.key, and sum values over 1-hour processing-time windows.
        // TestData, LabeledTestData and ClassifyData are application classes defined elsewhere.
        DataStream<TestData> aggregatedStream = kafkaSource
                .map(row -> objectMapper.readValue(row, TestData.class))
                .keyBy(TestData::getKey)
                .timeWindow(Time.hours(1))
                .reduce((rowA, rowB) -> {
                    TestData result = new TestData();
                    result.setKey(rowA.getKey());
                    result.setValue(rowA.getValue() + rowB.getValue());
                    result.setCreatedAt(System.currentTimeMillis());
                    return result;
                }).name("aggregate_stream").uid("aggregate_stream");

        // Classify the aggregated records and print them as JSON
        DataStream<LabeledTestData> labeledTestDataDataStream = aggregatedStream
                .keyBy(TestData::getKey)
                .flatMap(new ClassifyData())
                .name("classify_data").uid("classify_data");

        labeledTestDataDataStream.map(row -> objectMapper.writeValueAsString(row)).print();
        env.execute();
    }
}
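
The job is named TestCheckpointJob and assigns a uid() to every operator, but it never actually turns checkpointing on. A minimal sketch of doing so, placed right after the environment is created, could look like the following; the 60-second interval, exactly-once mode, and minimum pause are arbitrary choices for illustration, not part of the gist.

        // additional import: org.apache.flink.streaming.api.CheckpointingMode
        env.enableCheckpointing(60_000);                                                // checkpoint every 60 s (assumed interval)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // exactly-once state semantics
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);                // assumed pause, tune as needed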
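
The snippet compiles only with TestData, LabeledTestData and ClassifyData on the classpath, which the gist does not include. Below is a hypothetical sketch reconstructed from how they are used above; the field types, the LabeledTestData-extends-TestData relationship, and the classification rule are all assumptions, and each public class would live in its own file.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

// Plain POJO so Flink's serializer can handle it: public no-arg constructor plus getters/setters.
public class TestData {
    private String key;       // grouping key used in keyBy(TestData::getKey)
    private long value;       // summed in the window reduce (type assumed)
    private long createdAt;   // set to System.currentTimeMillis() on aggregation

    public String getKey() { return key; }
    public void setKey(String key) { this.key = key; }
    public long getValue() { return value; }
    public void setValue(long value) { this.value = value; }
    public long getCreatedAt() { return createdAt; }
    public void setCreatedAt(long createdAt) { this.createdAt = createdAt; }
}

// Aggregated record plus a label attached by ClassifyData (inheritance is an assumption).
public class LabeledTestData extends TestData {
    private String label;
    public String getLabel() { return label; }
    public void setLabel(String label) { this.label = label; }
}

// Used as .flatMap(new ClassifyData()) on a keyed stream of TestData.
public class ClassifyData implements FlatMapFunction<TestData, LabeledTestData> {
    @Override
    public void flatMap(TestData input, Collector<LabeledTestData> out) {
        LabeledTestData labeled = new LabeledTestData();
        labeled.setKey(input.getKey());
        labeled.setValue(input.getValue());
        labeled.setCreatedAt(input.getCreatedAt());
        labeled.setLabel(input.getValue() > 100 ? "HIGH" : "LOW"); // arbitrary example rule
        out.collect(labeled);
    }
}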