Skip to content

Instantly share code, notes, and snippets.

@kasramp
Created October 3, 2020 08:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kasramp/3d5198cdaf6e67d2a2dd6e9d3f1d026c to your computer and use it in GitHub Desktop.
Save kasramp/3d5198cdaf6e67d2a2dd6e9d3f1d026c to your computer and use it in GitHub Desktop.
package com.madadipouya.springkafkatest.kafka.consumer;
import com.madadipouya.springkafkatest.dto.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
/**
 * Kafka consumer that logs the metadata of every {@link User} message it receives.
 *
 * <p>The topic to listen on is read from the {@code spring.kafka.topic.name} property. The
 * listener concurrency is read from {@code spring.kafka.consumer.level.concurrency} and falls
 * back to {@code 3} when that property is not defined.
 */
@Component
public class UserKafkaConsumer {

    // A logger holds no per-instance state, so a single class-level instance is shared
    // instead of allocating one field per consumer object.
    private static final Logger logger = LoggerFactory.getLogger(UserKafkaConsumer.class);

    /**
     * Logs the id of the received user together with the topic, partition and offset the
     * message was read from.
     *
     * @param user      the message payload; its {@code getUuid()} value is written to the log
     * @param topic     value of the {@link KafkaHeaders#RECEIVED_TOPIC} header
     * @param partition value of the {@link KafkaHeaders#RECEIVED_PARTITION_ID} header
     * @param offset    value of the {@link KafkaHeaders#OFFSET} header
     */
    @KafkaListener(topics = "${spring.kafka.topic.name}",
            concurrency = "${spring.kafka.consumer.level.concurrency:3}")
    public void logKafkaMessages(@Payload User user,
                                 @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                                 @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
                                 @Header(KafkaHeaders.OFFSET) Long offset) {
        logger.info("Received a message contains a user information with id {}, from {} topic, " +
                "{} partition, and {} offset", user.getUuid(), topic, partition, offset);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment