Skip to content

Instantly share code, notes, and snippets.

@renanregis
Last active August 17, 2017 13:02
Show Gist options
  • Save renanregis/26fcfef1758ba6be63f50d8be69c2fe0 to your computer and use it in GitHub Desktop.
Save renanregis/26fcfef1758ba6be63f50d8be69c2fe0 to your computer and use it in GitHub Desktop.
Simple Kafka Producer Sample in Java
package com.veiculo.service;
import com.veiculo.model.Veiculo;
import flexjson.JSONSerializer;
import java.util.Properties;
import javax.annotation.PostConstruct;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
/**
*
* @author renan
*/
@Service
public class KafkaProducer {
@Value("${kafka.broker.address}")
private String brokerAddress;
private Producer<String, String> producer;
private String topic;
JSONSerializer json = new JSONSerializer();
public KafkaProducer() {
json.exclude("*.class");
}
@PostConstruct
public void init() {
Properties props = new Properties();
props.put("bootstrap.servers", brokerAddress);
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
props.put("acks", "1");
props.put("retries", "1");
props.put("linger.ms", 1);
producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
}
public void sendVeiculo(Veiculo veiculo) {
topic = "veiculo";
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, veiculo.getID().toString(), json.serialize(veiculo));
producer.send(producerRecord);
}
public void sendGateway(Veiculo veiculo) {
topic = "apigateway";
ProducerRecord<String, String> tx = new ProducerRecord<String, String>(topic, "cadastro_veiculo", veiculo.getIT_NU_RENAVAM());
producer.send(tx);
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, veiculo.getID().toString(), json.serialize(veiculo));
producer.send(producerRecord);
}
public void sendException(Exception ex) {
topic = "apigateway";
ProducerRecord<String, String> tx = new ProducerRecord<String, String>(topic, "exception", json.serialize(ex));
producer.send(tx);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment