Skip to content

Instantly share code, notes, and snippets.

@forkfork
Created March 14, 2022 13:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save forkfork/56a0e0db2ce20b10ba4016e1034cb228 to your computer and use it in GitHub Desktop.
Save forkfork/56a0e0db2ce20b10ba4016e1034cb228 to your computer and use it in GitHub Desktop.
simplequeue
const Redis = require("ioredis");
const { v4: uuidv4 } = require('uuid');
// Shared ioredis client used by every Redis call in this file; constructed
// with no arguments.
const redis = new Redis();
// NOTE(review): LIFETIME_MS is not referenced anywhere in the visible code —
// confirm whether it is still needed or was meant to replace a literal below.
const LIFETIME_MS = 1000;
// Run flag: mainloop() keeps iterating while this equals 'running';
// terminate() sets it to 'terminating'.
let status = 'running';
// UUID (v4) generated once at startup; passed to the atomicread script as the
// id of this process when claiming a topic.
const processId = uuidv4();
/**
 * Pause the calling async function without blocking the event loop.
 *
 * @param {number} milliseconds - Delay handed to setTimeout.
 * @returns {Promise<void>} Promise that resolves once the timer fires.
 */
const sleep = (milliseconds) =>
  new Promise((wake) => {
    setTimeout(wake, milliseconds);
  });
// Requests shutdown by flipping the shared run flag. mainloop() re-checks the
// flag at the top of each while-iteration, so the pass that is already in
// progress is finished before the loop exits.
function terminate() {
status = 'terminating';
}
// Schedule the shutdown request 10000 ms after startup. The third argument is
// forwarded to terminate() by setTimeout; terminate() declares no parameters
// and ignores it.
setTimeout(terminate, 10000, 'terminate');
// Registers a custom `atomicread` command on the client, backed by the Lua
// script below (Redis executes a script as a single atomic unit).
//
// Call shape used in this file: redis.atomicread(lockKey, streamKey, ownerId)
//   KEYS[1] - lock key holding the id of the process that owns the topic
//   KEYS[2] - stream key to read from
//   ARGV[1] - id of the calling process
//
// Script steps:
//   1. GET the lock; when it is unset, SET it to ARGV[1].
//   2. When the lock was unset or already equals ARGV[1], (re)arm its expiry
//      to 10000 ms with PEXPIRE, then:
//      a. XAUTOCLAIM for group 'grp' / consumer 'consumer' with a min-idle
//         time of 1000 ms starting at '0-0'; if any entries come back,
//         return them.
//      b. otherwise XREADGROUP up to 100 never-delivered entries ('>') and,
//         if there is a reply, return the entries of its first stream.
//      c. return an empty list when XREADGROUP yields nothing.
//   3. When the lock holds a different id, return nil; mainloop() treats
//      that reply as "another process is active" and skips the topic.
redis.defineCommand('atomicread', {
numberOfKeys: 2,
lua: `
local lockowner = redis.call('GET', KEYS[1])
if not lockowner then
redis.call('SET', KEYS[1], ARGV[1])
end
if not lockowner or lockowner == ARGV[1] then
redis.call('PEXPIRE', KEYS[1], 10000)
local pending = redis.call('XAUTOCLAIM', KEYS[2], 'grp', 'consumer', 1000, '0-0')
if #pending[2] > 0 then
return pending[2]
end
local result = redis.call('XREADGROUP', 'GROUP', 'grp', 'consumer', 'COUNT', 100, 'STREAMS', KEYS[2], '>')
if result then
return result[1][2]
end
return {}
end
return nil
`,
});
/**
 * Poll every registered topic and fan its stream entries out to connections.
 *
 * Each pass, while `status` is 'running':
 *   1. reads the topic names from the 'streams' set and the connection ids
 *      from the 'connections' set;
 *   2. for each topic, calls the `atomicread` script with this process's id;
 *      a null reply means another process holds the topic, so it is skipped;
 *   3. builds (and currently only logs) one payload per entry per connection;
 *   4. XACKs every entry id that was handled, then sleeps 50 ms.
 *
 * The Redis connection is closed in a `finally` block so it is released both
 * on a normal stop and when any Redis call rejects.
 *
 * @returns {Promise<void>} Settles once the loop has stopped and the client
 *   has been disconnected; rejects with the original error if a call failed.
 */
async function mainloop() {
  try {
    while (status == 'running') {
      const topics = await redis.smembers('streams');
      const connections = await redis.smembers('connections');
      for (const topic of topics) {
        const streamKey = 'stream:{' + topic + '}';
        const result = await redis.atomicread('consumer:{' + topic + '}', streamKey, processId);
        if (result == undefined) {
          // Loose equality on purpose: the script's nil arrives as null.
          console.log("another process is active on topic", topic, ", backing off");
          continue;
        }
        const acks = [];
        for (const entry of result) {
          // Each entry is [id, fields] as returned by the script.
          // FILTER HERE
          // The payload does not depend on the connection, so serialize once.
          const data = JSON.stringify(entry[1]);
          for (const connectionId of connections) {
            const params = {
              ConnectionId: connectionId,
              Data: data
            };
            //await apigwManagementApi.postToConnection(params).promise();
            console.log("writing data", params);
          }
          acks.push(entry[0]);
        }
        if (acks.length > 0) {
          console.log("acking", acks.length, "messages on topic", topic);
          await redis.xack(streamKey, 'grp', ...acks);
        }
      }
      await sleep(50);
    }
  } finally {
    // Always release the connection, including when a Redis call threw.
    await redis.disconnect();
  }
}
// Start polling. The returned promise is neither awaited nor given a
// rejection handler here.
mainloop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment