Last active
June 12, 2018 04:15
-
-
Save CattenLinger/67016a31fd8672c52e3f45dbfa2e94e4 to your computer and use it in GitHub Desktop.
Kafka Demo
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash | |
docker run -d\ | |
--name zookeeper\ | |
--restart always\ | |
-p 2181:2181\ | |
-p 2888:2888\ | |
-p 3888:3888\ | |
zookeeper &&\ | |
docker run -d\ | |
--name kafka\ | |
-e KAFKA_ADVERTISED_HOST_NAME=192.168.1.99\ | |
-e KAFKA_ZOOKEEPER_CONNECT=192.168.1.99:2181\ | |
-e KAFKA_BROKER_ID=0\ | |
-p 9092:9092\ | |
wurstmeister/kafka | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
bootstrap.servers=192.168.1.99:9092 | |
group.id=demo-0 | |
enable.auto.commit=true | |
auto.commit.interval.ms=10000 | |
session.timeout.ms=30000 | |
auto.offset.reset=earliest | |
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer | |
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
bootstrap.servers=192.168.1.99:9092 | |
key.serializer=org.apache.kafka.common.serialization.StringSerializer | |
value.serializer=org.apache.kafka.common.serialization.StringSerializer |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package net.catten.kafka.demo; | |
import org.apache.kafka.clients.consumer.Consumer; | |
import org.apache.kafka.clients.consumer.ConsumerRecords; | |
import org.apache.kafka.clients.consumer.KafkaConsumer; | |
import org.apache.kafka.clients.producer.KafkaProducer; | |
import org.apache.kafka.clients.producer.Producer; | |
import org.apache.kafka.clients.producer.ProducerRecord; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.io.IOException; | |
import java.text.SimpleDateFormat; | |
import java.util.*; | |
public class KafkaDemo { | |
private final static String demoTopic = "demo-topic"; | |
private final static Logger logger = LoggerFactory.getLogger(Main.class); | |
public static void main(String[] args) throws IOException { | |
Thread thread1 = new Thread(new DemoProducer(demoTopic)); | |
thread1.setDaemon(false); | |
Thread thread2 = new Thread(new DemoConsumer(demoTopic)); | |
thread2.setDaemon(false); | |
thread1.start(); | |
thread2.start(); | |
logger.info("Demo started, kill it to stop."); | |
} | |
} | |
// A producer send random number | |
class DemoProducer implements Runnable { | |
private final Logger logger = LoggerFactory.getLogger(this.getClass()); | |
private final Producer<String, String> producer; | |
private final Random random; | |
private final String topic; | |
public DemoProducer(String topic) throws IOException { | |
this.topic = topic; | |
// Initialize the producer | |
Properties properties = new Properties(); | |
properties.load(DemoProducer.class.getResourceAsStream("/kafka-producer.properties")); | |
producer = new KafkaProducer<>(properties); | |
// Initialize the random | |
random = new Random(System.currentTimeMillis()); | |
} | |
@Override | |
public void run() { | |
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); | |
try { | |
while (true) { | |
String content = String.format( | |
"{'currentTime':'%s', 'randomNumber':%d}", | |
simpleDateFormat.format(new Date()), | |
random.nextInt()); | |
producer.send(new ProducerRecord<>(topic, content)); | |
Thread.sleep(300); | |
} | |
} catch (Exception e) { | |
if(!(e instanceof InterruptedException)) logger.error("DemoProducer exited with exception.",e); | |
} finally { | |
logger.info("DemoProducer exited."); | |
} | |
} | |
} | |
class DemoConsumer implements Runnable { | |
private final Logger logger = LoggerFactory.getLogger(this.getClass()); | |
private final Consumer<String, String> consumer; | |
public DemoConsumer(String topic) throws IOException { | |
// Initialize the consumer | |
Properties properties = new Properties(); | |
properties.load(DemoConsumer.class.getResourceAsStream("/kafka-consumer.properties")); | |
consumer = new KafkaConsumer<>(properties); | |
consumer.subscribe(Collections.singletonList(topic)); | |
} | |
@Override | |
public void run() { | |
try { | |
while (true) { | |
ConsumerRecords<String, String> record = consumer.poll(500); | |
if (!record.isEmpty()) { | |
record.iterator().forEachRemaining(i -> logger.info(String.format("[%s]:%s", i.key(), i.value()))); | |
} | |
} | |
} catch (Exception e) { | |
logger.error("DemoProducer exited with exception.",e); | |
} finally { | |
logger.info("DemoConsumer exited."); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment