@wirelessr
Created November 18, 2021 13:21
A doable worker in Redis Stream
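// Assumes an ioredis-style client `redis`, an async processMessage(id, fields),
// and GroupName, ConsumerName, StreamName defined elsewhere.
let lastid = "0-0";
let checkBacklog = true;
while (true) {
  // Read this consumer's pending entries first; use ">" (new messages only) once the backlog is drained.
  const myid = checkBacklog ? lastid : ">";
  const items = await redis.xreadgroup('GROUP', GroupName, ConsumerName, 'BLOCK', '2000', 'COUNT', '10', 'STREAMS', StreamName, myid);
  if (!items) continue; // BLOCK timed out with nothing to read
  // An empty reply means the backlog is drained; a non-empty reply keeps (or re-enables) backlog mode.
  checkBacklog = items[0][1].length !== 0;
  // for...of instead of forEach: await is only valid here, and messages are handled one at a time, in order.
  for (const [id, fields] of items[0][1]) {
    await processMessage(id, fields);
    await redis.xack(StreamName, GroupName, id);
    lastid = id;
  }
}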
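The loop above takes two things for granted: the consumer group already exists, and processMessage knows how to read `fields`. The sketch below covers both under stated assumptions. `ensureGroup` and `fieldsToObject` are hypothetical helper names that are not part of the original snippet, and `redis` is assumed to be an ioredis-style client that passes command arguments through as given and returns entry fields as a flat `[field, value, ...]` array.

```javascript
// Hypothetical helper: create the consumer group if it does not exist yet.
// `redis` is assumed to be an ioredis-style client exposing xgroup().
async function ensureGroup(redis, streamName, groupName) {
  try {
    // '$' delivers only entries added after creation; MKSTREAM creates the stream if it is missing.
    await redis.xgroup('CREATE', streamName, groupName, '$', 'MKSTREAM');
    return true; // group was created by this call
  } catch (err) {
    // Redis replies with a BUSYGROUP error when the group already exists; treat that as success.
    if (String(err && err.message).includes('BUSYGROUP')) return false;
    throw err;
  }
}

// Hypothetical helper: turn the flat [field, value, field, value, ...] array into an object.
function fieldsToObject(fields) {
  const obj = {};
  for (let i = 0; i + 1 < fields.length; i += 2) {
    obj[fields[i]] = fields[i + 1];
  }
  return obj;
}
```

One way to use these would be to call `await ensureGroup(redis, StreamName, GroupName)` once before entering the loop, and to start processMessage with `const data = fieldsToObject(fields)`.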