Last active
January 11, 2024 10:13
-
-
Save drslump/785982191d5c67af7d58d2460299dffc to your computer and use it in GitHub Desktop.
Lua (Teal) snippets implementing ordered processing for Redis streams
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
--- Redis Stream with ordered processing
---
--- Redis streams support multiple consumers for throughput and reliability; however, they don't offer
--- a native mechanism for processing a set of messages in a specific order.
---
--- It's possible, though, to implement a simple algorithm that resembles Kafka's partitioning:
---
--- - a logical stream becomes N streams in Redis (shards from now on)
--- - producers place each message into one of the stream shards based on the hash of some attribute
--- - each consumer in the consumer group "acquires" a subset of the shards
--- - as long as a consumer keeps "acquiring" in a timely fashion, no other consumer can process its shards
--- - since all the processing for a specific shard happens in a single consumer, we can guarantee
---   the ordered processing of messages matching a specific attribute (partition)
---
--- The acquire logic includes a self-balancing algorithm that checks how many consumers are active
--- and re-assigns shards accordingly. This also lets the system recover from orphaned shards (e.g. when
--- a consumer crashes): another consumer picks up a shard once the previous consumer's lease times out.
Edited 1 month ago | |
--- Emulates a set: returns a lookup table whose keys are the elements of
-- `list`, each mapped to `true`, so membership is tested with `set[value]`.
-- Traversal stops at the first `nil` element, like `ipairs`.
-- @tparam table list sequence of values
-- @treturn table lookup table `{ [value] = true, ... }`
local function Set(list)
  local lookup = {}
  local idx = 1
  while list[idx] ~= nil do
    lookup[list[idx]] = true
    idx = idx + 1
  end
  return lookup
end
--- Acquires a set of stream shards (keys) for a specific consumer.
--
-- Each call refreshes the consumer's heartbeat in the active-consumers
-- sorted set, forgets consumers whose heartbeat is older than
-- `expirationSeconds`, and moves the consumer's lease one step towards
-- `totalShards / activeConsumers`: it releases one shard per call when
-- holding too many, and claims shards not leased by any active consumer
-- when holding too few.
--
-- Note that the logic derives some keys dynamically so it might break
-- when running in a cluster, to work around it use a "hash tag" in the
-- base key (i.e. `{my-cool-stream}`), all the derived keys will use it
-- as their prefix.
--
-- @tparam string kBase base key used as prefix of every derived key
-- @tparam number totalShards number of shards, numbered 0..totalShards-1
-- @tparam string consumerId identifier of the calling consumer
-- @tparam number expirationSeconds heartbeat / lease timeout in seconds
-- @treturn table list of stream keys (`<kBase>:shards:<n>`) leased to the consumer
local function acquireStreamShards(
  kBase,
  totalShards,
  consumerId,
  expirationSeconds
)
  -- Redis scripts run on Lua 5.1, where `unpack` is a global and
  -- `table.unpack` does not exist.
  local unpack = table.unpack or unpack
  -- TIME replies with {seconds, microseconds} encoded as strings
  local now = tonumber(redis.call('TIME')[1])
  local cutoff = now - expirationSeconds
  local kConsumers = kBase .. ':active-consumers'
  local leasedPrefix = kBase .. ':leased-shards:'
  local kLeasedShards = leasedPrefix .. consumerId

  -- Refresh this consumer's heartbeat and prune consumers that timed out,
  -- so the sorted set does not keep every consumer id ever seen.
  redis.call('ZADD', kConsumers, now, consumerId)
  redis.call('ZREMRANGEBYSCORE', kConsumers, '-inf', '(' .. cutoff)
  redis.call('EXPIRE', kConsumers, expirationSeconds)
  local activeConsumers = redis.call('ZRANGE', kConsumers, cutoff, '+inf', 'BYSCORE')

  -- This consumer might already have a set of leased shards from a previous call
  local leasedShards = redis.call('SMEMBERS', kLeasedShards)

  -- Calculate how many shards should be handled by each consumer
  local shardsPerConsumer = totalShards / #activeConsumers
  local delta = shardsPerConsumer - #leasedShards

  if math.ceil(delta) < 0 then
    -- Too many shards: drop one per call so the rebalancing is gradual.
    local popped = table.remove(leasedShards)
    redis.call('SREM', kLeasedShards, popped)
  elseif math.floor(delta) > 0 then
    -- Too few shards: claim those not leased by any *active* consumer.
    -- Leases of consumers whose heartbeat timed out are ignored, which is
    -- how shards of crashed consumers are recovered. The lease keys must use
    -- the same prefix as kLeasedShards, otherwise the union is always empty
    -- and consumers steal each other's shards.
    local consumerLeaseKeys = {}
    for i, value in ipairs(activeConsumers) do
      consumerLeaseKeys[i] = leasedPrefix .. value
    end
    local leasedByAnyone = Set(redis.call('SUNION', unpack(consumerLeaseKeys)))
    for i = 0, totalShards - 1 do
      local istr = tostring(i)
      if not leasedByAnyone[istr] then
        table.insert(leasedShards, istr)
        redis.call('SADD', kLeasedShards, istr)
        delta = delta - 1
        if delta < 1 then break end
      end
    end
  end

  -- Refresh the lease TTL only after SADD/SREM: EXPIRE is a no-op on a key
  -- that does not exist yet, so a brand-new lease would otherwise never expire.
  redis.call('EXPIRE', kLeasedShards, expirationSeconds)

  local result = {}
  for _, value in ipairs(leasedShards) do
    result[#result + 1] = kBase .. ':shards:' .. value
  end
  return result
end
--- Immediately releases every shard leased to a consumer.
-- Removes the consumer from the `<kBase>:active-consumers` sorted set and
-- deletes its `<kBase>:leased-shards:<consumerId>` lease set.
-- @tparam string kBase base key shared with the acquire function
-- @tparam string consumerId consumer to release
local function releaseStreamShards(kBase, consumerId)
  local call = redis.call
  local kConsumers = kBase .. ':active-consumers'
  local kLease = kBase .. ':leased-shards:' .. consumerId
  call('ZREM', kConsumers, consumerId)
  call('DEL', kLease)
end
--- Adds a new message to a stream shard.
-- The shard key is `<kBase>:shards:<shard>`, the same layout returned by
-- `acquireStreamShards`, so consumers read the keys producers write to.
-- The varargs match the XADD operation after the key parameter
-- (e.g. `'*', field, value, ...`).
-- @param kBase base key of the logical stream
-- @param shard shard number (or its string form)
-- @return the XADD reply
local function addToStreamShard(kBase, shard, ...)
  -- No stray quote here: `'":shards:'` produced `<kBase>":shards:<n>`,
  -- a key that no consumer ever reads.
  local kShard = kBase .. ':shards:' .. shard
  return redis.call('XADD', kShard, ...)
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
--- Type declaration for the `redis` global the scripts below call into.
-- Only `redis.call` is declared; it accepts a command name followed by
-- string or number arguments and returns an untyped reply.
global record redis
  call: function(cmd: string, ...: string | number): any
end
--- Type declaration for the `bit` global used to decode lease bitmaps.
-- Presumably the `bit` library exposed to Redis scripts; only the two
-- operations used here are declared.
global record bit
  band: function(a: number, b: number): number
  lshift: function(x: number, n: number): number
end
--- Turns a list into a lookup table: every element becomes a key mapped to
-- `true`, so membership is tested with `lookup[value]`.
-- Traversal stops at the first `nil` element, like `ipairs`.
local function Set<T>(list: {T}): {T: boolean}
  local lookup: {T: boolean} = {}
  local idx = 1
  while list[idx] ~= nil do
    lookup[list[idx]] = true
    idx = idx + 1
  end
  return lookup
end
--- Acquires a set of stream keys for a specific consumer.
--
-- Each call refreshes the consumer's heartbeat in the sorted set stored at
-- `kBase`, prunes heartbeats older than `expirationSeconds`, and moves the
-- consumer's lease one step towards `totalKeys / activeConsumers`: it
-- releases one key per call when holding too many, and claims keys not
-- leased by any active consumer when holding too few.
--
-- Note that the logic derives some keys dynamically so it might break
-- when running in a cluster, to work around it use a "hash tag" in the
-- base key (i.e. `{my-cool-stream}`), all the derived keys will use it
-- as their prefix.
--
-- @param kBase base key; also holds the active-consumers sorted set
-- @param consumerId identifier of the calling consumer
-- @param totalKeys number of stream keys, numbered 0..totalKeys-1
-- @param expirationSeconds heartbeat / lease timeout in seconds
-- @return the stream keys (`<kBase>:<n>`) leased to this consumer
local function acquireStreamKeys(
  kBase: string,
  consumerId: string,
  totalKeys: number,
  expirationSeconds: number
): {string}
  -- TIME replies with {seconds, microseconds} encoded as strings
  local now = tonumber((redis.call('TIME') as {string})[1])
  local cutoff = now - expirationSeconds
  local kConsumers = kBase
  -- Derived key: needs a hash tag in kBase to be cluster-compatible
  local kLeasedKeys = kBase .. ':leased:' .. consumerId

  -- Refresh this consumer's heartbeat and prune consumers that timed out,
  -- so the sorted set does not keep every consumer id ever seen
  redis.call('ZADD', kConsumers, now, consumerId)
  redis.call('ZREMRANGEBYSCORE', kConsumers, '-inf', '(' .. cutoff)
  redis.call('EXPIRE', kConsumers, expirationSeconds)
  local activeConsumers = redis.call('ZRANGEBYSCORE', kConsumers, cutoff, '+inf') as {string}

  -- This consumer might already have a set of leased keys from a previous call
  local leasedKeys = redis.call('SMEMBERS', kLeasedKeys) as {string}

  -- Calculate how many keys should be handled by each balanced consumer
  local keysPerConsumer = totalKeys / #activeConsumers
  local delta = keysPerConsumer - #leasedKeys

  if math.ceil(delta) < 0 then
    -- Too many keys: drop one per call so the rebalancing is gradual
    local popped = table.remove(leasedKeys)
    redis.call('SREM', kLeasedKeys, popped)
  elseif math.floor(delta) > 0 then
    -- Too few keys: claim keys not leased by any *active* consumer. Leases of
    -- consumers whose heartbeat timed out are ignored, which is how keys of
    -- crashed consumers are recovered.
    local listConsumerLeasedKeys: {string} = {}
    for i, value in ipairs(activeConsumers) do
      listConsumerLeasedKeys[i] = kBase .. ':leased:' .. value
    end
    local allLeasedKeys = redis.call('SUNION', table.unpack(listConsumerLeasedKeys)) as {string}
    local allLeasedKeysSet = Set(allLeasedKeys)
    for i = 0, totalKeys - 1 do
      local istr = tostring(i)
      if not allLeasedKeysSet[istr] then
        table.insert(leasedKeys, istr)
        redis.call('SADD', kLeasedKeys, istr)
        delta = delta - 1
        if delta < 1 then break end
      end
    end
  end

  -- Reset the lease expiration after SADD/SREM (EXPIRE ignores missing keys)
  redis.call('EXPIRE', kLeasedKeys, expirationSeconds)

  local result: {string} = {}
  for _, value in ipairs(leasedKeys) do
    table.insert(result, kBase .. ':' .. value)
  end
  return result
end
--- Immediately releases any keys associated to a consumer.
-- Removes `consumerId` from the active-consumers sorted set and deletes its
-- lease (a set or a bitmap stored at `<keysPrefix>:leased:<consumerId>`).
-- @param consumerId consumer to release
-- @param keysPrefix the base key passed as `kBase` to the acquire functions
local function releaseStreamKeys(consumerId: string, keysPrefix: string): nil
  -- The acquire functions store the consumers sorted set at the base key
  -- itself (`kConsumers = kBase`); removing from `<keysPrefix>:consumers`
  -- left the consumer "active" until its heartbeat timed out.
  redis.call('ZREM', keysPrefix, consumerId)
  redis.call('DEL', keysPrefix .. ':leased:' .. consumerId)
end
--- Same algorithm as `acquireStreamKeys` but using bitmaps instead of sets for
-- lower memory usage: bit N of `<kBase>:leased:<consumerId>` is set when the
-- consumer holds the stream key `<kBase>:N`.
--
-- @param kBase base key; also holds the active-consumers sorted set
-- @param consumerId identifier of the calling consumer
-- @param totalKeys number of stream keys, numbered 0..totalKeys-1
-- @param expirationSeconds heartbeat / lease timeout in seconds
-- @return the stream keys (`<kBase>:<n>`) leased to this consumer
local function acquireStreamKeysBitmaps(
  kBase: string,
  consumerId: string,
  totalKeys: number,
  expirationSeconds: number
): {string}
  -- TIME replies with {seconds, microseconds} encoded as strings
  local now = tonumber((redis.call('TIME') as {string})[1])
  local cutoff = now - expirationSeconds
  local kConsumers = kBase
  local kLeasedKeys = kBase .. ':leased:' .. consumerId

  -- Refresh this consumer's heartbeat and prune consumers that timed out,
  -- so the sorted set does not keep every consumer id ever seen
  redis.call('ZADD', kConsumers, now, consumerId)
  redis.call('ZREMRANGEBYSCORE', kConsumers, '-inf', '(' .. cutoff)
  redis.call('EXPIRE', kConsumers, expirationSeconds)
  local activeConsumers = redis.call('ZRANGEBYSCORE', kConsumers, cutoff, '+inf') as {string}

  -- This consumer might already have a set of leased keys from a previous call
  local countLeasedKeys = redis.call('BITCOUNT', kLeasedKeys) as number

  -- Calculate how many keys should be handled by each balanced consumer
  local keysPerConsumer = totalKeys / #activeConsumers
  local delta = keysPerConsumer - countLeasedKeys

  if math.ceil(delta) < 0 then
    -- Too many keys: clear the lowest leased bit, one per call
    local offset = redis.call('BITPOS', kLeasedKeys, 1) as number
    if offset >= 0 then
      redis.call('SETBIT', kLeasedKeys, offset, 0)
    end
  elseif math.floor(delta) > 0 then
    -- Too few keys: OR the bitmaps of all active consumers and claim the
    -- clear bits below totalKeys
    local listConsumerLeasedKeys: {string} = {}
    for i, value in ipairs(activeConsumers) do
      listConsumerLeasedKeys[i] = kBase .. ':leased:' .. value
    end
    local kTempBits = kBase .. ':' .. 'tmp-bits'
    redis.call('BITOP', 'OR', kTempBits, table.unpack(listConsumerLeasedKeys))
    while delta >= 1 do
      local orphanKey = redis.call('BITPOS', kTempBits, 0) as number
      if orphanKey >= totalKeys then
        break
      end
      -- Lease the key (SETBIT needs the target key) and mark it in the
      -- union too, otherwise the next BITPOS returns the same offset.
      redis.call('SETBIT', kLeasedKeys, orphanKey, 1)
      redis.call('SETBIT', kTempBits, orphanKey, 1)
      delta = delta - 1
    end
    redis.call('DEL', kTempBits)
  end

  -- Finally get the bitmap and reset the expiration for this consumer.
  -- GETEX replies nil (false in Lua) when the consumer holds no bitmap.
  local raw = redis.call('GETEX', kLeasedKeys, 'EX', expirationSeconds)
  if not raw then
    return {}
  end
  local leasedKeys = raw as string

  -- Iterate over the bitmap to construct an array of keys; offset 0 is the
  -- most significant bit of the first byte
  local band, lshift = bit.band, bit.lshift
  local result: {string} = {}
  local offset = 0
  for i = 1, #leasedKeys do
    local byte = string.byte(leasedKeys, i)
    for j = 7, 0, -1 do
      if band(byte, lshift(1, j)) > 0 then
        table.insert(result, kBase .. ':' .. offset)
      end
      offset = offset + 1
    end
  end
  return result
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment