Skip to content

Instantly share code, notes, and snippets.

@bufferings
Last active June 5, 2017 15:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bufferings/c30c7cd7a2e5a52c97039a262b183411 to your computer and use it in GitHub Desktop.
Save bufferings/c30c7cd7a2e5a52c97039a262b183411 to your computer and use it in GitHub Desktop.
Kafka Java
package com.example.demo;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.stream.IntStream;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;
/**
 * Getting-started examples for the Kafka Java clients, written as JUnit tests.
 *
 * <p>Both tests connect to the broker address in {@link #BOOTSTRAP_SERVERS} and use the topic
 * named by {@link #TOPIC}. They perform real network I/O, so they presumably need a broker
 * already running at that address.
 */
public class KafkaTest1 {

  /** Broker address shared by the producer and consumer examples. */
  private static final String BOOTSTRAP_SERVERS = "localhost:9092";

  /** Topic the producer writes to and the consumer subscribes to. */
  private static final String TOPIC = "my-topic";

  /**
   * Sends ten records (keys 1..10, values "value1".."value10") to {@link #TOPIC}, waiting for each
   * send to complete before issuing the next one.
   *
   * @throws RuntimeException wrapping the {@link ExecutionException} or
   *     {@link InterruptedException} raised while waiting for a send to complete
   */
  @Test
  public void gettingStartedProducer() {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    try (KafkaProducer<Integer, String> producer = new KafkaProducer<>(properties)) {
      for (int i = 1; i <= 10; i++) {
        try {
          // get() blocks on the returned Future, making each send synchronous.
          producer.send(new ProducerRecord<>(TOPIC, i, "value" + i)).get();
        } catch (InterruptedException e) {
          // Restore the interrupt flag so code further up the stack can still observe it.
          Thread.currentThread().interrupt();
          throw new RuntimeException(e);
        } catch (ExecutionException e) {
          throw new RuntimeException(e);
        }
      }
    }
  }

  /**
   * Subscribes to {@link #TOPIC} as consumer group "my-consumer", polls once with a 1000 ms
   * timeout, and prints every returned record to standard output as {@code key:value}.
   *
   * <p>Only a single poll is issued, so the test prints nothing if no records arrive within that
   * timeout.
   */
  @Test
  public void gettingStartedConsumer() {
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer");
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    try (KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(properties)) {
      consumer.subscribe(Arrays.asList(TOPIC));
      ConsumerRecords<Integer, String> records = consumer.poll(1000L);
      records.forEach(r -> System.out.println(r.key() + ":" + r.value()));
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment