Kafka-Clients

1. Add the dependencies

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>2.1.4.RELEASE</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>1.0.0</version>
</dependency>

Note: the version compatibility between Spring Boot, spring-kafka, and kafka-clients matters a lot; check the compatibility matrix at https://spring.io/projects/spring-kafka before picking versions.

2. You can also use kafka-clients directly, without spring-kafka. A consumer demo:

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "false"); // commit offsets manually after processing
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
    // poll(Duration) requires kafka-clients >= 2.0; with the 1.0.0 dependency above use poll(long) instead
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record);
    }
    if (buffer.size() >= minBatchSize) {
        insertIntoDb(buffer);   // your own persistence logic
        consumer.commitSync();  // commit only after the batch is safely stored
        buffer.clear();
    }
}
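
The same kafka-clients dependency also covers producing. A minimal producer sketch (the broker address, topic name and printed message are placeholders mirroring the consumer demo above, not part of the original gist):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("acks", "all"); // wait for all in-sync replicas to acknowledge
props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
// send() is asynchronous; the callback reports the assigned partition/offset or the failure
producer.send(new ProducerRecord<>("foo", "key-1", "hello"), (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace();
    } else {
        System.out.println("sent to " + metadata.topic() + "-" + metadata.partition() + "@" + metadata.offset());
    }
});
producer.flush();
producer.close();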

3. If you use spring-kafka:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
@EnableKafka
public class KafkaProducer {

	public Map<String, Object> producerConfigs() {
		Map<String, Object> props = new HashMap<>();
		// kafka.metadata.broker.list=10.16.0.214:9092,10.16.0.215:9092,10.16.0.216:9092
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ReadConfigation.getConfigItem("kafka.metadata.broker.list"));
		props.put(ProducerConfig.RETRIES_CONFIG, 0);
		props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
		props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
		props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		return props;
	}

	/** Build the producer factory */
	public ProducerFactory<String, String> producerFactory() {
		return new DefaultKafkaProducerFactory<>(producerConfigs());
	}

	/** Register the KafkaTemplate bean */
	@Bean
	public KafkaTemplate<String, String> kafkaTemplate() {
		return new KafkaTemplate<>(producerFactory());
	}
}


@Resource
private KafkaTemplate<String, String> kafkaTemplate;

Then sending data is a single call:
kafkaTemplate.send(topic, msg);
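
Putting those two lines into context, a minimal sketch of a sending service for spring-kafka 2.1.x, where send() returns a ListenableFuture (the class name TaskCmdSender and the callback handling are illustrative assumptions, not part of the original gist):

import javax.annotation.Resource;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Service
public class TaskCmdSender {

	@Resource
	private KafkaTemplate<String, String> kafkaTemplate;

	public void send(String topic, String msg) {
		// send() is asynchronous; register a callback to learn whether the broker accepted the record
		ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, msg);
		future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
			@Override
			public void onSuccess(SendResult<String, String> result) {
				// result.getRecordMetadata() carries the partition and offset
			}

			@Override
			public void onFailure(Throwable ex) {
				// log or retry as appropriate
			}
		});
	}
}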


import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

@Configuration
@EnableKafka
public class KafkaConsumer {

	public Map<String, Object> consumerConfigs() {
		Map<String, Object> props = new HashMap<>();
		props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
		props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
		// ...
		// fill in the remaining settings here, or read them from a properties file
		// and convert it to a Map (recommended; see the sketch after this class)
		return props;
	}

	/** Build the consumer factory */
	public ConsumerFactory<String, String> consumerFactory() {
		return new DefaultKafkaConsumerFactory<>(consumerConfigs());
	}

	/** Register the listener container factory bean */
	@Bean
	public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
		ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
		factory.setConsumerFactory(consumerFactory());
		factory.setConcurrency(3);
		factory.getContainerProperties().setPollTimeout(3000);
		return factory;
	}
}
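
The consumerConfigs() comment above recommends reading a properties file and converting it to a Map; a minimal sketch of that approach (the helper class name and the kafka-consumer.properties classpath resource are assumptions for illustration):

import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public final class KafkaPropertiesLoader {

	/** Loads a properties file from the classpath and converts it to the Map that DefaultKafkaConsumerFactory expects */
	public static Map<String, Object> load(String resource) {
		Properties props = new Properties();
		try (InputStream in = KafkaPropertiesLoader.class.getClassLoader().getResourceAsStream(resource)) {
			if (in == null) {
				throw new IllegalStateException("Resource not found: " + resource);
			}
			props.load(in);
		} catch (IOException e) {
			throw new IllegalStateException("Cannot read " + resource, e);
		}
		Map<String, Object> map = new HashMap<>();
		props.forEach((k, v) -> map.put(k.toString(), v));
		return map;
	}
}

// usage inside consumerConfigs():
// return KafkaPropertiesLoader.load("kafka-consumer.properties");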

/* In any Spring-managed bean */
@KafkaListener(topics = { "taskCmd" })
public void taskCmd(ConsumerRecord<?, ?> record) {
	Object message = record.value();
	logger.info("Received command from the management platform: " + message);
}
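
Because enable.auto.commit is set to false in consumerConfigs(), the offset can also be committed manually from the listener. A sketch under the assumption of spring-kafka 2.1.x (where the AckMode enum lives in AbstractMessageListenerContainer; the Acknowledgment parameter is injected by the container):

// In kafkaListenerContainerFactory(), switch the container to manual acknowledgment:
// factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);

// requires org.springframework.kafka.support.Acknowledgment
@KafkaListener(topics = { "taskCmd" })
public void taskCmd(ConsumerRecord<?, ?> record, Acknowledgment ack) {
	Object message = record.value();
	logger.info("Received command from the management platform: " + message);
	ack.acknowledge(); // commit the offset only after the message has been handled
}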