sudo yum -y install java-1.8.0-openjdk
sudo yum -y install wget
wget http://redrockdigimark.com/apachemirror/kafka/2.0.0/kafka_2.12-2.0.0.tgz
tar -xzf kafka_2.12-2.0.0.tgz
PATH=$PATH:$HOME/.local/bin:$HOME/bin:$HOME/kafka_2.12-2.0.0/bin
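The PATH change above applies only to the current shell session. To make it survive logins, the Kafka bin directory can be appended to ~/.bash_profile (a sketch assuming the install location used above):

echo 'export PATH=$PATH:$HOME/kafka_2.12-2.0.0/bin' >> $HOME/.bash_profile
source $HOME/.bash_profile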
vi $HOME/kafka_2.12-2.0.0/config/zookeeper.properties
dataDir=/home/prashant/zookeeper_data
mkdir $HOME/zookeeper_data
$HOME/kafka_2.12-2.0.0/bin/zookeeper-server-start.sh /home/prashant/kafka_2.12-2.0.0/config/zookeeper.properties
/home/prashant/kafka_2.12-2.0.0/bin/zookeeper-server-start.sh /home/prashant/kafka_2.12-2.0.0/config/zookeeper.properties > /dev/null 2>&1 &
sudo chmod +x /etc/rc.d/rc.local
sudo systemctl enable rc-local
sudo systemctl start rc-local
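Enabling rc-local only has an effect if /etc/rc.d/rc.local actually contains a start command. One way to add it (a sketch reusing the ZooKeeper start line from above; adjust the paths for your machine) is:

sudo sh -c 'echo "/home/prashant/kafka_2.12-2.0.0/bin/zookeeper-server-start.sh /home/prashant/kafka_2.12-2.0.0/config/zookeeper.properties > /dev/null 2>&1 &" >> /etc/rc.d/rc.local'

The broker machines can follow the same pattern with kafka-server-start.sh and server.properties.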
kafka_2.12-2.0.0/bin/zookeeper-shell.sh 10.160.0.5:2181 ls /brokers/ids
mkdir /home/prashant/kafka_data
/home/prashant/kafka_2.12-2.0.0/bin/kafka-server-start.sh /home/prashant/kafka_2.12-2.0.0/config/server.properties > /dev/null 2>&1 &
sudo chmod +x /etc/rc.d/rc.local
sudo systemctl enable rc-local
sudo systemctl start rc-local
kafka_2.12-2.0.0/bin/zookeeper-shell.sh 10.160.0.5:2181 ls /brokers/ids
kafka-topics.sh --create --zookeeper 10.160.0.5:2181 --replication-factor 3 --partitions 3 --topic test
kafka-topics.sh --list --zookeeper 10.160.0.5:2181
kafka-console-producer.sh --broker-list 10.160.0.2:9092 --topic test
kafka-console-consumer.sh --bootstrap-server 10.160.0.2:9092 --topic test --from-beginning
C:\Program Files\apache-maven-3.6.0\bin
echo %JAVA_HOME%
mvn -version
C:\Program Files\Java\jdk1.8.0_191
<build>
<plugins>
<!-- Maven Compiler Plugin-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<!-- Apache Kafka Clients-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>
<!-- Apache Kafka Streams-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.0.0</version>
</dependency>
<!-- Apache Log4J2 binding for SLF4J -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.11.0</version>
</dependency>
<!-- JUnit5 Jupiter -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.3.1</version>
<scope>test</scope>
</dependency>
<!-- JUnit 5 Jupiter Engine -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>5.3.1</version>
<scope>test</scope>
</dependency>
<!-- JUnit 5 Jupiter Parameterized Testing -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<version>5.3.1</version>
<scope>test</scope>
</dependency>
</dependencies>
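The JUnit 5 dependencies above are only picked up by mvn test if the Surefire plugin is 2.22.0 or newer; with an older default Surefire, Jupiter tests may not run at all. If needed, a plugin entry like the following (the version is an assumption, not taken from the original gist) can sit next to the compiler plugin in the build section:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-surefire-plugin</artifactId>
    <version>2.22.0</version>
</plugin>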
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="ERROR">
<Appenders>
<Console name="stdout" target="SYSTEM_OUT">
<PatternLayout pattern="[%d] (%c) - %p %m %n"/>
</Console>
</Appenders>
<Loggers>
<Root level="error">
<AppenderRef ref="stdout"/>
</Root>
<Logger name="org.apache.kafka.clients" level="warn" additivity="false">
<AppenderRef ref="stdout"/>
</Logger>
<Logger name="guru.learningjournal.kafka.examples" level="trace" additivity="false">
<AppenderRef ref="stdout"/>
</Logger>
</Loggers>
</Configuration>
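For Log4J2 to find this configuration on the classpath, the file is typically saved as src/main/resources/log4j2.xml in the Maven project.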
guru.learningjournal.kafka.examples.HelloProducer
package guru.learningjournal.kafka.examples;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Properties;
public class HelloProducer {
private static final Logger logger = LogManager.getLogger(HelloProducer.class);
public static void main(String[] args) {
String topicName;
int numEvents;
if (args.length != 2) {
System.out.println("Please provide command line arguments: topicName numEvents");
System.exit(-1);
}
topicName = args[0];
numEvents = Integer.valueOf(args[1]);
logger.info("Starting HelloProducer...");
logger.debug("topicName=" + topicName + ", numEvents=" + numEvents);
logger.trace("Creating Kafka Producer...");
Properties props = new Properties();
props.put(ProducerConfig.CLIENT_ID_CONFIG, "HelloProducer");
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<Integer, String> producer = new KafkaProducer<>(props);
logger.trace("Start sending messages...");
try {
for (int i = 1; i <= numEvents; i++) {
producer.send(new ProducerRecord<>(topicName, i, "Simple Message-" + i));
}
} catch (KafkaException e) {
logger.error("Exception occurred – Check log for more details.\n" + e.getMessage());
System.exit(-1);
} finally {
logger.info("Finished HelloProducer – Closing Kafka Producer.");
producer.close();
}
}
}
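One quick way to run this producer without packaging a jar is the Exec Maven plugin; the topic name and event count below are just sample arguments:

mvn compile exec:java -Dexec.mainClass="guru.learningjournal.kafka.examples.HelloProducer" -Dexec.args="test 10"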
zookeeper-server-start.bat C:\Users\prashant\Downloads\kafka_2.12-2.0.0\config\zookeeper.properties
kafka-server-start.bat C:\Users\prashant\Downloads\kafka_2.12-2.0.0\config\server.properties
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
java -version
java version "1.8.0_191"
Java(TM) SE Runtime Environment (build 1.8.0_191-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.191-b12, mixed mode)
dataDir=C:\zookeeper_data
log.dirs=C:\kafka_logs
offsets.topic.num.partitions=1
offsets.topic.replication.factor=1
min.insync.replicas=1
default.replication.factor=1
kafka_2.12-2.0.0\bin\windows\zookeeper-server-start.bat kafka_2.12-2.0.0\config\zookeeper.properties
kafka_2.12-2.0.0\bin\windows\kafka-server-start.bat kafka_2.12-2.0.0\config\server.properties
kafka_2.12-2.0.0\bin\windows\zookeeper-shell.bat localhost:2181 ls /brokers/ids
kafka_2.12-2.0.0\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
kafka_2.12-2.0.0\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
kafka_2.12-2.0.0\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
kafka_2.12-2.0.0\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "HelloStreams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.Integer().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
KStream<Integer, String> kStream = builder.stream(topicName);
kStream.foreach((k, v) -> System.out.println("Key = " + k + " Value = " + v));
//kStream.peek((k, v) -> System.out.println("Key = " + k + " Value = " + v));
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, props);
logger.info("Starting the stream");
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
logger.info("Stopping Stream");
streams.close();
}));
}
StreamsBuilder builder = new StreamsBuilder();
builder.stream(topicName)
.foreach((k, v) -> System.out.println("Key = " + k + " Value = " + v));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
public class OddEvenPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
if ((keyBytes == null) || (!(key instanceof Integer)))
throw new InvalidRecordException("Topic Key must have a valid Integer value.");
if (cluster.partitionsForTopic(topic).size() != 2)
throw new InvalidTopicException("Topic must have exactly two partitions");
return (Integer) key % 2;
}
@Override
public void close() {
//Nothing to close
}
@Override
public void configure(Map<String, ?> map) {
//Nothing to configure
}
}
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, OddEvenPartitioner.class.getName());
for (int j = 1; j <= numEvents; j++) {
int i = j;
producer.send(new ProducerRecord<>(topicName, i, "Simple Message-" + i),
(recordMetadata, e) -> {
if (e != null)
logger.error("Error sending message with key " + i + " Error - " + e.getMessage());
else
logger.info("Message " + i + " persisted with offset " + recordMetadata.offset() +
" and timestamp on " + new Timestamp(recordMetadata.timestamp()));
});
}
public class JsonSerializer implements Serializer<JsonNode> {
private final ObjectMapper objectMapper = new ObjectMapper();
public JsonSerializer() {
//Nothing to do
}
@Override
public void configure(Map<String, ?> config, boolean isKey) {
//Nothing to Configure
}
@Override
public byte[] serialize(String topic, JsonNode data) {
if (data == null) {
return null;
}
try {
return objectMapper.writeValueAsBytes(data);
} catch (JsonProcessingException e) {
throw new SerializationException("Error serializing JSON message", e);
}
}
@Override
public void close() {
}
}
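The gist only includes the serializer half. A consumer that wants the value back as a JsonNode needs a matching Deserializer; the class below is a minimal sketch (not part of the original gist) built on the same Jackson ObjectMapper:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import java.util.Map;

public class JsonDeserializer implements Deserializer<JsonNode> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
        // Nothing to configure
    }

    @Override
    public JsonNode deserialize(String topic, byte[] data) {
        // A null payload (e.g. a tombstone record) maps to a null JsonNode
        if (data == null) {
            return null;
        }
        try {
            return objectMapper.readTree(data);
        } catch (Exception e) {
            throw new SerializationException("Error deserializing JSON message", e);
        }
    }

    @Override
    public void close() {
        // Nothing to close
    }
}

It would be registered via ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG on the consumer, mirroring how the serializer is registered on the producer side.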
private static List<StockData> getStocks(String dataFile) throws IOException {
File file = new File(dataFile);
MappingIterator<StockData> stockDataIterator =
new CsvMapper().readerWithTypedSchemaFor(StockData.class).readValues(file);
return stockDataIterator.readAll();
}
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
JsonSerializer.class.getName());
//Some code omitted for clarity
for (int i = 0; i < eventFiles.length; i++) {
for (StockData s: getStocks(eventFiles[i])) {
stockArrayOfList[i].add(objectMapper.valueToTree(s));
}
dispatchers.add(new Thread(new Dispatcher(producer, topicName,
eventFiles[i], stockArrayOfList[i]), eventFiles[i]));
dispatchers.get(i).start();
}
public class Dispatcher implements Runnable {
private final KafkaProducer<Integer, String> producer;
private final String topicName;
private final String fileLocation;
private static final Logger logger = LogManager.getLogger(Dispatcher.class);
Dispatcher(KafkaProducer<Integer, String> producer, String topicName, String fileLocation) {
this.producer = producer;
this.topicName = topicName;
this.fileLocation = fileLocation;
}
@Override
public void run() {
logger.info("Start processing " + fileLocation + "...");
File file = new File(fileLocation);
int msgKey = 0;
try (Scanner scanner = new Scanner(file)) {
while (scanner.hasNextLine()) {
String line = scanner.nextLine();
producer.send(new ProducerRecord<>(topicName, msgKey, line));
msgKey++;
}
logger.trace("Finished sending " + msgKey + " messages from " + fileLocation);
} catch (Exception e) {
logger.error("Exception in thread " + fileLocation);
throw new RuntimeException(e);
}
}
}
public static void main(String[] args) {
if (args.length < 2) {
System.out.println("Please provide command line arguments: topicName EventFiles");
System.exit(-1);
}
String topicName = args[0];
String[] eventFiles = Arrays.copyOfRange(args, 1, args.length);
Properties properties = new Properties();
try {
InputStream configStream = ClassLoader.class.getResourceAsStream(kafkaConfig);
properties.load(configStream);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
} catch (IOException e) {
logger.error("Cannot open Kafka config " + kafkaConfig);
throw new RuntimeException(e);
}
logger.trace("Starting dispatcher threads...");
KafkaProducer<Integer, String> producer = new KafkaProducer<>(properties);
Thread[] dispatchers = new Thread[eventFiles.length];
for (int i = 0; i < eventFiles.length; i++) {
dispatchers[i] = new Thread(new Dispatcher(producer, topicName, eventFiles[i]));
dispatchers[i].start();
}
try {
for (Thread t : dispatchers)
t.join();
} catch (InterruptedException e) {
logger.error("Thread Interrupted ");
} finally {
producer.close();
logger.info("Finished dispatcher demo - Closing Kafka Producer.");
}
}
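The main method above loads its base producer settings from a properties file on the classpath (presumably the same /kafka.properties path the AvroConsumer below points at) and then overrides the serializers in code. The gist does not include that file; a minimal version consistent with the brokers used elsewhere in these notes could be:

bootstrap.servers=localhost:9092
client.id=dispatcher-producer

Both values are assumptions; the client.id in particular is only illustrative.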
public class AvroConsumer {
private static final String kafkaConfig = "/kafka.properties";
@SuppressWarnings("InfiniteLoopStatement")
public static void main(String[] args) {
if (args.length < 2) {
System.out.println("Please provide command line arguments: topicName groupName");
System.exit(-1);
}
String topicName = args[0];
String groupName = args[1];
Properties properties = new Properties();
try {
InputStream kafkaConfigStream =
ClassLoader.class.getResourceAsStream(kafkaConfig);
properties.load(kafkaConfigStream);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupName);
//Set autocommit to false so you can execute it again for the same set of messages
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
KafkaAvroDeserializer.class);
properties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
properties.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
"http://localhost:8081");
} catch (IOException e) {
logger.error(e.getMessage());
throw new RuntimeException(e);
}
final KafkaConsumer<String, StockData> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList(topicName));
while (true) {
ConsumerRecords<String, StockData> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, StockData> record : records) {
System.out.println(record.value());
}
}
}
}
{"namespace": "guru.learningjournal.kafka.examples",
"type": "record",
"name": "StockData",
"fields": [
{"name": "symbol", "type": ["null", "string"]},
{"name": "series", "type":["null", "string"]},
{"name": "open", "type": ["null", "float"]},
{"name": "high", "type": ["null", "float"]},
{"name": "low", "type": ["null", "float"]},
{"name": "close", "type": ["null", "float"]},
{"name": "last", "type": ["null", "float"]},
{"name": "previousClose", "type": ["null", "float"]},
{"name": "totalTradedQty", "type": ["null", "long"]},
{"name": "totalTradedVal", "type": ["null", "double"]},
{"name": "tradeDate", "type": ["null", "string"]},
{"name": "totalTrades", "type": ["null", "int"]},
{"name": "isinCode", "type": ["null", "string"]}
]
}
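The StockData class referenced by the producers and the AvroConsumer can be generated from this schema at build time. The gist does not show the build setup; one common approach is the Avro Maven plugin, sketched below with an assumed plugin version and an assumed schema location under src/main/resources:

<plugin>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-maven-plugin</artifactId>
    <version>1.8.2</version>
    <executions>
        <execution>
            <phase>generate-sources</phase>
            <goals>
                <goal>schema</goal>
            </goals>
            <configuration>
                <sourceDirectory>${project.basedir}/src/main/resources/</sourceDirectory>
                <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
            </configuration>
        </execution>
    </executions>
</plugin>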
private static List<StockData> getStocks(String dataFile) throws IOException {
File file = new File(dataFile);
CsvSchema schema = CsvSchema.builder()
.addColumn("symbol", CsvSchema.ColumnType.STRING)
.addColumn("series", CsvSchema.ColumnType.STRING)
.addColumn("open", CsvSchema.ColumnType.NUMBER)
.addColumn("high", CsvSchema.ColumnType.NUMBER)
.addColumn("low", CsvSchema.ColumnType.NUMBER)
.addColumn("close", CsvSchema.ColumnType.NUMBER)
.addColumn("last", CsvSchema.ColumnType.NUMBER)
.addColumn("previousClose", CsvSchema.ColumnType.NUMBER)
.addColumn("totalTradedQty", CsvSchema.ColumnType.NUMBER)
.addColumn("totalTradedVal", CsvSchema.ColumnType.NUMBER)
.addColumn("tradeDate", CsvSchema.ColumnType.STRING)
.addColumn("totalTrades", CsvSchema.ColumnType.NUMBER)
.addColumn("isinCode", CsvSchema.ColumnType.STRING)
.build();
MappingIterator<StockData> stockDataIterator = new
CsvMapper().readerFor(StockData.class).with(schema).readValues(file);
return stockDataIterator.readAll();
}
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
properties.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
for (int i = 0; i < eventFiles.length; i++) {
logger.info("Preparing data for " + eventFiles[i]);
for (StockData s : getStocks(eventFiles[i])) {
stockArrayOfList[i].add(s);
}
dispatchers.add(new Thread(new Dispatcher(producer, topicName, eventFiles[i],
stockArrayOfList[i]), eventFiles[i]));
dispatchers.get(i).start();
}
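KafkaAvroSerializer, KafkaAvroDeserializer and the schema registry settings used in these Avro examples come from Confluent's serializer library, which is not published to Maven Central. The pom fragment earlier in this gist would need roughly the following additions (the 5.0.0 version is an assumption picked to line up with Kafka 2.0.0):

<repositories>
    <repository>
        <id>confluent</id>
        <url>https://packages.confluent.io/maven/</url>
    </repository>
</repositories>

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>5.0.0</version>
</dependency>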
public class HelloProducer {
private static final Logger logger = LogManager.getLogger(HelloProducer.class);
public static void main(String[] args) {
logger.info("Creating Kafka Producer...");
Properties props = new Properties();
props.put(ProducerConfig.CLIENT_ID_CONFIG, AppConfigs.applicationID);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
AppConfigs.bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
KafkaProducer<Integer, String> producer = new KafkaProducer<>(props);
logger.info("Start sending messages...");
for (int i = 1; i <= AppConfigs.numEvents; i++) {
producer.send(new ProducerRecord<>(AppConfigs.topicName, i, "Simple Message-" + i));
}
logger.info("Finished - Closing Kafka Producer.");
producer.close();
}
}
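This variant reads its settings from an AppConfigs class instead of command line arguments. The gist does not include that class; a minimal sketch covering the fields the code refers to would look like the following, with every value being an illustrative assumption:

class AppConfigs {
    static final String applicationID = "HelloProducer";
    static final String bootstrapServers = "localhost:9092";
    static final String topicName = "test";
    static final int numEvents = 100;
}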
RecordMetadata metadata;
try {
for (int i = 1; i <= numEvents; i++) {
// send(...).get() blocks until the broker acknowledges the record
metadata = producer.send(new ProducerRecord<>(topicName, i, "Simple Message-" + i)).get();
logger.info("Message " + i + " persisted with offset " + metadata.offset()
+ " and timestamp on " + new Timestamp(metadata.timestamp()));
}
} catch (InterruptedException | ExecutionException e) {
logger.error("Error in synchronous send - " + e.getMessage());
}