Created
August 7, 2019 22:38
-
-
Save jakelowen/22cb8a233ac0cdbb8e77808e17e0e1fc to your computer and use it in GitHub Desktop.
NodeJS stream trim concept for consumer groups
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env node | |
var Redis = require("ioredis"); | |
const _ = require("lodash"); | |
var redis = new Redis(); | |
// quick utillity to split arrays into keyed objects | |
// Quick utility to split Redis flat reply arrays into keyed objects.
// Input:  [["name", "g1", "pending", 0], ...]  (alternating key/value)
// Output: [{ name: "g1", pending: 0 }, ...]
// Uses native array handling instead of lodash chunk/fromPairs; tolerates
// a null/undefined reply the same way _.map did (yields []).
const redisArrayDataSplitter = data => {
  return (data || []).map(entry => {
    const rData = {};
    // Walk the flat [key, value, key, value, ...] reply two at a time.
    for (let i = 0; i < entry.length; i += 2) {
      rData[entry[i]] = entry[i + 1];
    }
    return rData;
  });
};
// parses xinfo STREAM for meta information about stream into | |
// nice consumable object | |
// Parses XINFO STREAM output into a plain object. The raw "first-entry"
// and "last-entry" fields arrive as [id, [field, value, ...]] pairs; this
// collapses each down to just the entry id string.
const getStreamInfo = async streamKey => {
  const raw = await redis.xinfo("STREAM", streamKey);
  const info = redisArrayDataSplitter([raw])[0];
  // [id, fields] -> { id: fields } -> keys -> first key == the entry id.
  const entryId = entry => _.keys(redisArrayDataSplitter([entry])[0])[0];
  info["first-entry"] = entryId(info["first-entry"]);
  info["last-entry"] = entryId(info["last-entry"]);
  return info;
};
// parses xinfo GROUPS to get consumer groups report | |
// Parses XINFO GROUPS into an array of consumer-group report objects.
const getActiveConsumerGroups = async streamKey =>
  redisArrayDataSplitter(await redis.xinfo("GROUPS", streamKey));
// parses xinfo CONSUMERS call to get report on each consumer | |
// in the consumer group | |
// Parses XINFO CONSUMERS into a report object per consumer in the group.
const getConsumerGroupInfo = async (streamKey, consumerGroup) =>
  redisArrayDataSplitter(
    await redis.xinfo("CONSUMERS", streamKey, consumerGroup)
  );
// grabs server time | |
// Grabs the Redis server time; TIME replies [seconds, microseconds] and
// only the seconds component is returned.
const getRedisTime = async () => {
  const [seconds] = await redis.time();
  return seconds;
};
// returns the oldest pending message for a given consumer | |
// within a given consumer group | |
// Returns the id of the oldest pending (delivered-but-unacked) message for
// `consumer` within `consumerGroup`, or null when nothing is pending.
// Reading with id 0 asks Redis for the consumer's pending-entries list
// rather than new messages.
const getOldestPendingMessagesForConsumer = async (
  streamKey,
  consumerGroup,
  consumer
) => {
  const info = await redis.xreadgroup(
    "GROUP",
    consumerGroup,
    consumer,
    "COUNT",
    1,
    "STREAMS",
    streamKey,
    0
  );
  // Guard: XREADGROUP may reply nil (ioredis -> null) or an empty array;
  // without this the [0][streamKey] lookup below throws a TypeError.
  if (!info || info.length === 0) {
    return null;
  }
  const newest = redisArrayDataSplitter(info)[0][streamKey];
  // newest is [[id, fields], ...]; surface the first entry's id.
  return (newest && newest[0] && newest[0][0]) || null;
};
// combines all above helper utilities into one master report obj | |
// Example: | |
// { | |
// "reportTimeSeconds": "1565216593", | |
// "streamKey": "STREAM:::myStream", | |
// "streamInfo": { | |
// "length": 7, | |
// "radix-tree-keys": 1, | |
// "radix-tree-nodes": 2, | |
// "groups": 2, | |
// "last-generated-id": "1565127805327-0", | |
// "first-entry": "1565118841164-0", | |
// "last-entry": "1565127805327-0", | |
// "consumerGroups": [ | |
// { | |
// "name": "alpha-processors", | |
// "consumers": [ | |
// { | |
// "name": "CONSUMER:::alphaproc1", | |
// "pending": 0, | |
// "idle": 27467, | |
// "oldestPending": null | |
// }, | |
// { | |
// "name": "alphaproc1", | |
// "pending": 1, | |
// "idle": 27467, | |
// "oldestPending": "1565118841164-0" | |
// } | |
// ], | |
// "pending": 1, | |
// "last-delivered-id": "1565119148707-0" | |
// }, | |
// { | |
// "name": "testcg", | |
// "consumers": [ | |
// { | |
// "name": "CONSUMER:::test", | |
// "pending": 0, | |
// "idle": 27467, | |
// "oldestPending": null | |
// } | |
// ], | |
// "pending": 0, | |
// "last-delivered-id": "1565127805327-0" | |
// } | |
// ] | |
// } | |
// } | |
// Combines the helper utilities above into one master report object for a
// stream: stream meta info, each consumer group, each consumer within the
// group, and each consumer's oldest pending message id. See the example
// shape in the comment block above.
const generateStreamReport = async streamKey => {
  const reportTimeSeconds = await getRedisTime();
  const streamInfo = await getStreamInfo(streamKey);
  streamInfo.consumerGroups = await getActiveConsumerGroups(streamKey);
  // for...of, not for...in: iterate array values directly instead of
  // string indices, and mutate each group/consumer object in place.
  for (const group of streamInfo.consumerGroups) {
    group.consumers = await getConsumerGroupInfo(streamKey, group.name);
    for (const consumer of group.consumers) {
      consumer.oldestPending = await getOldestPendingMessagesForConsumer(
        streamKey,
        group.name,
        consumer.name
      );
    }
  }
  return {
    reportTimeSeconds,
    streamKey,
    streamInfo
  };
};
// computes an array of all given keys either from | |
// consumer group "last delivered info" OR | |
// from the oldest pending message from each consumer in group | |
// returns the oldest found key. | |
// Collects every consumer group's "last-delivered-id" plus the oldest
// pending message id of each consumer, and returns the oldest id found —
// i.e. the trim horizon nothing has consumed past yet.
// Fix: stream ids ("<ms>-<seq>") were previously compared with _.min,
// which is lexicographic on strings and wrong when timestamps have
// different digit counts (e.g. "999-1" sorts after "1000-0"). Ids are now
// compared numerically on the ms part, then the seq part.
const extractOldestKeyFromReport = report => {
  // "1565118841164-0" -> [1565118841164, 0]
  const parseId = id => {
    const [ms, seq = "0"] = String(id).split("-");
    return [Number(ms), Number(seq)];
  };
  const timeStamps = [];
  for (const cg of report.streamInfo.consumerGroups) {
    timeStamps.push(cg["last-delivered-id"]);
    for (const c of cg.consumers) {
      if (c.oldestPending) {
        timeStamps.push(c.oldestPending);
      }
    }
  }
  // Match _.min([]) behavior: no candidates -> undefined.
  if (timeStamps.length === 0) {
    return undefined;
  }
  return timeStamps.reduce((oldest, id) => {
    const [oMs, oSeq] = parseId(oldest);
    const [iMs, iSeq] = parseId(id);
    return iMs < oMs || (iMs === oMs && iSeq < oSeq) ? id : oldest;
  });
};
// given a key, finds the index of that key within the stream | |
// Given a key (entry id), finds the zero-based index of that key within
// the stream, scanning in pages of `iterationSize`. Returns 0 when the
// key is not found (same default as the original implementation).
// Fixes two bugs in the original paging loop:
//  1. It always re-issued XRANGE from "-", never advancing the start
//     cursor, so any stream with >= iterationSize entries before the
//     critical key spun forever on the same first page.
//  2. On the final page it assigned the in-page index instead of adding
//     it to the count of entries already scanned.
const findCriticalKeyIdx = async (
  streamKey,
  criticalKey,
  iterationSize = 100
) => {
  let scanned = 0;
  let start = "-";
  for (;;) {
    const history = await redis.xrange(
      streamKey,
      start,
      criticalKey,
      "COUNT",
      iterationSize
    );
    const pageIdx = history.findIndex(([id]) => id === criticalKey);
    if (pageIdx !== -1) {
      return scanned + pageIdx;
    }
    if (history.length < iterationSize) {
      // Exhausted the range without finding the key.
      return 0;
    }
    scanned += history.length;
    // XRANGE bounds are inclusive, so resume just past the last id seen:
    // "<ms>-<seq>" -> "<ms>-<seq + 1>".
    const [lastId] = history[history.length - 1];
    const [ms, seq = "0"] = lastId.split("-");
    start = `${ms}-${Number(seq) + 1}`;
  }
};
// given the index of a critical key, | |
// compares to length of entire stream and returns | |
// a "safe" length to which to feed to xtrim ~ | |
// can provide a buffer length to pad in case of | |
// rapidly growing streams | |
// Given the index of a critical key (oldest entry any consumer still
// needs), compares it to the length of the entire stream and returns a
// "safe" length to feed to XTRIM MAXLEN ~. An optional `buffer` pads the
// result for rapidly growing streams.
// Cleanup: removed a leftover console.log debug statement.
const calcNumSafeTrimLength = async (streamKey, criticalKeyIdx, buffer = 0) => {
  const streamLength = await redis.xlen(streamKey);
  if (criticalKeyIdx <= 0) {
    // Critical key is the first entry (or was not found): keep everything.
    return streamLength + buffer;
  }
  // Entries at index >= criticalKeyIdx must survive the trim.
  return streamLength - criticalKeyIdx + buffer;
};
// apply the trim | |
// Applies the trim: caps the stream at approximately `length` entries
// ("~" lets Redis trim lazily at radix-tree node boundaries).
const trimStream = async (key, length) => {
  await redis.xtrim(key, "MAXLEN", "~", length);
};
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Just wondering: how do I define streamKey?
Is it as simple as `var streamKey = 'whatever'`?
And how do I get the ball rolling?