-
-
Save tweise/3139d66461e87986f6eddc70ff06ef9a to your computer and use it in GitHub Desktop.
HybridSourceExample.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Example: chain two file sources and a Kafka source into one HybridSource and
// expose the result as a DataStream<String>. `env` is expected to be defined by
// the surrounding code.

// First source in the chain: a FileSource using TextLineFormat over the local test directory.
File testDir = new File("/Users/thomas/hybridsource/test");
final FileSource<String> fileSource =
        FileSource.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir))
                .build();

// directory doesn't matter; splits are supplied by converter
final FileSource<String> fileSource2 =
        FileSource.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir))
                .build();

// Last source in the chain: Kafka topic "quickstart-events", value-only String
// deserialization, starting from the earliest offsets.
KafkaSource<String> kafkaSource =
        KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setGroupId("MyGroup")
                .setTopics(Arrays.asList("quickstart-events"))
                .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
                .setStartingOffsets(OffsetsInitializer.earliest())
                .build();

// Start the chain with the first file source.
HybridSource.SourceChain<String, FileSourceSplit, PendingSplitsCheckpoint<FileSourceSplit>>
        sourceChain = HybridSource.SourceChain.of(fileSource);

// Append the second file source. The converter receives the previous source's
// checkpoint and returns the checkpoint handed to fileSource2; here it is always empty.
sourceChain =
        sourceChain.add(
                fileSource2,
                pendingSplitsCheckpoint -> {
                    // TODO: pendingSplitsCheckpoint doesn't have alreadyProcessedPaths (or
                    // other info about finished splits)
                    // all files processed by previous source, do nothing
                    return PendingSplitsCheckpoint.fromCollectionSnapshot(Collections.emptyList());
                });

// Append the Kafka source; no converter is supplied for this hop.
HybridSource.SourceChain<String, KafkaPartitionSplit, KafkaSourceEnumState>
        sourceChainWithKafka = sourceChain.add(kafkaSource);

// NOTE(review): `Source` is declared as a raw type here, which is presumably why the
// explicit .returns(String.class) hint is needed below -- confirm against HybridSource's
// generic signature before parameterizing it.
Source source = new HybridSource<>(sourceChainWithKafka);
final DataStream<String> sourceStream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "files-or-kafka")
                .returns(String.class);
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment