Skip to content

Instantly share code, notes, and snippets.

@ftrossbach
ftrossbach / SingleWebSocketRequest.scala
Created August 18, 2016 21:17
Meetup Druid example
package de.ftrossbach.meetup
import java.util.concurrent.TimeUnit
import akka.actor.ActorSystem
import akka.{Done, NotUsed}
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.http.scaladsl.model._
@ftrossbach
ftrossbach / Topology.java
Last active March 10, 2017 16:03
A simplified Kafka Streams topology
// Simplified Kafka Streams topology: three counts are derived from one grouped stream.
KStreamBuilder builder = new KStreamBuilder();
// Read "visitsTopic" using a String serde for keys and a Long serde for values.
KStream<String, Long> visitsStream = builder.stream(Serdes.String(), Serdes.Long(), "visitsTopic");
// Group the records by their existing key; every count below is computed per key.
KGroupedStream<String, Long> groupedStream = visitsStream.groupByKey();
// Count per key, registered under the name "totalVisitCount".
KTable<String, Long> totalCount = groupedStream.count("totalVisitCount");
// Count per key and time window of size 60 * 60 * 1000 (one hour if the argument is in
// milliseconds, which matches the name "hourlyVisitCount" — confirm against the Kafka Streams API).
KTable<Windowed<String>, Long> windowedCount = groupedStream.count(TimeWindows.of(60 * 60 * 1000), "hourlyVisitCount");
// Count per key and session window built with 60 * 1000, registered as "sessionVisitCount".
// The returned table is not assigned to a variable here.
groupedStream.count(SessionWindows.with(60 * 1000), "sessionVisitCount");
{
"hostInfo":{
"host":"1.2.3.4",
"port":42
},
"stateStoreNames":[
"totalVisitCount",
"hourlyVisitCount",
"sessionVisitCount"
],
@ftrossbach
ftrossbach / Vertx.java
Created March 17, 2017 13:04
VertxCluster
// Start a clustered Vert.x instance that uses Hazelcast as its cluster manager.
// The lambda is invoked with the result of the startup attempt.
Vertx.clusteredVertx(new VertxOptions().setClusterManager(new HazelcastClusterManager()), handler -> {
    if (handler.succeeded()) {
        // Startup succeeded: obtain the clustered Vertx instance from the result.
        Vertx vertx = handler.result();
    }
    // NOTE(review): a failed startup is silently ignored here — consider reporting the failure cause.
});
// Topology with three state-producing operations ("kv", "session", "window") on one grouped stream.
KStreamBuilder builder = new KStreamBuilder();
// Read "topic" using a String serde for keys and a Long serde for values.
KStream<String, Long> kv = builder.stream(Serdes.String(), Serdes.Long(), "topic");
// Group by the unchanged record key (the selector returns k), passing the serdes explicitly.
KGroupedStream<String, Long> group = kv.groupBy((k,v) -> k, Serdes.String(), Serdes.Long());
// Reduce per key with a reducer that always returns its first argument; registered as "kv".
group.reduce((a,b) -> a, "kv");
// Count per key and session window built with 60 * 1000; registered as "session".
group.count(SessionWindows.with(60 * 1000), "session");
// Count per key and time window of size 10000L; registered as "window".
group.count(TimeWindows.of(10000L), "window");
// Streams configuration; the application id is a freshly generated random UUID string.
Properties streamProps = new Properties();
streamProps.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString());
@ftrossbach
ftrossbach / SpecificClient.java
Last active March 17, 2017 13:23
Specific Client
// Constructing a client for a store called "kv" with String keys and Long values.
SpecificBlockingKiqrClient<String, Long> client = new SpecificBlockingRestKiqrClientImpl<>("localhost", 44321, "kv", String.class, Long.class, Serdes.String(), Serdes.Long());
// Each query result gets its own variable name: redeclaring one local ("result")
// several times in the same scope, with differing types, does not compile in Java.
// Querying key "key1" from key-value store "kv" with String keys and Long values.
Optional<Long> scalarResult = client.getScalarKeyValue("key1");
// Querying count of entries from key-value store "kv".
Optional<Long> countResult = client.count("kv");
// Querying all keys from store "kv" with String keys and Long values.
Map<String, Long> allResult = client.getAllKeyValues();
// Querying key range "key1" to "key3" from store "kv" with String keys and Long values.
Map<String, Long> rangeResult = client.getRangeKeyValues("key1", "key3");
// Send the value "some value" twice: once with a fixed literal and once with a
// randomly generated string as the first ProducerRecord argument (the topic in the
// two-argument constructor — confirm against the Kafka client version in use).
// String literals use plain ASCII double quotes; typographic quotes are not valid Java.
// NOTE(review): "paramter" looks like a misspelling of "parameter" — confirm the intended name before changing it.
producer.send(new ProducerRecord<>("paramter", "some value"));
producer.send(new ProducerRecord<>(keyGen.getRandomString(), "some value"));
// Create a Kafka AdminClient that connects to the given bootstrap server(s).
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
AdminClient adminClient = AdminClient.create(props);
// Request the description of a single topic. The literal uses plain ASCII double
// quotes; typographic quotes are not valid Java.
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singleton("topic-name"));
// Request the configuration of a single topic resource.
// NOTE(review): this uses "topic_name" while the call above uses "topic-name" — confirm whether the same topic is meant.
DescribeConfigsResult configs = adminClient.describeConfigs(Collections.singleton(new ConfigResource(ConfigResource.Type.TOPIC, "topic_name")));