Last active
August 17, 2017 13:02
-
-
Save renanregis/26fcfef1758ba6be63f50d8be69c2fe0 to your computer and use it in GitHub Desktop.
Simple Kafka Producer Sample in Java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.veiculo.service; | |
import com.veiculo.model.Veiculo; | |
import flexjson.JSONSerializer; | |
import java.util.Properties; | |
import javax.annotation.PostConstruct; | |
import org.apache.kafka.clients.producer.Producer; | |
import org.apache.kafka.clients.producer.ProducerRecord; | |
import org.apache.kafka.common.serialization.StringSerializer; | |
import org.springframework.beans.factory.annotation.Value; | |
import org.springframework.stereotype.Service; | |
/** | |
* | |
* @author renan | |
*/ | |
@Service | |
public class KafkaProducer { | |
@Value("${kafka.broker.address}") | |
private String brokerAddress; | |
private Producer<String, String> producer; | |
private String topic; | |
JSONSerializer json = new JSONSerializer(); | |
public KafkaProducer() { | |
json.exclude("*.class"); | |
} | |
@PostConstruct | |
public void init() { | |
Properties props = new Properties(); | |
props.put("bootstrap.servers", brokerAddress); | |
props.put("key.serializer", StringSerializer.class.getName()); | |
props.put("value.serializer", StringSerializer.class.getName()); | |
props.put("acks", "1"); | |
props.put("retries", "1"); | |
props.put("linger.ms", 1); | |
producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props); | |
} | |
public void sendVeiculo(Veiculo veiculo) { | |
topic = "veiculo"; | |
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, veiculo.getID().toString(), json.serialize(veiculo)); | |
producer.send(producerRecord); | |
} | |
public void sendGateway(Veiculo veiculo) { | |
topic = "apigateway"; | |
ProducerRecord<String, String> tx = new ProducerRecord<String, String>(topic, "cadastro_veiculo", veiculo.getIT_NU_RENAVAM()); | |
producer.send(tx); | |
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, veiculo.getID().toString(), json.serialize(veiculo)); | |
producer.send(producerRecord); | |
} | |
public void sendException(Exception ex) { | |
topic = "apigateway"; | |
ProducerRecord<String, String> tx = new ProducerRecord<String, String>(topic, "exception", json.serialize(ex)); | |
producer.send(tx); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment