Last active
July 21, 2022 14:35
Save LearningJournal/c246cfbda6661bbc567dd6cb38403b78 to your computer and use it in GitHub Desktop.
sudo yum -y install java-1.8.0-openjdk
sudo yum -y install wget
wget http://redrockdigimark.com/apachemirror/kafka/2.0.0/kafka_2.12-2.0.0.tgz
tar -xzf kafka_2.12-2.0.0.tgz
PATH=$PATH:$HOME/.local/bin:$HOME/bin:$HOME/kafka_2.12-2.0.0/bin
vi $HOME/kafka_2.12-2.0.0/config/zookeeper.properties
dataDir=/home/prashant/zookeeper_data
mkdir $HOME/zookeeper_data
$HOME/kafka_2.12-2.0.0/bin/zookeeper-server-start.sh /home/prashant/kafka_2.12-2.0.0/config/zookeeper.properties
/home/prashant/kafka_2.12-2.0.0/bin/zookeeper-server-start.sh /home/prashant/kafka_2.12-2.0.0/config/zookeeper.properties > /dev/null 2>&1 &
sudo chmod +x /etc/rc.d/rc.local
sudo systemctl enable rc-local
sudo systemctl start rc-local
kafka_2.12-2.0.0/bin/zookeeper-shell.sh 10.160.0.5:2181 ls /brokers/ids
mkdir /home/prashant/kafka_data
/home/prashant/kafka_2.12-2.0.0/bin/kafka-server-start.sh /home/prashant/kafka_2.12-2.0.0/config/server.properties > /dev/null 2>&1 &
sudo chmod +x /etc/rc.d/rc.local
sudo systemctl enable rc-local
sudo systemctl start rc-local
kafka_2.12-2.0.0/bin/zookeeper-shell.sh 10.160.0.5:2181 ls /brokers/ids
kafka-topics.sh --create --zookeeper 10.160.0.5:2181 --replication-factor 3 --partitions 3 --topic test
kafka-topics.sh --list --zookeeper 10.160.0.5:2181
kafka-console-producer.sh --broker-list 10.160.0.2:9092 --topic test
kafka-console-consumer.sh --bootstrap-server 10.160.0.2:9092 --topic test --from-beginning
C:\Program Files\apache-maven-3.6.0\bin
echo %JAVA_HOME%
mvn -version
C:\Program Files\Java\jdk1.8.0_191
<build>
    <plugins>
        <!-- Maven Compiler Plugin -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.8.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
<dependencies>
    <!-- Apache Kafka Clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.0.0</version>
    </dependency>
    <!-- Apache Kafka Streams -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.0.0</version>
    </dependency>
    <!-- Apache Log4J2 binding for SLF4J -->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>2.11.0</version>
    </dependency>
    <!-- JUnit 5 Jupiter -->
    <dependency>
        <groupId>org.junit.jupiter</groupId>
        <artifactId>junit-jupiter-api</artifactId>
        <version>5.3.1</version>
        <scope>test</scope>
    </dependency>
    <!-- JUnit 5 Jupiter Engine -->
    <dependency>
        <groupId>org.junit.jupiter</groupId>
        <artifactId>junit-jupiter-engine</artifactId>
        <version>5.3.1</version>
        <scope>test</scope>
    </dependency>
    <!-- JUnit 5 Jupiter Parameterized Testing -->
    <dependency>
        <groupId>org.junit.jupiter</groupId>
        <artifactId>junit-jupiter-params</artifactId>
        <version>5.3.1</version>
        <scope>test</scope>
    </dependency>
</dependencies>
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="ERROR">
    <Appenders>
        <Console name="stdout" target="SYSTEM_OUT">
            <PatternLayout pattern="[%d] (%c) - %p %m %n"/>
        </Console>
    </Appenders>
    <Loggers>
        <Root level="error">
            <AppenderRef ref="stdout"/>
        </Root>
        <Logger name="org.apache.kafka.clients" level="warn" additivity="false">
            <AppenderRef ref="stdout"/>
        </Logger>
        <Logger name="guru.learningjournal.kafka.examples" level="trace" additivity="false">
            <AppenderRef ref="stdout"/>
        </Logger>
    </Loggers>
</Configuration>
guru.learningjournal.kafka.examples.HelloProducer
package guru.learningjournal.kafka.examples;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Properties;

public class HelloProducer {
    private static final Logger logger = LogManager.getLogger(HelloProducer.class);

    public static void main(String[] args) {
        String topicName;
        int numEvents;

        if (args.length != 2) {
            System.out.println("Please provide command line arguments: topicName numEvents");
            System.exit(-1);
        }
        topicName = args[0];
        // parseInt returns a primitive int directly (valueOf would box and then unbox)
        numEvents = Integer.parseInt(args[1]);
        logger.info("Starting HelloProducer...");
        logger.debug("topicName=" + topicName + ", numEvents=" + numEvents);

        logger.trace("Creating Kafka Producer...");
        Properties props = new Properties();
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "HelloProducer");
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<Integer, String> producer = new KafkaProducer<>(props);

        logger.trace("Start sending messages...");
        try {
            for (int i = 1; i <= numEvents; i++) {
                producer.send(new ProducerRecord<>(topicName, i, "Simple Message-" + i));
            }
        } catch (KafkaException e) {
            logger.error("Exception occurred - Check log for more details.\n" + e.getMessage());
            System.exit(-1);
        } finally {
            logger.info("Finished HelloProducer - Closing Kafka Producer.");
            producer.close();
        }
    }
}
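The producer above reads `topicName` and `numEvents` from the command line. As a minimal sketch of stricter argument handling, the hypothetical `ProducerArgs` helper below (not part of the gist, JDK only) rejects a missing, non-numeric, or non-positive `numEvents` with a readable message instead of an uncaught `NumberFormatException`.

```java
// Hypothetical helper, not part of the gist: validates "topicName numEvents".
final class ProducerArgs {
    final String topicName;
    final int numEvents;

    private ProducerArgs(String topicName, int numEvents) {
        this.topicName = topicName;
        this.numEvents = numEvents;
    }

    /** Parses the two arguments, or throws IllegalArgumentException with a usage hint. */
    static ProducerArgs parse(String[] args) {
        if (args.length != 2) {
            throw new IllegalArgumentException("Usage: topicName numEvents");
        }
        int numEvents;
        try {
            numEvents = Integer.parseInt(args[1]);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("numEvents must be an integer: " + args[1], e);
        }
        if (numEvents < 1) {
            throw new IllegalArgumentException("numEvents must be positive: " + numEvents);
        }
        return new ProducerArgs(args[0], numEvents);
    }

    public static void main(String[] args) {
        ProducerArgs parsed = parse(new String[] {"test", "10"});
        System.out.println(parsed.topicName + " " + parsed.numEvents); // prints "test 10"
    }
}
```

A `main` method could call `ProducerArgs.parse(args)` inside a try block and print the exception message before exiting.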
zookeeper-server-start.bat C:\Users\prashant\Downloads\kafka_2.12-2.0.0\config\zookeeper.properties
kafka-server-start.bat C:\Users\prashant\Downloads\kafka_2.12-2.0.0\config\server.properties
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
java -version
java version "1.8.0_191"
Java(TM) SE Runtime Environment (build 1.8.0_191-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.191-b12, mixed mode)
# Forward slashes are used because a single backslash is an escape character in Java .properties files
dataDir=C:/zookeeper_data
# Forward slashes are used because a single backslash is an escape character in Java .properties files
log.dirs=C:/kafka_logs
offsets.topic.num.partitions=1
offsets.topic.replication.factor=1
min.insync.replicas=1
default.replication.factor=1
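Windows paths in these settings deserve a quick check. Assuming the files are read with `java.util.Properties` (the usual loader for `.properties` files), the sketch below shows what the JDK does with a single backslash, a forward slash, and a doubled backslash in a value. `PropertiesPathDemo` is a hypothetical name used only for this illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical demo class: shows how java.util.Properties treats backslashes in values.
final class PropertiesPathDemo {
    /** Loads a one-line .properties snippet and returns the value stored under key. */
    static String load(String line, String key) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(line));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty(key);
    }

    public static void main(String[] args) {
        // File text: log.dirs=C:\kafka_logs  (the backslash before 'k' is dropped)
        System.out.println(load("log.dirs=C:\\kafka_logs", "log.dirs"));   // prints C:kafka_logs
        // File text: log.dirs=C:/kafka_logs  (kept as written)
        System.out.println(load("log.dirs=C:/kafka_logs", "log.dirs"));    // prints C:/kafka_logs
        // File text: log.dirs=C:\\kafka_logs (escaped backslash survives as one backslash)
        System.out.println(load("log.dirs=C:\\\\kafka_logs", "log.dirs")); // prints C:\kafka_logs
    }
}
```

Under that assumption, forward slashes or doubled backslashes are the safer way to write a Windows directory in these files.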
kafka_2.12-2.0.0\bin\windows\zookeeper-server-start.bat kafka_2.12-2.0.0\config\zookeeper.properties
kafka_2.12-2.0.0\bin\windows\kafka-server-start.bat kafka_2.12-2.0.0\config\server.properties
kafka_2.12-2.0.0\bin\windows\zookeeper-shell.bat localhost:2181 ls /brokers/ids
kafka_2.12-2.0.0\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
kafka_2.12-2.0.0\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
kafka_2.12-2.0.0\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
kafka_2.12-2.0.0\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
public static void main(String[] args) {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "HelloStreams");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
        Serdes.Integer().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
        Serdes.String().getClass());

    StreamsBuilder builder = new StreamsBuilder();
    KStream<Integer, String> kStream = builder.stream(topicName);
    kStream.foreach((k, v) -> System.out.println("Key = " + k + " Value = " + v));
    //kStream.peek((k, v) -> System.out.println("Key = " + k + " Value = " + v));
    Topology topology = builder.build();

    KafkaStreams streams = new KafkaStreams(topology, props);
    logger.info("Starting the stream");
    streams.start();

    // Close the streams instance cleanly when the JVM shuts down
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        logger.info("Stopping Stream");
        streams.close();
    }));
}
StreamsBuilder builder = new StreamsBuilder();
builder.stream(topicName)
    .foreach((k, v) -> System.out.println("Key = " + k + " Value = " + v));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
public class OddEvenPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        if ((keyBytes == null) || (!(key instanceof Integer)))
            throw new InvalidRecordException("Topic Key must have a valid Integer value.");

        if (cluster.partitionsForTopic(topic).size() != 2)
            throw new InvalidTopicException("Topic must have exactly two partitions");

        // floorMod keeps the result in {0, 1}; plain % returns -1 for negative odd keys
        int intKey = (Integer) key;
        return Math.floorMod(intKey, 2);
    }

    @Override
    public void close() {
        //Nothing to close
    }

    @Override
    public void configure(Map<String, ?> map) {
        //Nothing to configure
    }
}
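The partitioner above sends a record to partition 0 or 1 based on the parity of its integer key. A minimal JDK-only sketch of that mapping follows; `ParityDemo` is a hypothetical class for illustration. It compares Java's `%` operator, which can return a negative remainder, with `Math.floorMod`, which always yields 0 or 1 for a divisor of 2.

```java
// Hypothetical demo class: parity-based partition choice for integer keys.
final class ParityDemo {
    /** Remainder operator: the sign follows the dividend, so negative odd keys give -1. */
    static int remainderPartition(int key) {
        return key % 2;
    }

    /** floorMod: the result has the sign of the divisor, so it is always 0 or 1 here. */
    static int floorModPartition(int key) {
        return Math.floorMod(key, 2);
    }

    public static void main(String[] args) {
        System.out.println(remainderPartition(4));  // prints 0
        System.out.println(remainderPartition(-3)); // prints -1
        System.out.println(floorModPartition(-3));  // prints 1
    }
}
```

A partition number of -1 is not a valid index, so the `floorMod` form is the safer choice whenever keys may be negative.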
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, OddEvenPartitioner.class.getName());
for (int j = 1; j <= numEvents; j++) {
    // Effectively final copy of the loop counter so the callback lambda can capture it
    int i = j;
    producer.send(new ProducerRecord<>(topicName, i, "Simple Message-" + i),
        (recordMetadata, e) -> {
            if (e != null)
                logger.error("Error sending message with key " + i + " Error - " + e.getMessage());
            else
                logger.info("Message " + i + " persisted with offset " + recordMetadata.offset() +
                    " and timestamp on " + new Timestamp(recordMetadata.timestamp()));
        });
}
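The loop above copies the counter `j` into `i` before using it in the send callback. A short JDK-only sketch of why that copy exists: a Java lambda may capture only final or effectively final local variables, and a loop counter that is incremented is neither. `LambdaCaptureDemo` and its `Supplier` callbacks are stand-ins chosen for this illustration, not Kafka types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical demo class: capturing a per-iteration copy of a loop counter in a lambda.
final class LambdaCaptureDemo {
    /** Builds one callback per event; each callback remembers its own event number. */
    static List<Supplier<String>> buildCallbacks(int numEvents) {
        List<Supplier<String>> callbacks = new ArrayList<>();
        for (int j = 1; j <= numEvents; j++) {
            int i = j; // effectively final copy; capturing j directly would not compile
            callbacks.add(() -> "Message " + i);
        }
        return callbacks;
    }

    public static void main(String[] args) {
        for (Supplier<String> callback : buildCallbacks(3)) {
            System.out.println(callback.get());
        }
    }
}
```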
public class JsonSerializer implements Serializer<JsonNode> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    public JsonSerializer() {
        //Nothing to do
    }

    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
        //Nothing to configure
    }

    @Override
    public byte[] serialize(String topic, JsonNode data) {
        if (data == null) {
            return null;
        }
        try {
            return objectMapper.writeValueAsBytes(data);
        } catch (JsonProcessingException e) {
            throw new SerializationException("Error serializing JSON message", e);
        }
    }

    @Override
    public void close() {
        //Nothing to close
    }
}
private static List<StockData> getStocks(String dataFile) throws IOException {
    File file = new File(dataFile);
    MappingIterator<StockData> stockDataIterator =
        new CsvMapper().readerWithTypedSchemaFor(StockData.class).readValues(file);
    return stockDataIterator.readAll();
}
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    JsonSerializer.class.getName());

//Some code omitted for clarity

for (int i = 0; i < eventFiles.length; i++) {
    for (StockData s : getStocks(eventFiles[i])) {
        stockArrayOfList[i].add(objectMapper.valueToTree(s));
    }
    dispatchers.add(new Thread(new Dispatcher(producer, topicName,
        eventFiles[i], stockArrayOfList[i]), eventFiles[i]));
    dispatchers.get(i).start();
}
public class Dispatcher implements Runnable {
    private static final Logger logger = LogManager.getLogger(Dispatcher.class);

    private final KafkaProducer<Integer, String> producer;
    private final String topicName;
    private final String fileLocation;

    Dispatcher(KafkaProducer<Integer, String> producer, String topicName, String fileLocation) {
        this.producer = producer;
        this.topicName = topicName;
        this.fileLocation = fileLocation;
    }

    @Override
    public void run() {
        logger.info("Start processing " + fileLocation + "...");
        File file = new File(fileLocation);
        int msgKey = 0;

        // Send each line of the file as one message, keyed by its line index
        try (Scanner scanner = new Scanner(file)) {
            while (scanner.hasNextLine()) {
                String line = scanner.nextLine();
                producer.send(new ProducerRecord<>(topicName, msgKey, line));
                msgKey++;
            }
            logger.trace("Finished sending " + msgKey + " messages from " + fileLocation);
        } catch (Exception e) {
            logger.error("Exception in thread " + fileLocation);
            throw new RuntimeException(e);
        }
    }
}
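The dispatcher above is meant to run as one thread per input file, with all threads sharing a single producer. The sketch below shows the same fan-out and join pattern using only the JDK. A `ConcurrentLinkedQueue` stands in for the shared producer and in-memory lists stand in for the files; both are assumptions made so the example runs without a broker, and `DispatchSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch: one worker thread per source, all writing to one thread-safe sink.
final class DispatchSketch {
    /** Starts one thread per source, waits for all of them, and returns the sink size. */
    static int dispatchAll(List<List<String>> sources, Queue<String> sink) {
        List<Thread> workers = new ArrayList<>();
        for (List<String> lines : sources) {
            Thread worker = new Thread(() -> {
                for (String line : lines) {
                    sink.add(line); // stands in for producer.send(...)
                }
            });
            workers.add(worker);
            worker.start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            throw new IllegalStateException("Interrupted while waiting for workers", e);
        }
        return sink.size();
    }

    public static void main(String[] args) {
        List<List<String>> files = Arrays.asList(
            Arrays.asList("a1", "a2"),
            Arrays.asList("b1"));
        System.out.println(dispatchAll(files, new ConcurrentLinkedQueue<>())); // prints 3
    }
}
```

Sharing works here only because the sink is thread-safe; the order in which lines from different sources arrive is not deterministic.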
public static void main(String[] args) {
    if (args.length < 2) {
        System.out.println("Please provide command line arguments: topicName EventFiles");
        System.exit(-1);
    }
    String topicName = args[0];
    String[] eventFiles = Arrays.copyOfRange(args, 1, args.length);

    Properties properties = new Properties();
    // try-with-resources closes the config stream once the properties are loaded
    try (InputStream configStream = ClassLoader.class.getResourceAsStream(kafkaConfig)) {
        properties.load(configStream);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    } catch (IOException e) {
        logger.error("Cannot open Kafka config " + kafkaConfig);
        throw new RuntimeException(e);
    }

    logger.trace("Starting dispatcher threads...");
    // A single producer instance is shared by all dispatcher threads
    KafkaProducer<Integer, String> producer = new KafkaProducer<>(properties);
    Thread[] dispatchers = new Thread[eventFiles.length];
    for (int i = 0; i < eventFiles.length; i++) {
        dispatchers[i] = new Thread(new Dispatcher(producer, topicName, eventFiles[i]));
        dispatchers[i].start();
    }

    try {
        for (Thread t : dispatchers)
            t.join();
    } catch (InterruptedException e) {
        logger.error("Thread Interrupted ");
        Thread.currentThread().interrupt();
    } finally {
        producer.close();
        logger.info("Finished dispatcher demo - Closing Kafka Producer.");
    }
}
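The `main` method above loads its Kafka settings from a classpath resource. As a hedged sketch of a more defensive loader, the hypothetical `ConfigLoader` below (JDK only, not part of the gist) fails fast with a clear message when the resource is missing and always closes the stream. Resolving the resource against an application class, as in `fromClasspath`, is one common choice rather than the gist's own approach.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

// Hypothetical helper: loads .properties content with a null check and guaranteed close.
final class ConfigLoader {
    /** Reads properties from the stream; a null stream means the resource was not found. */
    static Properties load(InputStream stream, String name) {
        if (stream == null) {
            throw new IllegalStateException("Config not found on classpath: " + name);
        }
        try (InputStream in = stream) {
            Properties properties = new Properties();
            properties.load(in);
            return properties;
        } catch (IOException e) {
            throw new UncheckedIOException("Cannot read " + name, e);
        }
    }

    /** Looks the resource up relative to the given class, e.g. "/kafka.properties". */
    static Properties fromClasspath(Class<?> anchor, String resource) {
        return load(anchor.getResourceAsStream(resource), resource);
    }

    public static void main(String[] args) {
        byte[] text = "bootstrap.servers=localhost:9092\n".getBytes(StandardCharsets.ISO_8859_1);
        Properties properties = load(new ByteArrayInputStream(text), "in-memory example");
        System.out.println(properties.getProperty("bootstrap.servers")); // prints localhost:9092
    }
}
```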
public class AvroConsumer {
    private static final Logger logger = LogManager.getLogger(AvroConsumer.class);
    private static final String kafkaConfig = "/kafka.properties";

    @SuppressWarnings("InfiniteLoopStatement")
    public static void main(String[] args) {
        if (args.length < 2) {
            System.out.println("Please provide command line arguments: topicName groupName");
            System.exit(-1);
        }
        String topicName = args[0];
        String groupName = args[1];

        Properties properties = new Properties();
        // try-with-resources closes the config stream once the properties are loaded
        try (InputStream kafkaConfigStream =
                 ClassLoader.class.getResourceAsStream(kafkaConfig)) {
            properties.load(kafkaConfigStream);
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupName);
            //Set autocommit to false so you can execute it again for the same set of messages
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                KafkaAvroDeserializer.class);
            properties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
            properties.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
                "http://localhost:8081");
        } catch (IOException e) {
            logger.error(e.getMessage());
            throw new RuntimeException(e);
        }

        final KafkaConsumer<String, StockData> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList(topicName));
        // Poll forever and print every record value
        while (true) {
            ConsumerRecords<String, StockData> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, StockData> record : records) {
                System.out.println(record.value());
            }
        }
    }
}
{"namespace": "guru.learningjournal.kafka.examples",
 "type": "record",
 "name": "StockData",
 "fields": [
     {"name": "symbol", "type": ["null", "string"]},
     {"name": "series", "type": ["null", "string"]},
     {"name": "open", "type": ["null", "float"]},
     {"name": "high", "type": ["null", "float"]},
     {"name": "low", "type": ["null", "float"]},
     {"name": "close", "type": ["null", "float"]},
     {"name": "last", "type": ["null", "float"]},
     {"name": "previousClose", "type": ["null", "float"]},
     {"name": "totalTradedQty", "type": ["null", "long"]},
     {"name": "totalTradedVal", "type": ["null", "double"]},
     {"name": "tradeDate", "type": ["null", "string"]},
     {"name": "totalTrades", "type": ["null", "int"]},
     {"name": "isinCode", "type": ["null", "string"]}
 ]
}
private static List<StockData> getStocks(String dataFile) throws IOException {
    File file = new File(dataFile);
    // Column order must match the column order of the CSV file
    CsvSchema schema = CsvSchema.builder()
        .addColumn("symbol", CsvSchema.ColumnType.STRING)
        .addColumn("series", CsvSchema.ColumnType.STRING)
        .addColumn("open", CsvSchema.ColumnType.NUMBER)
        .addColumn("high", CsvSchema.ColumnType.NUMBER)
        .addColumn("low", CsvSchema.ColumnType.NUMBER)
        .addColumn("close", CsvSchema.ColumnType.NUMBER)
        .addColumn("last", CsvSchema.ColumnType.NUMBER)
        .addColumn("previousClose", CsvSchema.ColumnType.NUMBER)
        .addColumn("totalTradedQty", CsvSchema.ColumnType.NUMBER)
        .addColumn("totalTradedVal", CsvSchema.ColumnType.NUMBER)
        .addColumn("tradeDate", CsvSchema.ColumnType.STRING)
        .addColumn("totalTrades", CsvSchema.ColumnType.NUMBER)
        .addColumn("isinCode", CsvSchema.ColumnType.STRING)
        .build();
    MappingIterator<StockData> stockDataIterator =
        new CsvMapper().readerFor(StockData.class).with(schema).readValues(file);
    return stockDataIterator.readAll();
}
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
properties.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

for (int i = 0; i < eventFiles.length; i++) {
    logger.info("Preparing data for " + eventFiles[i]);
    for (StockData s : getStocks(eventFiles[i])) {
        stockArrayOfList[i].add(s);
    }
    dispatchers.add(new Thread(new Dispatcher(producer, topicName, eventFiles[i],
        stockArrayOfList[i]), eventFiles[i]));
    dispatchers.get(i).start();
}
public class HelloProducer {
    private static final Logger logger = LogManager.getLogger(HelloProducer.class);

    public static void main(String[] args) {
        logger.info("Creating Kafka Producer...");
        Properties props = new Properties();
        props.put(ProducerConfig.CLIENT_ID_CONFIG, AppConfigs.applicationID);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            AppConfigs.bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class.getName());

        KafkaProducer<Integer, String> producer = new KafkaProducer<>(props);

        logger.info("Start sending messages...");
        for (int i = 1; i <= AppConfigs.numEvents; i++) {
            producer.send(new ProducerRecord<>(AppConfigs.topicName, i, "Simple Message-" + i));
        }

        logger.info("Finished - Closing Kafka Producer.");
        producer.close();
    }
}
RecordMetadata metadata;
for (int i = 1; i <= numEvents; i++) {
    // get() blocks until the broker acknowledges the record, making each send synchronous
    metadata = producer.send(new ProducerRecord<>(topicName, i, "Simple Message-" + i)).get();
    logger.info("Message " + i + " persisted with offset " + metadata.offset()
        + " and timestamp on " + new Timestamp(metadata.timestamp()));
}
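The loop above turns each send into a synchronous call by blocking on the returned `Future`. `Future.get()` declares two checked exceptions, so the surrounding code has to handle them. The JDK-only sketch below shows one way to do that; a `CompletableFuture<String>` stands in for the future a producer would return, and `SyncSendSketch` is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical sketch: blocking on a Future and handling both checked exceptions.
final class SyncSendSketch {
    /** Waits for the result; returns it, or a short description of what went wrong. */
    static String await(Future<String> pending) {
        try {
            return pending.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag for the caller
            return "interrupted";
        } catch (ExecutionException e) {
            // The real failure is wrapped; the cause carries the original exception
            return "failed: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(await(CompletableFuture.completedFuture("offset 42"))); // prints offset 42

        CompletableFuture<String> broken = new CompletableFuture<>();
        broken.completeExceptionally(new IllegalStateException("broker unavailable"));
        System.out.println(await(broken)); // prints failed: broker unavailable
    }
}
```

Blocking on every record gives a simple success or failure per message at the cost of throughput, since the next send waits for the previous acknowledgement.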