@raj454raj-harness
Last active August 12, 2022 15:46
Redis Streams placeholder implementation
// Publish API for Redis stream
void send(Map<String, String> messagePayload) {
    StreamMessageId messageId = redisClient.getStream(streamName, new StringCodec("UTF-8")).addAll(messagePayload, maxTopicSize);
    log.info("Message {} was inserted to stream", messageId);
}
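As a rough illustration of what a size cap on publish could mean, here is an in-memory model of an append that trims the oldest entries. It assumes that `maxTopicSize` acts as a maximum stream length, which the gist does not state; `CappedStreamSketch` and its methods are hypothetical names, and nothing here talks to Redis.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;

// Hypothetical in-memory stand-in for a length-capped stream; not a Redis client.
final class CappedStreamSketch {
    private final Deque<Map<String, String>> entries = new ArrayDeque<>();
    private final int maxSize;   // assumed meaning of maxTopicSize
    private long nextId = 1;     // simplified id; real stream ids look different

    CappedStreamSketch(int maxSize) {
        this.maxSize = maxSize;
    }

    // Appends the payload, drops the oldest entries beyond the cap, returns the new id.
    long add(Map<String, String> payload) {
        entries.addLast(Map.copyOf(payload));
        while (entries.size() > maxSize) {
            entries.removeFirst();
        }
        return nextId++;
    }

    int size() {
        return entries.size();
    }

    // Oldest entry still retained, or null if the stream is empty.
    Map<String, String> oldest() {
        return entries.peekFirst();
    }

    public static void main(String[] args) {
        CappedStreamSketch stream = new CappedStreamSketch(2);
        stream.add(Map.of("k", "1"));
        stream.add(Map.of("k", "2"));
        stream.add(Map.of("k", "3"));
        System.out.println(stream.size() + " entries, oldest = " + stream.oldest());
    }
}
```

The practical consequence of a cap like this is that a slow consumer can lose entries that were trimmed before it read them, so the limit should be sized against the expected consumer lag.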
// Read API for polling Redis stream
List<Message> read(long maxWaitTime) {
    // If you are using Redis 6.2 or above, consider using XAUTOCLAIM instead of a combination of XPENDING and XCLAIM
    List<Message> pendingMessages = getPendingEntries();
    if (pendingMessages.isEmpty()) {
        // No message in the pending queue, so wait for new messages
        return getNewMessages(maxWaitTime);
    } else {
        // Try to take ownership of the pending entries
        List<Message> claimedMessages = claimEntries(pendingMessages);
        if (!claimedMessages.isEmpty()) {
            return claimedMessages;
        } else {
            // Nothing could be claimed, so fall back to new messages
            return getNewMessages(maxWaitTime);
        }
    }
}
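The order of operations in `read` (pending entries first, then a claim attempt, then new messages) can be exercised without Redis. The sketch below passes the three steps in as functions; `ReadOrderSketch` and the parameter names are hypothetical, and the lambdas in `main` merely stand in for `getPendingEntries`, `claimEntries` and `getNewMessages`.

```java
import java.util.List;
import java.util.function.Function;
import java.util.function.LongFunction;
import java.util.function.Supplier;

// Hypothetical, Redis-free model of the fallback order used by read().
final class ReadOrderSketch {

    static <M> List<M> read(Supplier<List<M>> pending,
                            Function<List<M>, List<M>> claim,
                            LongFunction<List<M>> fresh,
                            long maxWaitTime) {
        List<M> pendingMessages = pending.get();
        if (pendingMessages.isEmpty()) {
            // Nothing is pending: go straight to new messages.
            return fresh.apply(maxWaitTime);
        }
        List<M> claimed = claim.apply(pendingMessages);
        // If no pending entry could be claimed, fall back to new messages.
        return claimed.isEmpty() ? fresh.apply(maxWaitTime) : claimed;
    }

    public static void main(String[] args) {
        List<String> result = ReadOrderSketch.<String>read(
                () -> List.of("pending-1"),   // stand-in for getPendingEntries()
                entries -> entries,           // stand-in for claimEntries(): claim everything
                wait -> List.of("new-1"),     // stand-in for getNewMessages(maxWaitTime)
                100L);
        System.out.println(result);
    }
}
```

Writing the fallback this way makes it visible that both "nothing pending" and "nothing claimable" end in the same blocking read with the same wait time.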
// Consumer runnable thread
while (true) {
    List<Message> messages = client.read(blockTimeout);
    for (Message message : messages) {
        // Do some custom processing based on your application's needs
        processMessage(message);
        // Acknowledge the message so that no other consumer re-reads it
        acknowledgeMessage(message);
    }
}
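One possible refinement of the consumer loop is to acknowledge a message only when processing succeeds, so that a failed message stays pending and can be picked up again on a later read. The sketch below models one batch in memory; `ConsumerLoopSketch` and `processBatch` are hypothetical names, messages are plain strings, and adding to the `acked` list stands in for the real acknowledgement call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of one pass of the consumer loop with per-message error handling.
final class ConsumerLoopSketch {

    // Processes each message and returns the ones that were acknowledged.
    static List<String> processBatch(List<String> batch, Consumer<String> processor) {
        List<String> acked = new ArrayList<>();
        for (String message : batch) {
            try {
                processor.accept(message);
                acked.add(message); // stand-in for acknowledgeMessage(message)
            } catch (RuntimeException e) {
                // Not acknowledged: in a real stream the entry would remain pending
                // and could be claimed and retried later.
            }
        }
        return acked;
    }

    public static void main(String[] args) {
        List<String> acked = processBatch(List.of("a", "bad", "c"), message -> {
            if (message.equals("bad")) {
                throw new IllegalStateException("processing failed for " + message);
            }
        });
        System.out.println(acked);
    }
}
```

Catching the failure per message also keeps one bad message from terminating the whole consumer thread, at the cost of needing some limit on how often the same entry is retried.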