Skip to content

Instantly share code, notes, and snippets.

@kleysonr
Created January 15, 2018 21:29
Show Gist options
  • Save kleysonr/d76df87479cc884818ebe870d297d7e5 to your computer and use it in GitHub Desktop.
Save kleysonr/d76df87479cc884818ebe870d297d7e5 to your computer and use it in GitHub Desktop.
Java Kafka Producer/Consumer Sample
package sample1;
import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import java.util.Base64;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import com.google.gson.Gson;
/**
 * Plain data holder for one message exchanged over Kafka as JSON.
 *
 * <p>The field names ({@code id}, {@code timestamp}, {@code data}) are the
 * JSON keys, so they must not be renamed.
 */
class MsgKafka {

    private String id;
    private String timestamp;
    private String data;

    // --- id ---------------------------------------------------------------

    /** @return the message id, or {@code null} if it was never set */
    public String getId() {
        return this.id;
    }

    /** @param id the message id to store */
    public void setId(final String id) {
        this.id = id;
    }

    // --- timestamp --------------------------------------------------------

    /** @return the timestamp text, or {@code null} if it was never set */
    public String getTimestamp() {
        return this.timestamp;
    }

    /** @param timestamp the timestamp text to store */
    public void setTimestamp(final String timestamp) {
        this.timestamp = timestamp;
    }

    // --- data -------------------------------------------------------------

    /** @return the payload text, or {@code null} if it was never set */
    public String getData() {
        return this.data;
    }

    /** @param data the payload text to store */
    public void setData(final String data) {
        this.data = data;
    }
}
public class KafkaConsumerSample
{
public static void main(String[] args) throws InterruptedException, UnsupportedEncodingException {
Properties properties = new Properties();
properties.put("bootstrap.servers", "ipaddress:6667");
properties.put("kafka.topic" , "my-topic");
properties.put("compression.type" , "gzip");
properties.put("key.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("max.partition.fetch.bytes", "2097152");
properties.put("max.poll.records" , "500");
properties.put("group.id" , "my-group");
runMainLoop(args, properties);
}
static void runMainLoop(String[] args, Properties properties) throws InterruptedException, UnsupportedEncodingException {
// Create Kafka producer
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
try {
consumer.subscribe(Arrays.asList(properties.getProperty("kafka.topic")));
System.out.println("Subscribed to topic " + properties.getProperty("kafka.topic"));
while (true)
{
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
System.out.printf("partition = %s, offset = %d, key = %s, value = %s\n", record.partition(), record.offset(), record.key(), decodeMsg(record.value()).getData() );
}
}
}
finally {
consumer.close();
}
}
public static MsgKafka decodeMsg(String json) throws UnsupportedEncodingException {
Gson gson = new Gson();
MsgKafka msg = gson.fromJson(json, MsgKafka.class);
byte[] encodedData = Base64.getDecoder().decode(msg.getData());
msg.setData(new String(encodedData, "utf-8"));
return msg;
}
}
package sample1;
import java.io.UnsupportedEncodingException;
import java.sql.Timestamp;
import java.util.Base64;
import java.util.Properties;
import java.util.Random;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
/**
 * Sample Kafka producer: once per second it sends a JSON message, keyed by a
 * randomly chosen device id, to the configured topic.
 */
public class KafkaProducerSample
{
    /**
     * Shared JSON serializer, created once instead of once per message. The
     * Gson documentation describes Gson instances as thread-safe and reusable.
     */
    private static final Gson GSON = new Gson();

    /** Shared generator, so no new Random is allocated for every message. */
    private static final Random RANDOM = new Random();

    /**
     * Builds the producer configuration and starts the sending loop.
     *
     * @param args command-line arguments; passed through to {@link #runMainLoop}
     * @throws InterruptedException if the sending loop is interrupted while sleeping
     * @throws UnsupportedEncodingException declared for interface compatibility
     */
    public static void main(String[] args) throws InterruptedException, UnsupportedEncodingException {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "ipaddress:6667");
        properties.put("acks", "0");
        properties.put("retries", "1");
        // NOTE(review): batch.size (20 MiB) is ten times max.request.size
        // (2 MiB) below; confirm that this is intended.
        properties.put("batch.size", "20971520");
        properties.put("linger.ms", "33");
        properties.put("max.request.size", "2097152");
        properties.put("compression.type", "gzip");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Application-level key read by runMainLoop: the topic to publish to.
        properties.put("kafka.topic", "my-topic");
        runMainLoop(args, properties);
    }

    /**
     * Creates a producer from {@code properties} and sends one message per
     * second, forever, to the topic named by the {@code kafka.topic} property.
     *
     * <p>Send failures reported through the completion callback are written to
     * standard error instead of being dropped. The producer is closed when the
     * loop is left by an exception.
     *
     * @param args unused
     * @param properties producer configuration; must contain {@code kafka.topic}
     * @throws IllegalArgumentException if {@code kafka.topic} is not set
     * @throws InterruptedException if the thread is interrupted while sleeping
     * @throws UnsupportedEncodingException declared for interface compatibility
     */
    static void runMainLoop(String[] args, Properties properties) throws InterruptedException, UnsupportedEncodingException {
        String topic = properties.getProperty("kafka.topic");
        if (topic == null) {
            // Fail before opening any connection rather than on the first send.
            throw new IllegalArgumentException("Missing required property: kafka.topic");
        }

        // Create Kafka producer; try-with-resources closes it on the way out.
        try (KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties)) {
            while (true) {
                Thread.sleep(1000);
                String id = "device-" + getRandomNumberInRange(1, 5);
                ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, id, getMsg(id));
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        System.err.println("Failed to send message for " + id + ": " + exception);
                    }
                });
            }
        }
    }

    /**
     * Builds the JSON text of one message: the given id, the current time and
     * a fixed sample payload encoded as Base64 of its UTF-8 bytes.
     *
     * @param id value of the {@code id} field
     * @return a JSON object with the fields {@code id}, {@code timestamp} and {@code data}
     * @throws UnsupportedEncodingException never thrown; kept so existing callers
     *         that catch it still compile
     */
    public static String getMsg(String id) throws UnsupportedEncodingException {
        String timestamp = new Timestamp(System.currentTimeMillis()).toString();
        // Charset constant instead of a by-name lookup of "utf-8".
        byte[] payload = "this is my message data ...".getBytes(java.nio.charset.StandardCharsets.UTF_8);

        JsonObject obj = new JsonObject();
        obj.addProperty("id", id);
        obj.addProperty("timestamp", timestamp);
        obj.addProperty("data", Base64.getEncoder().encodeToString(payload));
        return GSON.toJson(obj);
    }

    /**
     * Returns a pseudo-random integer between {@code min} and {@code max},
     * both inclusive.
     *
     * @throws IllegalArgumentException if {@code min} is greater than {@code max}
     */
    private static int getRandomNumberInRange(int min, int max) {
        if (min > max) {
            throw new IllegalArgumentException("min (" + min + ") must not exceed max (" + max + ")");
        }
        // nextInt(n) yields 0..n-1, so shift the result up by min.
        return min + RANDOM.nextInt(max - min + 1);
    }
}
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Maven build for the Kafka producer/consumer samples. -->
<modelVersion>4.0.0</modelVersion>
<!-- Coordinates of this sample project. -->
<groupId>kafkasample</groupId>
<artifactId>producer-consumer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<!-- gson: JSON encoding and decoding of the message payloads -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.2</version>
</dependency>
<!-- kafka: provides the producer and consumer client classes used by the samples -->
<!-- NOTE(review): the samples only reference org.apache.kafka.clients and
     org.apache.kafka.common.serialization classes; the smaller kafka-clients
     artifact may be sufficient. Confirm before changing. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.1.0</version>
</dependency>
</dependencies>
<build>
<resources>
<!-- Resource files are taken from src/main/resources under the project base directory. -->
<resource>
<directory>${basedir}/src/main/resources</directory>
</resource>
</resources>
<plugins>
<!-- Compile with Java 1.8 as both source and target level. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment