Kafka Tutorial Code Samples for Learning Journal Website
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
kafka-topics.sh --list --zookeeper localhost:2181
kafka-console-producer.sh --broker-list localhost:9092 --topic test
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
tar -zxvf kafka_2.11-0.10.1.0.tgz
cd kafka_2.11-0.10.1.0
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic MyFirstTopic1 --partitions 1 --replication-factor 1
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic MyFirstTopic1
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic MyFirstTopic1
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
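# Before starting the extra brokers, edit each copy so that broker.id, the listener port, and log.dirs are unique.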
bin/kafka-server-start.sh config/server-1.properties
bin/kafka-server-start.sh config/server-2.properties
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic TestTopicXYZ --partitions 2 --replication-factor 3
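//File Name-SupplierConsumer.java
//Reads Supplier objects from SupplierTopic using the custom SupplierDeserializer defined later in this Gist.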
import java.util.*;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class SupplierConsumer {
public static void main(String[] args) throws Exception {
String topicName = "SupplierTopic";
String groupName = "SupplierTopicGroup";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
props.put("group.id", groupName);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "SupplierDeserializer");
KafkaConsumer<String, Supplier> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topicName));
while (true) {
ConsumerRecords<String, Supplier> records = consumer.poll(100);
for (ConsumerRecord<String, Supplier> record : records) {
System.out.println("Supplier id= " +
String.valueOf(record.value().getID()) +
" Supplier Name = " + record.value().getName() +
" Supplier Start Date = " +
record.value().getStartDate().toString());
}
}
}
}
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic TestTopicXYZ
sudo apt-get install openjdk-8-jdk
echo "deb https://dl.bintray.com/sbt/debian /" | sudo tee -a /etc/apt/sources.list.d/sbt.list
sudo apt-get update
sudo apt-get install sbt
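//File Name-build.sbt (assumed; sbt's conventional build file name)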
name := "KafkaTest"
scalaVersion := "2.11.8"
libraryDependencies ++= Seq(
"org.apache.kafka" % "kafka-clients" % "1.0.0")
//File Name-SimpleProducer.java
import java.util.*;
import org.apache.kafka.clients.producer.*;
public class SimpleProducer {
public static void main(String[] args) throws Exception {
String topicName = "SimpleProducerTopic";
String key = "Key1";
String value = "Value-1";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value);
producer.send(record);
producer.close();
System.out.println("SimpleProducer Completed.");
}
}
sbt compile
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic SimpleProducerTopic --from-beginning
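//File Name-SimpleProducer.java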
import java.util.*;
import org.apache.kafka.clients.producer.*;
public class SimpleProducer {
public static void main(String[] args) throws Exception {
String key = "Key1";
String value = "Value-1";
String topicName = "SimpleProducerTopic";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value);
producer.send(record);
producer.close();
System.out.println("SimpleProducer Completed.");
}
}
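//File Name-build.sbt (assumed; sbt's conventional build file name)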
name := "KafkaTest"
libraryDependencies ++= Seq(
"org.apache.kafka" % "kafka-clients" % "0.10.1.0"
exclude("javax.jms", "jms")
exclude("com.sun.jdmk", "jmxtools")
exclude("com.sun.jmx", "jmxri")
exclude("org.slf4j", "slf4j-simple")
)
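//File Name-SynchronousProducer.java
//Calls get() on the send() future, so the partition and offset are known before the program continues.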
import java.util.*;
import org.apache.kafka.clients.producer.*;
public class SynchronousProducer {
public static void main(String[] args) throws Exception {
String topicName = "SynchronousProducerTopic";
String key = "Key-1";
String value = "Value-1";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value);
try {
RecordMetadata metadata = producer.send(record).get();
System.out.println("Message is sent to Partition no " + metadata.partition() + " and offset " + metadata.offset());
System.out.println("SynchronousProducer Completed with success.");
} catch (Exception e) {
e.printStackTrace();
System.out.println("SynchronousProducer failed with an exception");
} finally {
producer.close();
}
}
}
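//File Name-AsynchronousProducer.java
//Sends without blocking; the callback defined below reports success or failure when the broker responds.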
import java.util.*;
import org.apache.kafka.clients.producer.*;
public class AsynchronousProducer {
public static void main(String[] args) throws Exception {
String topicName = "AsynchronousProducerTopic";
String key = "Key1";
String value = "Value-1";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value);
producer.send(record, new MyProducerCallback());
System.out.println("AsynchronousProducer call completed");
producer.close();
}
}
class MyProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null)
System.out.println("AsynchronousProducer failed with an exception");
else
System.out.println("AsynchronousProducer call Success:");
}
}
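//File Name-SensorProducer.java
//Uses the custom SensorPartitioner (set via partitioner.class) shown in the next listing.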
import java.util.*;
import org.apache.kafka.clients.producer.*;
public class SensorProducer {
public static void main(String[] args) throws Exception {
String topicName = "SensorTopic";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("partitioner.class", "SensorPartitioner");
props.put("speed.sensor.name", "TSS");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++)
producer.send(new ProducerRecord<>(topicName, "SSP" + i, "500" + i));
for (int i = 0; i < 10; i++)
producer.send(new ProducerRecord<>(topicName, "TSS", "500" + i));
producer.close();
System.out.println("SimpleProducer Completed.");
}
}
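//File Name-SensorPartitioner.java
//Custom partitioner: keys equal to the configured speed sensor name go to a reserved ~30% of partitions; all other keys hash into the remaining partitions.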
import java.util.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.*;
import org.apache.kafka.common.utils.*;
import org.apache.kafka.common.record.*;
public class SensorPartitioner implements Partitioner {
private String speedSensorName;
public void configure(Map<String, ?> configs) {
speedSensorName = configs.get("speed.sensor.name").toString();
}
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
int sp = (int) Math.abs(numPartitions * 0.3);
int p = 0;
if ((keyBytes == null) || (!(key instanceof String)))
throw new InvalidRecordException("All messages must have sensor name as key");
if (((String) key).equals(speedSensorName))
p = Utils.toPositive(Utils.murmur2(valueBytes)) % sp;
else
p = Utils.toPositive(Utils.murmur2(keyBytes)) % (numPartitions - sp) + sp;
System.out.println("Key = " + (String) key + " Partition = " + p);
return p;
}
public void close() {
}
}
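//File Name-Supplier.java
//Plain value object carried as the message value in the custom serializer/deserializer examples.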
import java.util.Date;
public class Supplier {
private int supplierId;
private String supplierName;
private Date supplierStartDate;
public Supplier(int id, String name, Date dt) {
this.supplierId = id;
this.supplierName = name;
this.supplierStartDate = dt;
}
public int getID() {
return supplierId;
}
public String getName() {
return supplierName;
}
public Date getStartDate() {
return supplierStartDate;
}
}
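//File Name-SupplierSerializer.java
//Encodes a Supplier as: int id, int name length, name bytes, int date length, date string bytes.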
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.errors.SerializationException;
import java.io.UnsupportedEncodingException;
import java.util.Map;
import java.nio.ByteBuffer;
public class SupplierSerializer implements Serializer<Supplier> {
private String encoding = "UTF8";
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// nothing to configure
}
@Override
public byte[] serialize(String topic, Supplier data) {
int sizeOfName;
int sizeOfDate;
byte[] serializedName;
byte[] serializedDate;
try {
if (data == null)
return null;
serializedName = data.getName().getBytes(encoding);
sizeOfName = serializedName.length;
serializedDate = data.getStartDate().toString().getBytes(encoding);
sizeOfDate = serializedDate.length;
ByteBuffer buf = ByteBuffer.allocate(4 + 4 + sizeOfName + 4 + sizeOfDate);
buf.putInt(data.getID());
buf.putInt(sizeOfName);
buf.put(serializedName);
buf.putInt(sizeOfDate);
buf.put(serializedDate);
return buf.array();
} catch (Exception e) {
throw new SerializationException("Error when serializing Supplier to byte[]");
}
}
@Override
public void close() {
// nothing to do
}
}
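//File Name-SupplierDeserializer.java
//Decodes the byte layout produced by SupplierSerializer.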
import java.nio.ByteBuffer;
import java.util.Date;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import java.io.UnsupportedEncodingException;
import java.util.Map;
public class SupplierDeserializer implements Deserializer<Supplier> {
private String encoding = "UTF8";
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
//Nothing to configure
}
@Override
public Supplier deserialize(String topic, byte[] data) {
try {
if (data == null) {
System.out.println("Null recieved at deserialize");
return null;
}
ByteBuffer buf = ByteBuffer.wrap(data);
int id = buf.getInt();
int sizeOfName = buf.getInt();
byte[] nameBytes = new byte[sizeOfName];
buf.get(nameBytes);
String deserializedName = new String(nameBytes, encoding);
int sizeOfDate = buf.getInt();
byte[] dateBytes = new byte[sizeOfDate];
buf.get(dateBytes);
String dateString = new String(dateBytes, encoding);
DateFormat df = new SimpleDateFormat("EEE MMM dd HH:mm:ss Z yyyy");
return new Supplier(id, deserializedName, df.parse(dateString));
} catch (Exception e) {
throw new SerializationException("Error when deserializing byte[] to Supplier");
}
}
@Override
public void close() {
// nothing to do
}
}
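//File Name-SupplierProducer.java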
import java.util.*;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import org.apache.kafka.clients.producer.*;
public class SupplierProducer {
public static void main(String[] args) throws Exception {
String topicName = "SupplierTopic";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "SupplierSerializer");
Producer<String, Supplier> producer = new KafkaProducer<>(props);
DateFormat df = new SimpleDateFormat("yyyy-MM-dd");
Supplier sp1 = new Supplier(101, "Xyz Pvt Ltd.", df.parse("2016-04-01"));
Supplier sp2 = new Supplier(102, "Abc Pvt Ltd.", df.parse("2012-01-01"));
producer.send(new ProducerRecord<String, Supplier>(topicName, "SUP", sp1)).get();
producer.send(new ProducerRecord<String, Supplier>(topicName, "SUP", sp2)).get();
System.out.println("SupplierProducer Completed.");
producer.close();
}
}
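//File Name-SupplierConsumer.java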
import java.util.*;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class SupplierConsumer {
public static void main(String[] args) throws Exception {
String topicName = "SupplierTopic";
String groupName = "SupplierTopicGroup";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
props.put("group.id", groupName);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "SupplierDeserializer");
KafkaConsumer<String, Supplier> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topicName));
while (true) {
ConsumerRecords<String, Supplier> records = consumer.poll(100);
for (ConsumerRecord<String, Supplier> record : records) {
System.out.println("Supplier id= " + String.valueOf(record.value().getID()) + " Supplier Name = " + record.value().getName() + " Supplier Start Date = " + record.value().getStartDate().toString());
}
}
}
}
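//File Name-build.sbt (assumed; sbt's conventional build file name)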
name := "KafkaTest"
libraryDependencies ++= Seq(
"org.apache.kafka" % "kafka-clients" % "0.10.1.0"
exclude("javax.jms", "jms")
exclude("com.sun.jdmk", "jmxtools")
exclude("com.sun.jmx", "jmxri")
exclude("org.slf4j", "slf4j-simple")
)
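//File Name-SupplierConsumer.java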
import java.util.*;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class SupplierConsumer {
public static void main(String[] args) throws Exception {
String topicName = "SupplierTopic";
String groupName = "SupplierTopicGroup";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
props.put("group.id", groupName);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "SupplierDeserializer");
KafkaConsumer<String, Supplier> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topicName));
while (true) {
ConsumerRecords<String, Supplier> records = consumer.poll(100);
for (ConsumerRecord<String, Supplier> record : records) {
System.out.println("Supplier id= " +
String.valueOf(record.value().getID()) +
" Supplier Name = " + record.value().getName() +
" Supplier Start Date = " +
record.value().getStartDate().toString());
}
}
}
}
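#File Name-SupplierConsumer.properties (loaded by NewSupplierConsumer below)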
bootstrap.servers=localhost:9092,localhost:9093
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=SupplierDeserializer
group.id=SupplierTopicGroup
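//File Name-NewSupplierConsumer.java
//Same consumer, but the configuration is loaded from SupplierConsumer.properties instead of being hard-coded.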
import java.util.*;
import java.io.*;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class NewSupplierConsumer {
public static void main(String[] args) throws Exception {
String topicName = "SupplierTopic";
String groupName = "SupplierTopicGroup";
Properties props = new Properties();
//props.put("bootstrap.servers", "localhost:9092,localhost:9093");
//props.put("group.id", groupName);
//props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//props.put("value.deserializer", "SupplierDeserializer");
InputStream input = null;
KafkaConsumer<String, Supplier> consumer = null;
try {
input = new FileInputStream("SupplierConsumer.properties");
props.load(input);
consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topicName));
while (true) {
ConsumerRecords<String, Supplier> records = consumer.poll(100);
for (ConsumerRecord<String, Supplier> record : records) {
System.out.println("Supplier id= " + String.valueOf(record.value().getID()) +
" Supplier Name = " + record.value().getName() +
" Supplier Start Date = " + record.value().getStartDate().toString());
}
}
} catch (Exception ex) {
ex.printStackTrace();
} finally {
input.close();
consumer.close();
}
}
}
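//File Name-ManualConsumer.java
//Turns off enable.auto.commit and commits offsets explicitly: commitAsync() after each poll, commitSync() before closing.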
import java.util.*;
import java.io.*;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class ManualConsumer {
public static void main(String[] args) throws Exception {
String topicName = "SupplierTopic";
String groupName = "SupplierTopicGroup";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
props.put("group.id", groupName);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "SupplierDeserializer");
props.put("enable.auto.commit", "false");
KafkaConsumer<String, Supplier> consumer = null;
try {
consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topicName));
while (true) {
ConsumerRecords<String, Supplier> records = consumer.poll(100);
for (ConsumerRecord<String, Supplier> record : records) {
System.out.println("Supplier id= " + String.valueOf(record.value().getID()) +
" Supplier Name = " + record.value().getName() +
" Supplier Start Date = " + record.value().getStartDate().toString());
}
consumer.commitAsync();
}
} catch (Exception ex) {
ex.printStackTrace();
} finally {
consumer.commitSync();
consumer.close();
}
}
}
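//File Name-RandomProducer.java
//Generates dated random values and writes them alternately to partitions 0 and 1 of RandomProducerTopic.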
import java.util.*;
import org.apache.kafka.clients.producer.*;
public class RandomProducer {
public static void main(String[] args) throws InterruptedException {
String topicName = "RandomProducerTopic";
String msg;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
Random rg = new Random();
Calendar dt = Calendar.getInstance();
dt.set(2016, 1, 1);
try {
while (true) {
for (int i = 0; i < 100; i++) {
msg = dt.get(Calendar.YEAR) + "-" +
dt.get(Calendar.MONTH) + "-" +
dt.get(Calendar.DATE) + "," +
rg.nextInt(1000);
producer.send(new ProducerRecord<String, String>(topicName, 0, "Key", msg)).get();
msg = dt.get(Calendar.YEAR) + "-" +
dt.get(Calendar.MONTH) + "-" +
dt.get(Calendar.DATE) + "," +
rg.nextInt(1000);
producer.send(new ProducerRecord<String, String>(topicName, 1, "Key", msg)).get();
}
dt.add(Calendar.DATE, 1);
System.out.println("Data Sent for " +
dt.get(Calendar.YEAR) + "-" +
dt.get(Calendar.MONTH) + "-" +
dt.get(Calendar.DATE));
}
} catch (Exception ex) {
System.out.println("Intrupted");
} finally {
producer.close();
}
}
}
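//File Name-RandomConsumer.java
//Registers a ConsumerRebalanceListener (RebalanceListner, next listing) to track processed offsets.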
import java.util.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.*;
public class RandomConsumer {
public static void main(String[] args) throws Exception {
String topicName = "RandomProducerTopic";
KafkaConsumer<String, String> consumer = null;
String groupName = "RG";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
props.put("group.id", groupName);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", "false");
consumer = new KafkaConsumer<>(props);
RebalanceListner rebalanceListner = new RebalanceListner(consumer);
consumer.subscribe(Arrays.asList(topicName), rebalanceListner);
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
/*System.out.println("Topic:"+ record.topic() +
" Partition:" + record.partition() +
" Offset:" + record.offset() + " Value:"+ record.value());*/
// Do some processing and save it to Database
rebalanceListner.addOffset(record.topic(), record.partition(), record.offset());
}
//consumer.commitSync(rebalanceListner.getCurrentOffsets());
}
} catch (Exception ex) {
System.out.println("Exception.");
ex.printStackTrace();
} finally {
consumer.close();
}
}
}
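//File Name-RebalanceListner.java
//Commits the offsets collected so far whenever partitions are revoked during a rebalance.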
import java.util.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.*;
public class RebalanceListner implements ConsumerRebalanceListener {
private KafkaConsumer consumer;
private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
public RebalanceListner(KafkaConsumer con) {
this.consumer = con;
}
public void addOffset(String topic, int partition, long offset) {
currentOffsets.put(new TopicPartition(topic, partition), new OffsetAndMetadata(offset, "Commit"));
}
public Map<TopicPartition, OffsetAndMetadata> getCurrentOffsets() {
return currentOffsets;
}
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
System.out.println("Following Partitions Assigned ....");
for (TopicPartition partition : partitions)
System.out.println(partition.partition() + ",");
}
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println("Following Partitions Revoked ....");
for (TopicPartition partition : partitions)
System.out.println(partition.partition() + ",");
System.out.println("Following Partitions commited ....");
for (TopicPartition tp : currentOffsets.keySet())
System.out.println(tp.partition());
consumer.commitSync(currentOffsets);
currentOffsets.clear();
}
}
//Code from random consumer example
while(true){
ConsumerRecords<String, String> records=consumer.poll(100);
for(ConsumerRecord<String, String>record:records){
/*System.out.println("Topic:"+ record.topic() +
" Partition:" + record.partition() +
" Offset:" + record.offset() +
" Value:"+ record.value());*/
//Step - 1
// Do some processing and save it to Database
rebalanceListner.addOffset(record.topic(),record.partition(),record.offset());
}
//Step - 2
//consumer.commitSync(rebalanceListner.getCurrentOffsets());
}
yum install mysql-server
service mysqld start
mysql_secure_installation
source tss.sql
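-- Presumably the contents of tss.sql sourced above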
create database test;
use test;
create table tss_data(skey varchar(50), svalue varchar(50));
create table tss_offsets(topic_name varchar(50),partition int, offset int);
insert into tss_offsets values('SensorTopic1',0,0);
insert into tss_offsets values('SensorTopic1',1,0);
insert into tss_offsets values('SensorTopic1',2,0);
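//File Name-SensorConsumer.java
//Seeks each partition to the offset stored in MySQL, then saves every record and its next offset in a single database transaction.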
import java.util.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.*;
import java.sql.*;
public class SensorConsumer {
public static void main(String[] args) throws Exception {
String topicName = "SensorTopic";
KafkaConsumer<String, String> consumer = null;
int rCount;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", "false");
consumer = new KafkaConsumer<>(props);
TopicPartition p0 = new TopicPartition(topicName, 0);
TopicPartition p1 = new TopicPartition(topicName, 1);
TopicPartition p2 = new TopicPartition(topicName, 2);
consumer.assign(Arrays.asList(p0, p1, p2));
System.out.println("Current position p0=" + consumer.position(p0)
+ " p1=" + consumer.position(p1)
+ " p2=" + consumer.position(p2));
consumer.seek(p0, getOffsetFromDB(p0));
consumer.seek(p1, getOffsetFromDB(p1));
consumer.seek(p2, getOffsetFromDB(p2));
System.out.println("New positions po=" + consumer.position(p0)
+ " p1=" + consumer.position(p1)
+ " p2=" + consumer.position(p2));
System.out.println("Start Fetching Now");
try {
do {
ConsumerRecords<String, String> records = consumer.poll(1000);
System.out.println("Record polled " + records.count());
rCount = records.count();
for (ConsumerRecord<String, String> record : records) {
saveAndCommit(consumer, record);
}
} while (rCount > 0);
} catch (Exception ex) {
System.out.println("Exception in main.");
} finally {
consumer.close();
}
}
private static long getOffsetFromDB(TopicPartition p) {
long offset = 0;
try {
Class.forName("com.mysql.jdbc.Driver");
Connection con = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "pandey");
String sql = "select offset from tss_offsets where topic_name='"
+ p.topic() + "' and partition=" + p.partition();
Statement stmt = con.createStatement();
ResultSet rs = stmt.executeQuery(sql);
if (rs.next())
offset = rs.getInt("offset");
stmt.close();
con.close();
} catch (Exception e) {
System.out.println("Exception in getOffsetFromDB");
}
return offset;
}
private static void saveAndCommit(KafkaConsumer<String, String> c, ConsumerRecord<String, String> r) {
System.out.println("Topic=" + r.topic() + " Partition=" + r.partition() + " Offset=" + r.offset()
+ " Key=" + r.key() + " Value=" + r.value());
try {
Class.forName("com.mysql.jdbc.Driver");
Connection con = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "pandey");
con.setAutoCommit(false);
String insertSQL = "insert into tss_data values(?,?)";
PreparedStatement psInsert = con.prepareStatement(insertSQL);
psInsert.setString(1, r.key());
psInsert.setString(2, r.value());
String updateSQL = "update tss_offsets set offset=? where topic_name=? and partition=?";
PreparedStatement psUpdate = con.prepareStatement(updateSQL);
psUpdate.setLong(1, r.offset() + 1);
psUpdate.setString(2, r.topic());
psUpdate.setInt(3, r.partition());
psInsert.executeUpdate();
psUpdate.executeUpdate();
con.commit();
con.close();
} catch (Exception e) {
System.out.println("Exception in saveAndCommit");
}
}
}
{ "type": "record",
"name": "ClickRecord",
"fields": [
{"name": "session_id", "type": "string"},
{"name": "browser", "type": ["string", "null"]},
{"name": "campaign", "type": ["string", "null"]},
{"name": "channel", "type": "string"},
{"name": "referrer", "type": ["string", "null"], "default": "None"},
{"name": "ip", "type": ["string", "null"]}
]
}
java -jar avro-tools-1.8.1.jar compile schema ClickRecordV1.avsc
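//File Name-AvroProducer.java
//Sends ClickRecord objects (generated from ClickRecordV1.avsc) using Confluent's KafkaAvroSerializer and the Schema Registry.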
import java.util.*;
import org.apache.kafka.clients.producer.*;
public class AvroProducer {
public static void main(String[] args) throws Exception {
String topicName = "AvroClicks";
String msg;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081");
Producer<String, ClickRecord> producer = new KafkaProducer<>(props);
ClickRecord cr = new ClickRecord();
try {
cr.setSessionId("10001");
cr.setChannel("HomePage");
cr.setIp("192.168.0.1");
producer.send(new ProducerRecord<String, ClickRecord>(topicName, cr.getSessionId().toString(), cr)).get();
System.out.println("Complete");
} catch (Exception ex) {
ex.printStackTrace(System.out);
} finally {
producer.close();
}
}
}
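//File Name-AvroConsumer.java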
import java.util.*;
import org.apache.kafka.clients.consumer.*;
public class AvroConsumer {
public static void main(String[] args) throws Exception {
String topicName = "AvroClicks";
String groupName = "RG";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
props.put("group.id", groupName);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", "http://localhost:8081");
props.put("specific.avro.reader", "true");
KafkaConsumer<String, ClickRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topicName));
try {
while (true) {
ConsumerRecords<String, ClickRecord> records = consumer.poll(100);
for (ConsumerRecord<String, ClickRecord> record : records) {
System.out.println("Session id=" + record.value().getSessionId()
+ " Channel=" + record.value().getChannel()
+ " Referrer=" + record.value().getReferrer());
}
}
} catch (Exception ex) {
ex.printStackTrace();
} finally {
consumer.close();
}
}
}
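//File Name-build.sbt (assumed; sbt's conventional build file name)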
name := "AvroTest"
val repositories = Seq(
"confluent" at "http://packages.confluent.io/maven/",
Resolver.sonatypeRepo("public")
)
libraryDependencies ++= Seq(
"org.apache.avro" % "avro" % "1.8.1",
"io.confluent" % "kafka-avro-serializer" % "3.1.1",
"org.apache.kafka" % "kafka-clients" % "0.10.1.0"
exclude("javax.jms", "jms")
exclude("com.sun.jdmk", "jmxtools")
exclude("com.sun.jmx", "jmxri")
exclude("org.slf4j", "slf4j-simple")
)
resolvers += "confluent" at "http://packages.confluent.io/maven/"
{"type": "record",
"name": "ClickRecord",
"fields": [
{"name": "session_id", "type": "string"},
{"name": "browser", "type": ["string", "null"]},
{"name": "campaign", "type": ["string", "null"]},
{"name": "channel", "type": "string"},
{"name": "entry_url", "type": ["string", "null"], "default": "None"},
{"name": "ip", "type": ["string", "null"]},
{"name": "language", "type": ["string", "null"], "default": "None"},
{"name": "os", "type": ["string", "null"],"default": "None"}
]
}
java -jar avro-tools-1.8.1.jar compile schema ClickRecordV2.avsc
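//File Name-ClickRecordProducerV2.java
//Producer for the evolved schema (ClickRecordV2.avsc), which adds entry_url, language, and os fields.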
import java.util.*;
import org.apache.kafka.clients.producer.*;
public class ClickRecordProducerV2 {
public static void main(String[] args) throws Exception {
String topicName = "AvroClicks";
String msg;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081");
Producer<String, ClickRecord> producer = new KafkaProducer<>(props);
ClickRecord cr = new ClickRecord();
try {
cr.setSessionId("10001");
cr.setChannel("HomePage");
cr.setIp("192.168.0.1");
cr.setLanguage("Spanish");
cr.setOs("Mac");
cr.setEntryUrl("http://facebook.com/myadd");
producer.send(new ProducerRecord<String, ClickRecord>(topicName, cr.getSessionId().toString(), cr)).get();
System.out.println("Complete");
} catch (Exception ex) {
ex.printStackTrace(System.out);
} finally {
producer.close();
}
}
}
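//File Name-ClickRecordConsumerV2.java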
import java.util.*;
import org.apache.kafka.clients.consumer.*;
public class ClickRecordConsumerV2 {
public static void main(String[] args) throws Exception {
String topicName = "AvroClicks";
String groupName = "RG";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
props.put("group.id", groupName);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", "http://localhost:8081");
props.put("specific.avro.reader", "true");
KafkaConsumer<String, ClickRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topicName));
try {
while (true) {
ConsumerRecords<String, ClickRecord> records = consumer.poll(100);
for (ConsumerRecord<String, ClickRecord> record : records) {
System.out.println("Session id=" + record.value().getSessionId()
+ " Channel=" + record.value().getChannel()
+ " Entry URL=" + record.value().getEntryUrl()
+ " Language=" + record.value().getLanguage());
}
}
} catch (Exception ex) {
ex.printStackTrace();
} finally {
consumer.close();
}
}
}