Skip to content

Instantly share code, notes, and snippets.

@pluone
Created December 4, 2018 06:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pluone/f627a5d67ed32a42768af64961b6e73b to your computer and use it in GitHub Desktop.
Save pluone/f627a5d67ed32a42768af64961b6e73b to your computer and use it in GitHub Desktop.
Kafaka stream countdownlatch的方法替代了while true循环
public class Pipe {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
builder.stream("streams-plaintext-input").to("streams-pipe-output");
Topology topology = builder.build();
KafkaStreams kafkaStreams = new KafkaStreams(topology, props);
CountDownLatch countDownLatch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
kafkaStreams.close();
countDownLatch.countDown();
}
});
kafkaStreams.start();
try {
countDownLatch.await();
} catch (InterruptedException e) {
System.exit(1);
}
System.exit(0);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment