Skip to content

Instantly share code, notes, and snippets.

@serdarmumcu
Created March 28, 2024 10:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save serdarmumcu/f3243f3bc14231a6195e07b85cca389e to your computer and use it in GitHub Desktop.
Save serdarmumcu/f3243f3bc14231a6195e07b85cca389e to your computer and use it in GitHub Desktop.
@Component
public class MessageListener {
private final Logger logger = LoggerFactory.getLogger(MessageListener.class);
@Value("${demo.consumerIdempotency:false}")
private boolean consumerIdempotencyEnabled;
@Autowired
private RedisService redisService;
@KafkaListener(topics = "messages-topic", groupId = "group_id")
public void listen(String message) {
String messageId = generateHash(message);
if (!consumerIdempotencyEnabled || redisService.checkAndSet(messageId)) {
logger.info("Received message in group 'group_id': {}", message);
} else {
logger.info("Duplicate message ignored: {}", message);
}
}
private String generateHash(String message) {
try {
MessageDigest digest = MessageDigest.getInstance("SHA-256");
byte[] hash = digest.digest(message.getBytes(StandardCharsets.UTF_8));
StringBuilder hexString = new StringBuilder();
for (byte b : hash) {
String hex = Integer.toHexString(0xff & b);
if(hex.length() == 1) hexString.append('0');
hexString.append(hex);
}
return hexString.toString();
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException("Failed to generate hash for message", e);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment