Last active
August 12, 2022 15:46
-
-
Save raj454raj-harness/4bb5ef25bf3b590f14c95add15e6ccff to your computer and use it in GitHub Desktop.
Redis Streams placeholder implementation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Publish API for redis stream | |
void send(Map<String, String> messagePayload) { | |
StreamMessageId messageId = redisClient.getStream(streamName, new StringCodec("UTF-8")).addAll(messagePayload, maxTopicSize); | |
log.info("Message {} was inserted to stream", messageId); | |
} | |
// Read API for polling redis stream | |
List<Message> read(long maxWaitTime) { | |
// If you are using Redis 6.2 or above, consider using XAUTOCLAIM instead of a combination of XPENDING and XCLAIM | |
List<Message> pendingMessages = getPendingEntries(); | |
if (messages.isEmpty()) { | |
// No message in the pending queue | |
return getNewMessages(); | |
} else { | |
List<Message> claimedMessages = claimEntries(pendingMessages); | |
if (!claimedMessages.isEmpty()) { | |
return claimedMessages; | |
} else { | |
return getNewMessages(maxWaitTime); | |
} | |
} | |
} | |
// Consumer runnable thread | |
while (true) { | |
messages = client.read(blockTimeout); | |
for (Message message : messages) { | |
// Do some custom processing based on your application reads | |
processMessage(message); | |
// Confirm that the message is received so that no other consumer re-reads it | |
acknowledgeMessage(message); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment