@tweise
Created May 21, 2021 03:54
HybridSourceExample.java
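import java.io.File;
import java.util.Arrays;
import java.util.Collections;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Source;
// Assumption: the prototype HybridSource lives in the same package as the later released one.
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializer;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.common.serialization.StringDeserializer;

public class HybridSourceExample {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    File testDir = new File("/Users/thomas/hybridsource/test");
    // First source: read every text file in testDir line by line (bounded).
    final FileSource<String> fileSource =
        FileSource.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir)).build();
    // Second file source: the directory is irrelevant, because its splits are supplied by the converter.
    final FileSource<String> fileSource2 =
        FileSource.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir)).build();
    // Last source: the Kafka topic, read from the earliest offsets onward.
    KafkaSource<String> kafkaSource =
        KafkaSource.<String>builder()
            .setBootstrapServers("localhost:9092")
            .setGroupId("MyGroup")
            .setTopics(Arrays.asList("quickstart-events"))
            .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
            .setStartingOffsets(OffsetsInitializer.earliest())
            .build();
    // Chain the sources: fileSource, then fileSource2, then kafkaSource.
    HybridSource.SourceChain<String, FileSourceSplit, PendingSplitsCheckpoint<FileSourceSplit>>
        sourceChain = HybridSource.SourceChain.of(fileSource);
    sourceChain =
        sourceChain.add(
            fileSource2,
            pendingSplitsCheckpoint -> {
              // TODO: pendingSplitsCheckpoint doesn't carry alreadyProcessedPaths (or any
              // other information about finished splits).
              // The previous source has processed all files, so start with no pending splits.
              return PendingSplitsCheckpoint.fromCollectionSnapshot(Collections.emptyList());
            });
    HybridSource.SourceChain<String, KafkaPartitionSplit, KafkaSourceEnumState>
        sourceChainWithKafka = sourceChain.add(kafkaSource);
    Source<String, ?, ?> source = new HybridSource<>(sourceChainWithKafka);
    final DataStream<String> sourceStream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "files-or-kafka")
            .returns(String.class);
    // Attach a sink to sourceStream and call env.execute(...) to run the job.
  }
}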
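The chaining idea can also be modeled without Flink. The following is a minimal sketch under stated assumptions, not the HybridSource implementation: each source is read to completion before the next one starts, and a converter derives the next source's starting position from the position where its predecessor stopped. ListSource, Link and run are hypothetical names, and a plain long position stands in for enumerator state such as PendingSplitsCheckpoint or KafkaSourceEnumState. Unlike the gist, where the converter discards the file state and Kafka starts from the earliest offsets, the second converter here skips records the first source already emitted, to show a non-trivial handoff.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.LongUnaryOperator;

/**
 * Hypothetical, Flink-free model of a hybrid source chain. Each source is read to the end
 * before the next one starts, and a converter derives the next source's start position
 * from the position where its predecessor finished.
 */
public class HybridChainSketch {

  /** Toy bounded source over an in-memory list; positions are list indices. */
  public static final class ListSource {
    private final List<String> records;

    public ListSource(List<String> records) {
      this.records = records;
    }

    /** Emits every record from startPosition onward into out and returns the end position. */
    long readFrom(long startPosition, List<String> out) {
      for (long i = startPosition; i < records.size(); i++) {
        out.add(records.get((int) i));
      }
      return records.size();
    }
  }

  /** One link of the chain: a source plus a converter from the previous end position to its start. */
  public static final class Link {
    final ListSource source;
    final LongUnaryOperator startFromPreviousEnd;

    public Link(ListSource source, LongUnaryOperator startFromPreviousEnd) {
      this.source = source;
      this.startFromPreviousEnd = startFromPreviousEnd;
    }
  }

  /** Runs the links in order, handing each finished source's end position to the next converter. */
  public static List<String> run(List<Link> chain) {
    List<String> out = new ArrayList<>();
    long previousEnd = 0L; // the first link has no predecessor
    for (Link link : chain) {
      long start = link.startFromPreviousEnd.applyAsLong(previousEnd);
      previousEnd = link.source.readFrom(start, out);
    }
    return out;
  }

  public static void main(String[] args) {
    // "Files" hold the history; the "log" repeats that history and then continues.
    ListSource files = new ListSource(Arrays.asList("f1", "f2"));
    ListSource log = new ListSource(Arrays.asList("f1", "f2", "k1", "k2"));
    List<String> out = run(Arrays.asList(
        new Link(files, prevEnd -> 0L),       // the first source starts at the beginning
        new Link(log, prevEnd -> prevEnd)));  // skip what the files already emitted
    System.out.println(out); // [f1, f2, k1, k2]
  }
}
```

Keeping the converter on the link, much as the gist does with add(fileSource2, converter), means only the chain knows about both adjacent sources; in this sketch neither ListSource needs to know it is part of a chain.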