Skip to content

Instantly share code, notes, and snippets.

@jakelowen
Created August 7, 2019 22:38
Show Gist options
  • Save jakelowen/22cb8a233ac0cdbb8e77808e17e0e1fc to your computer and use it in GitHub Desktop.
Save jakelowen/22cb8a233ac0cdbb8e77808e17e0e1fc to your computer and use it in GitHub Desktop.
NodeJS stream trim concept for consumer groups
#!/usr/bin/env node
var Redis = require("ioredis");
const _ = require("lodash");
// Shared ioredis client used by every helper below. No options are passed,
// so the connection settings are whatever the library defaults to.
var redis = new Redis();
// Quick utility: turns each flat [key, value, key, value, ...] array found in
// `data` into a keyed object and returns one object per input array.
const redisArrayDataSplitter = data =>
  _.map(data, flatReply => _.fromPairs(_.chunk(flatReply, 2)));
// Runs XINFO STREAM for `streamKey` and reshapes the flat reply into a keyed
// object. The "first-entry" and "last-entry" fields are collapsed from a full
// [id, fields] entry down to just the entry ID (undefined when there is no
// entry to read an ID from).
const getStreamInfo = async streamKey => {
  const rawReply = await redis.xinfo("STREAM", streamKey);
  const [info] = redisArrayDataSplitter([rawReply]);
  for (const boundary of ["first-entry", "last-entry"]) {
    // Splitting [id, fields] yields { id: fields }, so the only key is the ID.
    const [entryById] = redisArrayDataSplitter([info[boundary]]);
    info[boundary] = _.keys(entryById)[0];
  }
  return info;
};
// Runs XINFO GROUPS for `streamKey` and returns one keyed object per
// consumer group in the reply.
const getActiveConsumerGroups = async streamKey =>
  redisArrayDataSplitter(await redis.xinfo("GROUPS", streamKey));
// Runs XINFO CONSUMERS for the given stream and consumer group and returns
// one keyed object per consumer in the reply.
const getConsumerGroupInfo = async (streamKey, consumerGroup) =>
  redisArrayDataSplitter(
    await redis.xinfo("CONSUMERS", streamKey, consumerGroup)
  );
// Asks the server for its clock via TIME and returns the first element of
// the reply.
const getRedisTime = async () => {
  const [firstPart] = await redis.time();
  return firstPart;
};
// Returns the ID of the oldest pending (delivered but not yet acknowledged)
// message for a given consumer within a given consumer group, or null when
// that consumer has nothing pending.
//
// This uses the extended form of XPENDING, which is a read-only inspection of
// the group's pending entries list. The previous implementation issued
// XREADGROUP with ID 0 under the consumer's own name; that command acts on
// behalf of the consumer (see the Redis XREADGROUP documentation), which is
// not appropriate for a monitoring report.
//
// Each row of the XPENDING reply starts with the entry ID, so only the first
// element of the first row is needed.
const getOldestPendingMessagesForConsumer = async (
  streamKey,
  consumerGroup,
  consumer
) => {
  const pendingRows = await redis.xpending(
    streamKey,
    consumerGroup,
    "-",
    "+",
    1,
    consumer
  );
  // Guard against a null/empty reply instead of indexing into it blindly.
  const oldestRow = Array.isArray(pendingRows) ? pendingRows[0] : undefined;
  return (oldestRow && oldestRow[0]) || null;
};
// Combines the helper utilities above into one master report object for
// `streamKey`: the server time, the stream key, and the stream info with
// every consumer group attached (each group carrying its consumers, and each
// consumer carrying its oldest pending message ID or null).
//
// Note that each group's `consumers` field is replaced by the array of
// consumer objects returned by getConsumerGroupInfo.
//
// Example:
// {
//   "reportTimeSeconds": "1565216593",
//   "streamKey": "STREAM:::myStream",
//   "streamInfo": {
//     "length": 7,
//     "radix-tree-keys": 1,
//     "radix-tree-nodes": 2,
//     "groups": 2,
//     "last-generated-id": "1565127805327-0",
//     "first-entry": "1565118841164-0",
//     "last-entry": "1565127805327-0",
//     "consumerGroups": [
//       {
//         "name": "alpha-processors",
//         "consumers": [
//           {
//             "name": "CONSUMER:::alphaproc1",
//             "pending": 0,
//             "idle": 27467,
//             "oldestPending": null
//           },
//           {
//             "name": "alphaproc1",
//             "pending": 1,
//             "idle": 27467,
//             "oldestPending": "1565118841164-0"
//           }
//         ],
//         "pending": 1,
//         "last-delivered-id": "1565119148707-0"
//       },
//       {
//         "name": "testcg",
//         "consumers": [
//           {
//             "name": "CONSUMER:::test",
//             "pending": 0,
//             "idle": 27467,
//             "oldestPending": null
//           }
//         ],
//         "pending": 0,
//         "last-delivered-id": "1565127805327-0"
//       }
//     ]
//   }
// }
const generateStreamReport = async streamKey => {
  const reportTimeSeconds = await getRedisTime();
  const streamInfo = await getStreamInfo(streamKey);
  streamInfo.consumerGroups = await getActiveConsumerGroups(streamKey);
  // for...of visits array elements only; for...in would also walk any
  // enumerable property that happens to sit on Array.prototype.
  for (const group of streamInfo.consumerGroups) {
    group.consumers = await getConsumerGroupInfo(streamKey, group.name);
    for (const consumer of group.consumers) {
      consumer.oldestPending = await getOldestPendingMessagesForConsumer(
        streamKey,
        group.name,
        consumer.name
      );
    }
  }
  return {
    reportTimeSeconds,
    streamKey,
    streamInfo
  };
};
// Orders two non-negative decimal strings numerically without converting
// them to Number, so very large values lose no precision.
// Assumes no leading zeros, so a shorter string is a smaller number.
const compareDecimalStrings = (left, right) => {
  if (left.length !== right.length) {
    return left.length - right.length;
  }
  if (left === right) {
    return 0;
  }
  return left < right ? -1 : 1;
};

// Orders two stream IDs of the form "<first>-<second>" by their first part,
// then by their second part (a missing second part counts as "0").
const compareStreamIds = (left, right) => {
  const [leftMs, leftSeq = "0"] = left.split("-");
  const [rightMs, rightSeq = "0"] = right.split("-");
  return (
    compareDecimalStrings(leftMs, rightMs) ||
    compareDecimalStrings(leftSeq, rightSeq)
  );
};

// Collects every candidate ID from the report -- each consumer group's
// "last-delivered-id" plus the oldest pending message of each consumer in
// the group -- and returns the oldest one, or undefined when the report
// contains no IDs at all.
//
// IDs are compared numerically part by part. A plain string minimum is wrong
// here: lexicographically "5-10" sorts before "5-9" and "10-0" before "5-0".
const extractOldestKeyFromReport = report => {
  const groups = report.streamInfo.consumerGroups;
  const candidateIds = [];
  for (const group of Array.isArray(groups) ? groups : []) {
    candidateIds.push(group["last-delivered-id"]);
    // `consumers` is only iterated when it is the array of consumer objects.
    for (const consumer of Array.isArray(group.consumers) ? group.consumers : []) {
      if (consumer.oldestPending) {
        candidateIds.push(consumer.oldestPending);
      }
    }
  }
  let oldest;
  for (const id of candidateIds) {
    // Missing/empty IDs are ignored rather than compared.
    if (typeof id !== "string" || id === "") {
      continue;
    }
    if (oldest === undefined || compareStreamIds(id, oldest) < 0) {
      oldest = id;
    }
  }
  return oldest;
};
// Given an entry ID (`criticalKey`), finds the zero-based index of that entry
// within the stream by paging through XRANGE from the start of the stream up
// to `criticalKey`, `iterationSize` entries at a time.
//
// Returns 0 when the key is not found in the scanned range;
// calcNumSafeTrimLength treats an index <= 0 as "keep everything".
//
// Pagination detail: every page after the first starts at the last ID of the
// previous page (inclusive), and that overlapping entry is dropped before
// counting. The previous implementation re-issued the identical query on
// every iteration, so it never terminated once a page came back full.
const findCriticalKeyIdx = async (
  streamKey,
  criticalKey,
  iterationSize = 100
) => {
  // Pages overlap by one entry, so a page must hold at least two entries to
  // make progress.
  const pageSize = Math.max(2, Math.floor(iterationSize) || 0);
  let rangeStart = "-";
  let lastSeenId = null;
  let entriesBeforePage = 0;
  for (;;) {
    const page = await redis.xrange(
      streamKey,
      rangeStart,
      criticalKey,
      "COUNT",
      pageSize
    );
    // Drop the entry already counted on the previous page, if still present.
    const overlaps =
      lastSeenId !== null && page.length > 0 && page[0][0] === lastSeenId;
    const freshEntries = overlaps ? page.slice(1) : page;
    const hitIdx = freshEntries.findIndex(entry => entry[0] === criticalKey);
    if (hitIdx !== -1) {
      return entriesBeforePage + hitIdx;
    }
    if (page.length < pageSize) {
      // Range exhausted without meeting the key.
      return 0;
    }
    entriesBeforePage += freshEntries.length;
    lastSeenId = page[page.length - 1][0];
    rangeStart = lastSeenId;
  }
};
// Given the index of the critical key, compares it to the current length of
// the stream (XLEN) and returns a "safe" length to feed to XTRIM: the number
// of entries from the critical key onwards, plus `buffer`.
//
// `buffer` pads the result in case of rapidly growing streams.
// A `criticalKeyIdx` <= 0 means nothing may be dropped, so the full stream
// length (plus buffer) is returned.
const calcNumSafeTrimLength = async (streamKey, criticalKeyIdx, buffer = 0) => {
  const streamLength = await redis.xlen(streamKey);
  if (criticalKeyIdx <= 0) {
    return streamLength + buffer;
  }
  // The index may be stale relative to this XLEN reading; never let the
  // remaining count go negative.
  const remainingCount = Math.max(0, streamLength - criticalKeyIdx);
  return remainingCount + buffer;
};
// Applies the trim: issues XTRIM on `streamKey` with the MAXLEN strategy and
// the "~" modifier, using `length` as the threshold. Resolves with no value.
const trimStream = async (streamKey, length) => {
  const trimArgs = ["MAXLEN", "~", length];
  await redis.xtrim(streamKey, ...trimArgs);
};
@rvwhitney
Copy link

rvwhitney commented Feb 28, 2022

just wondering how do I define streamKey?
is it as easy as var streamKey = 'whatever'?
and how to get the ball rolling?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment