Skip to content

Instantly share code, notes, and snippets.

@CattenLinger
Last active June 12, 2018 04:15
Show Gist options
  • Save CattenLinger/67016a31fd8672c52e3f45dbfa2e94e4 to your computer and use it in GitHub Desktop.
Save CattenLinger/67016a31fd8672c52e3f45dbfa2e94e4 to your computer and use it in GitHub Desktop.
Kafka Demo
#!/bin/bash
docker run -d\
--name zookeeper\
--restart always\
-p 2181:2181\
-p 2888:2888\
-p 3888:3888\
zookeeper &&\
docker run -d\
--name kafka\
-e KAFKA_ADVERTISED_HOST_NAME=192.168.1.99\
-e KAFKA_ZOOKEEPER_CONNECT=192.168.1.99:2181\
-e KAFKA_BROKER_ID=0\
-p 9092:9092\
wurstmeister/kafka
bootstrap.servers=192.168.1.99:9092
group.id=demo-0
enable.auto.commit=true
auto.commit.interval.ms=10000
session.timeout.ms=30000
auto.offset.reset=earliest
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
bootstrap.servers=192.168.1.99:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
package net.catten.kafka.demo;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.*;
public class KafkaDemo {
private final static String demoTopic = "demo-topic";
private final static Logger logger = LoggerFactory.getLogger(Main.class);
public static void main(String[] args) throws IOException {
Thread thread1 = new Thread(new DemoProducer(demoTopic));
thread1.setDaemon(false);
Thread thread2 = new Thread(new DemoConsumer(demoTopic));
thread2.setDaemon(false);
thread1.start();
thread2.start();
logger.info("Demo started, kill it to stop.");
}
}
// A producer send random number
class DemoProducer implements Runnable {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final Producer<String, String> producer;
private final Random random;
private final String topic;
public DemoProducer(String topic) throws IOException {
this.topic = topic;
// Initialize the producer
Properties properties = new Properties();
properties.load(DemoProducer.class.getResourceAsStream("/kafka-producer.properties"));
producer = new KafkaProducer<>(properties);
// Initialize the random
random = new Random(System.currentTimeMillis());
}
@Override
public void run() {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
try {
while (true) {
String content = String.format(
"{'currentTime':'%s', 'randomNumber':%d}",
simpleDateFormat.format(new Date()),
random.nextInt());
producer.send(new ProducerRecord<>(topic, content));
Thread.sleep(300);
}
} catch (Exception e) {
if(!(e instanceof InterruptedException)) logger.error("DemoProducer exited with exception.",e);
} finally {
logger.info("DemoProducer exited.");
}
}
}
class DemoConsumer implements Runnable {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final Consumer<String, String> consumer;
public DemoConsumer(String topic) throws IOException {
// Initialize the consumer
Properties properties = new Properties();
properties.load(DemoConsumer.class.getResourceAsStream("/kafka-consumer.properties"));
consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList(topic));
}
@Override
public void run() {
try {
while (true) {
ConsumerRecords<String, String> record = consumer.poll(500);
if (!record.isEmpty()) {
record.iterator().forEachRemaining(i -> logger.info(String.format("[%s]:%s", i.key(), i.value())));
}
}
} catch (Exception e) {
logger.error("DemoProducer exited with exception.",e);
} finally {
logger.info("DemoConsumer exited.");
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment