Skip to content

Instantly share code, notes, and snippets.

// Declares the topic layout this producer expects: "test_topic" with a replication factor of 2.
ExpectedTopicConfiguration expected = new ExpectedTopicConfiguration.ExpectedTopicConfigurationBuilder("test_topic")
        .withReplicationFactor(2)
        .build();

// Producer settings. All keys use the ProducerConfig constants (same values as the raw
// "key.serializer" / "value.serializer" strings) so a misspelled key fails at compile time.
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

// Hands the settings and the single expected topic configuration to the project's factory.
// NOTE(review): presumably the factory checks the expectation against the cluster before
// returning the producer — confirm against KafkaProducerFactory.
Producer<String, String> producer = KafkaProducerFactory.producer(props, Collections.singleton(expected));
# List of topic definitions; each entry's settings are nested under its "- name:" item.
- name: test_topic
  replication-factor: 1
  partition-count: 1
  # Topic-level settings, one single-key entry per setting.
  config:
    - cleanup.policy: delete
    - delete.retention.ms: 86400000
- name: test_topic2
  replication-factor: 1
  # NOTE(review): no entries for this key are visible here — confirm against the full file.
  config:
// Expectation for "test_topic": replication factor 3, 32 partitions and two topic-level settings.
ExpectedTopicConfiguration.ExpectedTopicConfigurationBuilder testTopicBuilder =
        new ExpectedTopicConfiguration.ExpectedTopicConfigurationBuilder("test_topic");

// The calls stay chained so each step operates on whatever the previous one returned.
ExpectedTopicConfiguration testTopic = testTopicBuilder
        .withReplicationFactor(3)
        .withPartitionCount(32)
        .withConfig("delete.retention.ms", "30000")
        .withConfig("cleanup.policy", "delete")
        .build();
// The admin client is created first because both describe calls below use it.
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
// NOTE: AdminClient is closeable; call adminClient.close() once the lookups are finished.
AdminClient adminClient = AdminClient.create(props);

// Requests the topic-level configuration of the topic "topic_name".
DescribeConfigsResult configs = adminClient.describeConfigs(
        Collections.singleton(new ConfigResource(ConfigResource.Type.TOPIC, "topic_name")));

// Requests the description of the topic "topic-name" (plain ASCII quotes are required for the literal).
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singleton("topic-name"));
// String literals use plain ASCII double quotes; typographic quotes do not compile.
// The two-argument ProducerRecord constructor is (topic, value): the first record is sent to
// a topic name obtained from keyGen, the second to the fixed topic "paramter".
// NOTE(review): "paramter" looks like a misspelling of "parameter"; it is left unchanged
// because it is a runtime topic name — confirm before renaming.
producer.send(new ProducerRecord<>(keyGen.getRandomString(), "some value"));
producer.send(new ProducerRecord<>("paramter", "some value"));
@ftrossbach
ftrossbach / SingleWebSocketRequest.scala
Created August 18, 2016 21:17
Meetup Druid example
package de.ftrossbach.meetup
import java.util.concurrent.TimeUnit
import akka.actor.ActorSystem
import akka.{Done, NotUsed}
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.http.scaladsl.model._
@ftrossbach
ftrossbach / SpecificClient.java
Last active March 17, 2017 13:23
Specific Client
// Constructing a client for a store called "kv" with String keys and Long values.
SpecificBlockingKiqrClient<String, Long> client = new SpecificBlockingRestKiqrClientImpl<>(
        "localhost", 44321, "kv", String.class, Long.class, Serdes.String(), Serdes.Long());

// Each query result gets its own variable so the examples can live in one scope.

// Querying key "key1" from key-value store "kv" with String keys and Long values.
Optional<Long> scalarResult = client.getScalarKeyValue("key1");

// Querying count of entries from key-value store "kv".
Optional<Long> countResult = client.count("kv");

// Querying all keys from store "kv" with String keys and Long values.
Map<String, Long> allResult = client.getAllKeyValues();

// Querying key range "key1" to "key3" from store "kv" with String keys and Long values.
Map<String, Long> rangeResult = client.getRangeKeyValues("key1", "key3");
// Topology definition: reads "topic" using a String serde for keys and a Long serde for values.
KStreamBuilder builder = new KStreamBuilder();
KStream<String, Long> kv = builder.stream(Serdes.String(), Serdes.Long(), "topic");
// Groups records by their existing key (the selector returns the key unchanged).
KGroupedStream<String, Long> group = kv.groupBy((k,v) -> k, Serdes.String(), Serdes.Long());
// The reducer always returns its first argument.
// NOTE(review): "kv", "session" and "window" are presumably state store names — confirm.
group.reduce((a,b) -> a, "kv");
// Counts per key using session windows built with 60 * 1000 (presumably milliseconds — confirm).
group.count(SessionWindows.with(60 * 1000), "session");
// Counts per key using time windows of size 10000L (presumably milliseconds — confirm).
group.count(TimeWindows.of(10000L), "window");
// Streams configuration; a freshly generated random UUID is used as the application id.
Properties streamProps = new Properties();
streamProps.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString());