Skip to content

Instantly share code, notes, and snippets.

// Describe the topic the producer expects: "test_topic" with a replication factor of 2.
ExpectedTopicConfiguration expected =
        new ExpectedTopicConfiguration.ExpectedTopicConfigurationBuilder("test_topic")
                .withReplicationFactor(2)
                .build();

// Producer settings. All keys use the ProducerConfig constants (instead of mixing constants
// with raw "key.serializer" / "value.serializer" strings) so a mistyped key cannot slip through.
// bootstrapServers is defined outside this snippet.
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

// Obtain the producer from the factory, passing the properties and the expected topic configuration.
// NOTE(review): presumably the factory checks the topic against `expected` — confirm in KafkaProducerFactory.
Producer<String, String> producer = KafkaProducerFactory.producer(props, Collections.singleton(expected));
# Topic definitions: each entry gives a name, a replication factor, optionally a partition
# count, and a list of per-topic config entries.
- name: test_topic
  replication-factor: 1
  partition-count: 1
  config:
    - cleanup.policy: delete
    - delete.retention.ms: 86400000
- name: test_topic2
  replication-factor: 1
  config:
// Build the expected configuration for topic "test_topic": replication factor 3,
// 32 partitions, and two topic-level config entries (delete.retention.ms and cleanup.policy).
ExpectedTopicConfiguration testTopic = new ExpectedTopicConfiguration.ExpectedTopicConfigurationBuilder("test_topic")
.withReplicationFactor(3)
.withPartitionCount(32)
.withConfig("delete.retention.ms", "30000")
.withConfig("cleanup.policy", "delete")
.build();
// Ask the admin client for the configuration of the topic resource named "topic_name".
ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, "topic_name");
DescribeConfigsResult configs = adminClient.describeConfigs(Collections.singleton(topicResource));
// Create an AdminClient from a Properties object whose only entry is the bootstrap server setting.
// bootstrapServer is defined outside this snippet.
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
AdminClient adminClient = AdminClient.create(props);
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singleton(“topic-name”));
// Send two records carrying the value "some value": the first uses a randomly generated
// string as the leading argument, the second a fixed string.
// String literals are delimited with plain ASCII double quotes; typographic quotes do not compile.
// NOTE(review): in the two-argument ProducerRecord constructor the first argument is the
// topic, not the key — confirm that is what is intended here.
// NOTE(review): "paramter" looks like a typo for "parameter"; left unchanged because it is a runtime value.
producer.send(new ProducerRecord<>(keyGen.getRandomString(), "some value"));
producer.send(new ProducerRecord<>("paramter", "some value"));
@ftrossbach
ftrossbach / SpecificClient.java
Last active March 17, 2017 13:23
Specific Client
// Construct a client for a store called "kv" with String keys and Long values.
SpecificBlockingKiqrClient<String, Long> client = new SpecificBlockingRestKiqrClientImpl<>(
        "localhost", 44321, "kv", String.class, Long.class, Serdes.String(), Serdes.Long());

// Each query result below gets its own variable name: declaring `result` repeatedly
// in the same scope does not compile.

// Query key "key1" from key-value store "kv" with String keys and Long values.
Optional<Long> scalarResult = client.getScalarKeyValue("key1");

// Query the count of entries in key-value store "kv".
Optional<Long> countResult = client.count("kv");

// Query all keys from store "kv" with String keys and Long values.
Map<String, Long> allResult = client.getAllKeyValues();

// Query key range "key1" to "key3" from store "kv" with String keys and Long values.
Map<String, Long> rangeResult = client.getRangeKeyValues("key1", "key3");
// Read "topic" as a stream of String keys and Long values and group it by its own key.
KStreamBuilder builder = new KStreamBuilder();
KStream<String, Long> kv = builder.stream(Serdes.String(), Serdes.Long(), "topic");
KGroupedStream<String, Long> group = kv.groupBy((key, value) -> key, Serdes.String(), Serdes.Long());

// Window specifications for the two windowed counts below.
SessionWindows sessionSpec = SessionWindows.with(60 * 1000);
TimeWindows timeSpec = TimeWindows.of(10000L);

// Three aggregations over the grouped stream, registered under the names
// "kv" (reduce keeping the first argument), "session" and "window" (counts).
group.reduce((first, second) -> first, "kv");
group.count(sessionSpec, "session");
group.count(timeSpec, "window");

// Streams configuration; the application id is a freshly generated random UUID.
Properties streamProps = new Properties();
String applicationId = UUID.randomUUID().toString();
streamProps.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
@ftrossbach
ftrossbach / Vertx.java
Created March 17, 2017 13:04
VertxCluster
// Start a clustered Vert.x instance with a HazelcastClusterManager set on the options.
// The callback receives the outcome of the start; the Vertx instance is only read on success.
Vertx.clusteredVertx(new VertxOptions().setClusterManager(new HazelcastClusterManager()), handler -> {
    if (handler.succeeded()) {
        Vertx vertx = handler.result();
    } // closing brace of the if-block was missing in the original snippet
});