Skip to content

Instantly share code, notes, and snippets.

@msvitok77
Last active November 2, 2016 11:28
Show Gist options
  • Save msvitok77/651107977511a9da95ac5cb7ce7de49a to your computer and use it in GitHub Desktop.
Save msvitok77/651107977511a9da95ac5cb7ce7de49a to your computer and use it in GitHub Desktop.
KTable - KTable problem
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import static org.apache.kafka.streams.StreamsConfig.*;
public class Main {
private static Properties producer() {
Properties producerCfg = new Properties();
producerCfg.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerCfg.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerCfg.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
return producerCfg;
}
private static void starConsuming(KStreamBuilder builder) throws InterruptedException {
Properties streamsCfg = new Properties();
streamsCfg.put(APPLICATION_ID_CONFIG, "JoinProblemTest");
streamsCfg.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsCfg.put(ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
streamsCfg.put(CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsCfg.put(KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsCfg.put(VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
KafkaStreams streams = new KafkaStreams(builder, streamsCfg);
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
streams.start();
produceMessages();
CountDownLatch neverEnd = new CountDownLatch(1);
neverEnd.await();
}
private static void produceMessages() {
BooleanSerde booleanSerde = new BooleanSerde();
KafkaProducer<String, Boolean> producerBool = new KafkaProducer<>(producer(), Serdes.String().serializer(), booleanSerde);
KafkaProducer<String, Integer> producerInt = new KafkaProducer<>(producer(), Serdes.String().serializer(), Serdes.Integer().serializer());
System.out.println("Starting producing messages...");
// dis-arm
producerBool.send(new ProducerRecord<>("JoinArmTopic", 0, "hgw1", Boolean.FALSE));
for (int i = 0; i < 5; i++) {
producerInt.send(new ProducerRecord<>("JoinPersonTopic", 0, "hgw1", 1));
}
for (int i = 0; i < 5; i++) {
producerInt.send(new ProducerRecord<>("JoinPersonTopic", 0, "hgw1", -1));
}
producerBool.flush();
producerInt.flush();
System.out.println("Finished producing messages...");
}
public static void main(String[] args) throws InterruptedException {
BooleanSerde booleanSerde = new BooleanSerde();
KStreamBuilder builder = new KStreamBuilder();
KTable<String, Boolean> joinArmStore = builder.table(
Serdes.String(),
Serdes.serdeFrom(booleanSerde, booleanSerde),
"JoinArmTopic",
"JoinArmStore"
);
KStream<String, Integer> joinPersonStream = builder.stream(
Serdes.String(),
Serdes.Integer(),
"JoinPersonTopic"
);
KTable<String, Integer> joinPersonStore = joinPersonStream
.groupByKey(Serdes.String(), Serdes.Integer())
.reduce((v1, v2) -> v1 + v2, "JoinPersonStore");
/*joinPersonStore
.filter((k, v) -> v == 0)
.join(joinArmStore, Status::new)
.filter((k, v) -> (v != null) && (!v.armed))
.print();*/
// above code won't work due to tombstone <key:null> messages and that KTable - KTable join ignores filter operators
joinPersonStore
.toStream()
.filter((k, v) -> v == 0)
.leftJoin(joinArmStore, Status::new)
.filter((k, v) -> (v != null) && (!v.armed))
.print();
starConsuming(builder);
}
private static final class Status {
final int count;
final boolean armed;
Status(int count, boolean armed) {
this.count = count;
this.armed = armed;
}
@Override
public String toString() {
return "Status{" +
"count=" + count +
", armed=" + armed +
'}';
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment