Created
March 28, 2024 10:06
-
-
Save serdarmumcu/f3243f3bc14231a6195e07b85cca389e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Component | |
public class MessageListener { | |
private final Logger logger = LoggerFactory.getLogger(MessageListener.class); | |
@Value("${demo.consumerIdempotency:false}") | |
private boolean consumerIdempotencyEnabled; | |
@Autowired | |
private RedisService redisService; | |
@KafkaListener(topics = "messages-topic", groupId = "group_id") | |
public void listen(String message) { | |
String messageId = generateHash(message); | |
if (!consumerIdempotencyEnabled || redisService.checkAndSet(messageId)) { | |
logger.info("Received message in group 'group_id': {}", message); | |
} else { | |
logger.info("Duplicate message ignored: {}", message); | |
} | |
} | |
private String generateHash(String message) { | |
try { | |
MessageDigest digest = MessageDigest.getInstance("SHA-256"); | |
byte[] hash = digest.digest(message.getBytes(StandardCharsets.UTF_8)); | |
StringBuilder hexString = new StringBuilder(); | |
for (byte b : hash) { | |
String hex = Integer.toHexString(0xff & b); | |
if(hex.length() == 1) hexString.append('0'); | |
hexString.append(hex); | |
} | |
return hexString.toString(); | |
} catch (NoSuchAlgorithmException e) { | |
throw new RuntimeException("Failed to generate hash for message", e); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment