Skip to content

Instantly share code, notes, and snippets.

@hbinduni
Created March 13, 2021 23:47
Show Gist options
  • Save hbinduni/3b8cc9892895841e0f4250c6c9af62de to your computer and use it in GitHub Desktop.
Save hbinduni/3b8cc9892895841e0f4250c6c9af62de to your computer and use it in GitHub Desktop.
Redis publish and subscribe to stream
const redis = require('redis');
const {promisify} = require('util');
/**
 * Returns a promise that settles after the given delay.
 *
 * @param {number} ms - Delay in milliseconds, handed straight to setTimeout.
 * @returns {Promise<void>} Resolves with no value once the timer fires.
 */
const waitFor = (ms) =>
  new Promise((resolve) => {
    setTimeout(resolve, ms);
  });
// Connection settings shared by both Redis clients.
// NOTE(review): the password below is a hard-coded placeholder; load the real
// secret from configuration instead of committing it to source.
const host = 'localhost';
const port = 6379;
const password = 'xxxxx-xx-xxxx';
const options = {host, port, password};

// Two connections: the subscriber issues XREADGROUP with BLOCK, so XADD is
// sent over a separate publisher connection.
const subscriber = redis.createClient(options);
const publisher = redis.createClient(options);

// An 'error' event emitted with no listener attached is thrown by Node's
// EventEmitter and would take the process down, so log connection errors here.
subscriber.on('error', (err) => console.log('subscriber err', err.message));
publisher.on('error', (err) => console.log('publisher err', err.message));

// Promise-returning wrappers around the callback-style client commands, each
// bound to the client it was taken from.
const xaddAsync = promisify(publisher.xadd).bind(publisher);
const xreadgroupAsync = promisify(subscriber.xreadgroup).bind(subscriber);
const xackAsync = promisify(subscriber.xack).bind(subscriber);
const xgroupAsync = promisify(subscriber.xgroup).bind(subscriber);

// Delay in milliseconds the reader sleeps after an empty poll; the reader
// grows it on every empty poll and resets it once it exceeds 5s.
let timeToWait = 1000;

// Stream ID handed to XREADGROUP. Starts at '0-0', is advanced to each
// acknowledged entry id, and is switched to '>' once a read returns no entries.
let ID = '0-0';
/**
 * Picks a pseudo-random integer between `min` and `max`, both inclusive.
 *
 * @param {number} min - Lowest value that can be returned.
 * @param {number} max - Highest value that can be returned.
 * @returns {number} An integer in the closed range [min, max].
 */
const getRndInteger = (min, max) => {
  // Number of distinct integers in the closed range.
  const span = max - min + 1;
  const offset = Math.floor(Math.random() * span);
  return offset + min;
};
/**
 * Logs and acknowledges every entry of an XREADGROUP reply, in order.
 *
 * Only the first stream of the reply (`messages[0]`) is handled, and only the
 * first field/value pair of each entry is logged. After each successful XACK
 * the module-level `ID` cursor is moved to that entry's id.
 *
 * @param {object} args
 * @param {Array} args.messages - Reply shaped like `[[stream, [[id, [field, value, ...]], ...]]]`.
 * @param {string} args.stream - Stream key the entries are acknowledged on.
 * @param {string} args.groupName - Consumer group the entries are acknowledged for.
 * @returns {Promise<void>} Settles once every entry has been acknowledged;
 *   rejects with the first XACK error.
 */
const processMsgs = async ({messages, stream, groupName}) => {
  const [, entries] = messages[0];
  // The entries are a plain array, so an ordinary for...of is enough; the
  // await inside the loop already keeps the acknowledgements sequential.
  for (const [id, fields] of entries) {
    if (fields) {
      const [key, content] = fields;
      console.log(`id: ${id}. key: ${key} => content: ${content}`);
    } else {
      // Per the Redis XREADGROUP documentation, a pending entry that was
      // deleted or trimmed from the stream is returned with a null payload.
      // Acknowledge it anyway so the cursor moves past it instead of the
      // reader re-fetching (and failing on) the same entry forever.
      console.log(`id: ${id}. entry no longer exists in the stream`);
    }
    await xackAsync(stream, groupName, id);
    console.log('send ack', id);
    ID = id;
  }
};
/**
 * Creates the consumer group `groupName` on `stream`, starting at `$`.
 *
 * NOTE: the MKSTREAM option is used to auto create the stream.
 * Equivalent command: XGROUP CREATE mystream mygroup $ MKSTREAM
 *
 * @param {object} params
 * @param {string} params.stream - Stream key to create the group on.
 * @param {string} params.groupName - Name of the consumer group.
 * @returns {Promise<void>} Rejects when the XGROUP command fails.
 */
const xgroup = async (params) => {
  const {stream, groupName} = params;
  const createArgs = ['CREATE', stream, groupName, '$', 'MKSTREAM'];
  await xgroupAsync(...createArgs);
};
/**
 * Endless consumer loop: reads from `stream` as `consumerName` in `groupName`
 * and hands every reply to `processMsgs`.
 *
 * Each pass issues XREADGROUP with BLOCK 2000 and COUNT 10 at the module-level
 * `ID` cursor. A null reply (nothing arrived) triggers an extra sleep of
 * `timeToWait` ms, which grows by 600-1000 ms per empty poll and resets to 1s
 * once it exceeds 5s. A reply with an empty entry list switches the cursor to
 * '>' so that subsequent reads ask for new entries.
 *
 * @param {object} args
 * @param {string} args.groupName - Consumer group to read as.
 * @param {string} args.consumerName - Consumer name within the group.
 * @param {string} args.stream - Stream key to read from.
 * @param {string} [args.id] - Optional starting ID; when given it replaces the
 *   current `ID` cursor before the first read.
 * @returns {Promise<never>} Never settles; the loop runs until the process exits.
 */
const xreadgroup = async ({groupName, consumerName, stream, id}) => {
  // Honour the caller's starting position instead of silently ignoring it.
  if (id !== undefined) {
    ID = id;
  }

  while (true) {
    try {
      console.log('reading id', ID);
      const messages = await xreadgroupAsync(
        'GROUP', groupName, consumerName,
        'BLOCK', '2000',
        'COUNT', '10',
        'STREAMS', stream, ID
      );
      console.log('messages', messages);

      if (!messages) {
        console.log(`empty messages. waiting for ${timeToWait / 1000}s`);
        await waitFor(timeToWait);
        timeToWait += getRndInteger(600, 1000);
        // reset timeToWait when > 5s
        if (timeToWait / 1000 > 5) {
          console.log('reset timeToWait to 1s');
          timeToWait = 1000;
        }
        continue;
      }

      if (!messages[0][1].length) {
        console.log('end of history. get the latest data');
        ID = '>';
      }
      await processMsgs({messages, stream, groupName});
    } catch (err) {
      console.log('err', err.message);
      // Pause before retrying so a persistent failure (connection down,
      // missing group, ...) does not turn into a tight retry loop.
      await waitFor(timeToWait);
    }
  }
};
/**
 * Sample publisher: appends a fixed test payload to `stream` once per second.
 *
 * Each entry has a single field named `key1-<timestamp>` whose value is the
 * JSON-encoded payload.
 *
 * @param {object} args
 * @param {string} args.stream - Stream key to XADD to.
 * @returns {object} The timer handle from setInterval; pass it to
 *   clearInterval to stop publishing.
 */
const publish = ({stream}) => {
  let timeInterval = 1000;
  return setInterval(async () => {
    console.log('publish', timeInterval);
    const msg = {a: 100, b: 200, c: 'ok'};
    try {
      await xaddAsync(stream, '*', `key1-${Date.now()}`, JSON.stringify(msg));
      // NOTE: setInterval reads its delay once, when the timer is created, so
      // this only changes the value logged above, not the publish cadence.
      timeInterval += getRndInteger(600, 1000);
    } catch (err) {
      // A rejection escaping this callback would be an unhandled rejection.
      console.log('publish err', err.message);
    }
  }, timeInterval);
};
// Entry point: after a 1s delay, create the consumer group (if needed) and
// start the endless read loop.
(async () => {
  const stream = 'stream1';
  const groupName = 'group1';
  const consumerName = 'consumer1';

  // uncomment if you need sample publish to test
  // publish({stream});

  // Tries to create the group; any failure is only logged, so startup
  // continues to the read loop regardless.
  const ensureGroup = async () => {
    try {
      console.log('create group stream if none');
      await xgroup({stream, groupName});
    } catch (err) {
      console.log('normal err', err.message);
    }
  };

  const startConsumer = async () => {
    await ensureGroup();
    await xreadgroup({groupName, consumerName, stream, id: '0-0'});
  };

  // wait 1s before subscribing to the stream
  //! NOTE (original author): do not await here without setTimeout; it was
  //! reported to block the whole process.
  setTimeout(startConsumer, 1000);
})();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment