Skip to content

Instantly share code, notes, and snippets.

@ftrossbach
Created March 17, 2017 13:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ftrossbach/968dffaebd82ec9a5574e68c1c93c0df to your computer and use it in GitHub Desktop.
Save ftrossbach/968dffaebd82ec9a5574e68c1c93c0df to your computer and use it in GitHub Desktop.
Deployment
KStreamBuilder builder = new KStreamBuilder();
KStream<String, Long> kv = builder.stream(Serdes.String(), Serdes.Long(), "topic");
KGroupedStream<String, Long> group = kv.groupBy((k,v) -> k, Serdes.String(), Serdes.Long());
group.reduce((a,b) -> a, "kv");
group.count(SessionWindows.with(60 * 1000), "session");
group.count(TimeWindows.of(10000L), "window");
Properties streamProps = new Properties();
streamProps.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString());
streamProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_HOST + ":" + KAFKA_PORT);
streamProps.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamProps.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamProps.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
RestKiqrServerVerticle.Builder verticleBuilder = RestKiqrServerVerticle.Builder.serverBuilder(builder, streamProps).withPort(4711);
vertx.deploy(verticleBuilder.build());
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment