Skip to content

Instantly share code, notes, and snippets.

View ivanursul's full-sized avatar

Ivan Ursul ivanursul

View GitHub Profile
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning \
--topic WordsWithCountsTopic2 \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
null Ivan Baraban
package org.kafka.examples;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import java.util.Properties;
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning \
--topic WordsWithCountsTopic \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
all 1
streams 1
lead 1
to 1
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>kafka-streams</groupId>
<artifactId>kafka-streams-hello-world</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
package org.kafka.examples;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
/Users/ivanursul/development/java/jdk1.8.0_66.jdk/Contents/Home/bin/java -Didea.launcher.port=7537 "-Didea.launcher.bin.path=/Applications/IntelliJ IDEA CE.app/Contents/bin" -Dfile.encoding=UTF-8 -classpath "/Users/ivanursul/development/java/jdk1.8.0_66.jdk/Contents/Home/jre/lib/charsets.jar:/Users/ivanursul/development/java/jdk1.8.0_66.jdk/Contents/Home/jre/lib/deploy.jar:/Users/ivanursul/development/java/jdk1.8.0_66.jdk/Contents/Home/jre/lib/ext/cldrdata.jar:/Users/ivanursul/development/java/jdk1.8.0_66.jdk/Contents/Home/jre/lib/ext/dnsns.jar:/Users/ivanursul/development/java/jdk1.8.0_66.jdk/Contents/Home/jre/lib/ext/jaccess.jar:/Users/ivanursul/development/java/jdk1.8.0_66.jdk/Contents/Home/jre/lib/ext/jfxrt.jar:/Users/ivanursul/development/java/jdk1.8.0_66.jdk/Contents/Home/jre/lib/ext/localedata.jar:/Users/ivanursul/development/java/jdk1.8.0_66.jdk/Contents/Home/jre/lib/ext/nashorn.jar:/Users/ivanursul/development/java/jdk1.8.0_66.jdk/Contents/Home/jre/lib/ext/sunec.jar:/Users/ivanursul/development/java/
import java.util.*;
public class QueueModel {
public static void main(String[] args) {
Map<String, Queue<Message>> queueMap = createQueueMap();
QueueMessageProducer producer = new QueueMessageProducer(queueMap);
initConsumers(queueMap);
import java.util.*;
public class QueueModel {
public static void main(String[] args) {
Queue<Message> queue = new LinkedList<>();
QueueMessageProducer producer = new QueueMessageProducer(queue);
List<QueueConsumer> consumers = new ArrayList<>();
import java.util.*;
public class QueueModel {
public static void main(String[] args) {
Queue<String> queue = new LinkedList<>();
QueueMessageProducer producer = new QueueMessageProducer(queue);
List<QueueConsumer> consumers = new ArrayList<>();
import java.util.*;
import java.util.Queue;
public class PubSubModel {
public static void main(String[] args) {
List<Consumer> consumers = new ArrayList<>();
for (int i = 0; i < 3; i++ ) {
Consumer consumer = new Consumer(