Lua (Teal) snippet implementing ordered processing for Redis streams
--- Redis Stream with ordered processing
---
--- Redis streams support multiple consumers for throughput and reliability, but they don't offer
--- a native mechanism for processing a set of messages in a specific order.
---
--- It's possible though to implement a simple algorithm that resembles Kafka's partitioning:
---
--- - a logical stream becomes N streams in Redis (shards from now on)
--- - producers place each message into one of the stream shards based on the hash of some attribute
--- - each consumer in the consumer group "acquires" a subset of the shards
--- - as long as a consumer keeps "acquiring" in a timely fashion, no other consumer can process its shards
--- - since all the processing for a specific shard happens on a single consumer, we can guarantee
---   the ordered processing of messages matching a specific attribute (partition)
---
--- The acquire logic includes a self-balancing algorithm that checks how many consumers are active
--- and re-assigns shards accordingly. This allows the system to recover from orphaned shards (i.e.
--- when a consumer crashed): another consumer will pick up the shard once the lease from the
--- previous one times out.
--- Emulate a set by populating a table with the values as keys
local function Set(list)
    local set = {}
    for _, l in ipairs(list) do set[l] = true end
    return set
end
--- Acquires a set of stream shards (keys) for a specific consumer.
--
-- Note that the logic derives some keys dynamically, so it might break
-- when running in a cluster. To work around that, use a "hash tag" in the
-- base key (e.g. `{my-cool-stream}`); all the derived keys will use it
-- as their prefix.
local function acquireStreamShards(
    kBase,
    totalShards,
    consumerId,
    expirationSeconds
)
    local now = redis.call('TIME')[1]
    local kConsumers = kBase .. ':active-consumers'
    local kLeasedShards = kBase .. ':leased-shards:' .. consumerId

    -- Add the consumer to the active set
    redis.call('ZADD', kConsumers, now, consumerId)
    redis.call('EXPIRE', kConsumers, expirationSeconds)

    -- Obtain all the active consumers
    local activeConsumers = redis.call('ZRANGE', kConsumers, now - expirationSeconds, '+inf', 'BYSCORE')

    -- This consumer might already have a set of leased shards from a previous call
    local leasedShards = redis.call('SMEMBERS', kLeasedShards)

    -- Calculate how many shards should be handled by each consumer
    local shardsPerConsumer = totalShards / #activeConsumers
    local delta = shardsPerConsumer - #leasedShards

    -- When the consumer is unbalanced because it holds too many shards,
    -- slowly balance it by dropping one of them at a time.
    if math.ceil(delta) < 0 then
        local popped = table.remove(leasedShards)
        redis.call('SREM', kLeasedShards, popped)
    -- When the unbalance is due to the consumer needing more shards,
    -- look for orphaned shards to restore the balance.
    elseif math.floor(delta) > 0 then
        -- Compute the union of the leased shards among all the active consumers
        local listConsumerLeasedKeys = {}
        for i, value in ipairs(activeConsumers) do
            listConsumerLeasedKeys[i] = kBase .. ':leased-shards:' .. value
        end
        -- Note: Redis embeds Lua 5.1, where `unpack` is a global (`table.unpack` is 5.2+)
        local allLeasedShards = redis.call('SUNION', unpack(listConsumerLeasedKeys))
        local allLeasedShardsSet = Set(allLeasedShards)
        -- Iterate over the whole range of shards, claiming those that have not
        -- yet been leased by any consumer.
        for i = 0, totalShards - 1 do
            local istr = tostring(i)
            if not allLeasedShardsSet[istr] then
                table.insert(leasedShards, istr)
                redis.call('SADD', kLeasedShards, istr)
                delta = delta - 1
                if delta < 1 then break end
            end
        end
    end

    -- Refresh the lease after any additions so the key always carries a TTL
    redis.call('EXPIRE', kLeasedShards, expirationSeconds)

    local result = {}
    for _, value in ipairs(leasedShards) do
        table.insert(result, kBase .. ':shards:' .. value)
    end
    return result
end
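
--- A minimal consumer-loop sketch (not part of the original gist). It assumes
--- the script is invoked periodically by each consumer, that a consumer group
--- (here the placeholder name 'myGroup') was already created on every shard
--- stream (XGROUP CREATE ... MKSTREAM), and that within a script XREADGROUP
--- may only be used without the BLOCK option.
local function consumeOnce(kBase, totalShards, consumerId, leaseSeconds)
    local shardKeys = acquireStreamShards(kBase, totalShards, consumerId, leaseSeconds)
    local batches = {}
    for _, kShard in ipairs(shardKeys) do
        -- Read a batch of new messages for this consumer from the shard stream
        local entries = redis.call(
            'XREADGROUP', 'GROUP', 'myGroup', consumerId,
            'COUNT', 10, 'STREAMS', kShard, '>')
        if entries then
            table.insert(batches, entries)
        end
    end
    return batches
end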
--- Immediately releases any shards associated with a consumer.
local function releaseStreamShards(kBase, consumerId)
    redis.call('ZREM', kBase .. ':active-consumers', consumerId)
    redis.call('DEL', kBase .. ':leased-shards:' .. consumerId)
end
--- Adds a new message to a stream shard.
-- The varargs match the XADD arguments after the key parameter.
local function addToStreamShard(kBase, shard, ...)
    local kShard = kBase .. ':shards:' .. shard
    return redis.call('XADD', kShard, ...)
end
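
--- A producer-side sketch (not part of the original gist) showing one way to
--- derive the shard from a partitioning attribute. `redis.sha1hex` is available
--- in Redis scripts; taking only the first 8 hex chars keeps the value within
--- Lua's number precision. The names and field layout are illustrative only.
local function addPartitioned(kBase, totalShards, partitionKey, ...)
    -- Hash the attribute and map it onto one of the N shards
    local hash = tonumber(string.sub(redis.sha1hex(partitionKey), 1, 8), 16)
    local shard = hash % totalShards
    return addToStreamShard(kBase, tostring(shard), '*', ...)
end

-- Example: all messages for the same account land on the same shard, e.g.
-- addPartitioned('{my-cool-stream}', 16, 'account:42', 'payload', '...')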
--- Teal (typed Lua) version of the same algorithm, including type
--- declarations for the Redis scripting environment.
global record redis
    call: function(command: string, ...: string | number): any
end

global record bit
    band: function(lhs: number, rhs: number): number
    lshift: function(lhs: number, rhs: number): number
end
--- Emulate a set by populating a table with the values as keys
local function Set<T>(list: {T}): {T: boolean}
    local set = {}
    for _, l in ipairs(list) do set[l] = true end
    return set
end
--- Acquires a set of stream keys for a specific consumer.
--
-- Note that the logic derives some keys dynamically, so it might break
-- when running in a cluster. To work around that, use a "hash tag" in the
-- base key (e.g. `{my-cool-stream}`); all the derived keys will use it
-- as their prefix.
local function acquireStreamKeys(
    kBase: string,
    consumerId: string,
    totalKeys: number,
    expirationSeconds: number
): {string}
    local now = (redis.call('TIME') as {number, number})[1]
    local kConsumers = kBase
    -- Note: Derived key, so not cluster-compatible unless kBase carries a hash tag
    local kLeasedKeys = kBase .. ':leased:' .. consumerId

    -- Add the consumer to the active set
    redis.call('ZADD', kConsumers, now, consumerId)
    redis.call('EXPIRE', kConsumers, expirationSeconds)

    -- Obtain all the active consumers
    local activeConsumers = redis.call('ZRANGEBYSCORE', kConsumers, now - expirationSeconds, '+inf') as {string}

    -- This consumer might already have a set of leased keys from a previous call
    local leasedKeys = redis.call('SMEMBERS', kLeasedKeys) as {string}

    -- Calculate how many keys should be handled by each balanced consumer
    local keysPerConsumer = totalKeys / #activeConsumers
    local delta = keysPerConsumer - #leasedKeys

    -- When the consumer is unbalanced because it holds too many keys,
    -- slowly balance it by dropping one of them at a time.
    if math.ceil(delta) < 0 then
        local popped = table.remove(leasedKeys)
        redis.call('SREM', kLeasedKeys, popped)
    -- When the unbalance is due to the consumer needing more keys,
    -- look for orphaned keys to restore the balance.
    elseif math.floor(delta) > 0 then
        -- Compute the union of the leased keys among all the active consumers
        local listConsumerLeasedKeys = {}
        for i, value in ipairs(activeConsumers) do
            listConsumerLeasedKeys[i] = kBase .. ':leased:' .. value
        end
        -- Note: Redis embeds Lua 5.1, which exposes this as the global `unpack`
        local allLeasedKeys = redis.call('SUNION', table.unpack(listConsumerLeasedKeys)) as {string}
        local allLeasedKeysSet = Set(allLeasedKeys)
        -- Iterate over the whole range of keys, claiming those that have not
        -- yet been leased by any consumer.
        for i = 0, totalKeys - 1 do
            local istr = tostring(i)
            if not allLeasedKeysSet[istr] then
                table.insert(leasedKeys, istr)
                redis.call('SADD', kLeasedKeys, istr)
                delta = delta - 1
                if delta < 1 then break end
            end
        end
    end

    -- Reset the expiration of the leased keys for this consumer
    redis.call('EXPIRE', kLeasedKeys, expirationSeconds)

    local result: {string} = {}
    for _, value in ipairs(leasedKeys) do
        table.insert(result, kBase .. ':' .. value)
    end
    return result
end
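
--- A worked example of the rebalancing math above (illustrative numbers, not
--- from the original gist): with totalKeys = 16 and 3 active consumers,
--- keysPerConsumer is ~5.33. A consumer holding 7 keys has delta = -1.67 and
--- math.ceil(-1.67) = -1 < 0, so it drops one key per call until it holds 6
--- (delta = -0.67, ceil = 0). A consumer holding 4 keys has delta = 1.33 and
--- math.floor(1.33) = 1 > 0, so it claims exactly one orphaned key. The
--- ceil/floor pair leaves a one-key dead zone that stops consumers from
--- endlessly stealing keys from each other when totalKeys isn't evenly
--- divisible by the number of consumers.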
--- Immediately releases any keys associated with a consumer.
-- Note: the acquire logic stores the active consumers in the base key itself,
-- so the consumer is removed from there rather than from a derived key.
local function releaseStreamKeys(consumerId: string, keysPrefix: string): nil
    redis.call('ZREM', keysPrefix, consumerId)
    redis.call('DEL', keysPrefix .. ':leased:' .. consumerId)
end
--- Same algorithm but using bitmaps instead of sets, for lower memory usage.
local function acquireStreamKeysBitmaps(
    kBase: string,
    consumerId: string,
    totalKeys: number,
    expirationSeconds: number
): {string}
    local now = (redis.call('TIME') as {number, number})[1]
    local kConsumers = kBase
    local kLeasedKeys = kBase .. ':leased:' .. consumerId

    -- Add the consumer to the active set
    redis.call('ZADD', kConsumers, now, consumerId)
    redis.call('EXPIRE', kConsumers, expirationSeconds)

    -- Obtain all the active consumers
    local activeConsumers = redis.call('ZRANGEBYSCORE', kConsumers, now - expirationSeconds, '+inf') as {string}

    -- This consumer might already have a set of leased keys from a previous call
    local countLeasedKeys = redis.call('BITCOUNT', kLeasedKeys) as number

    -- Calculate how many keys should be handled by each balanced consumer
    local keysPerConsumer = totalKeys / #activeConsumers
    local delta = keysPerConsumer - countLeasedKeys

    -- When the consumer is unbalanced because it holds too many keys,
    -- slowly balance it by dropping one of them at a time.
    if math.ceil(delta) < 0 then
        local offset = redis.call('BITPOS', kLeasedKeys, 1) as number
        if offset >= 0 then
            redis.call('SETBIT', kLeasedKeys, offset, 0)
        end
    -- When the unbalance is due to the consumer needing more keys,
    -- look for orphaned keys to restore the balance.
    elseif math.floor(delta) > 0 then
        -- Compute the union of the leased keys among all the active consumers
        local listConsumerLeasedKeys = {}
        for i, value in ipairs(activeConsumers) do
            listConsumerLeasedKeys[i] = kBase .. ':leased:' .. value
        end
        local kTempBits = kBase .. ':tmp-bits'
        redis.call('BITOP', 'OR', kTempBits, table.unpack(listConsumerLeasedKeys))
        while delta >= 1 do
            local orphanKey = redis.call('BITPOS', kTempBits, 0) as number
            if orphanKey >= totalKeys then
                break
            end
            -- Claim the orphan, and mark it in the temporary union so the
            -- next BITPOS doesn't return the same offset again
            redis.call('SETBIT', kLeasedKeys, orphanKey, 1)
            redis.call('SETBIT', kTempBits, orphanKey, 1)
            delta = delta - 1
        end
        redis.call('DEL', kTempBits)
    end

    -- Finally get the bitmap and reset the expiration for this consumer
    local leasedKeys = redis.call('GETEX', kLeasedKeys, 'EX', expirationSeconds) as string
    if not leasedKeys then leasedKeys = '' end

    -- Iterate over the bitmap to construct an array of keys; bit 0 is the
    -- most significant bit of the first byte (Redis SETBIT semantics)
    local band, lshift = bit.band, bit.lshift
    local result: {string} = {}
    local offset = 0
    for i = 1, #leasedKeys do
        local byte = string.byte(leasedKeys, i)
        for j = 7, 0, -1 do
            if band(byte, lshift(1, j)) > 0 then
                table.insert(result, kBase .. ':' .. offset)
            end
            offset = offset + 1
        end
    end
    return result
end
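
--- A minimal usage sketch (not part of the original gist). It assumes each
--- consumer EVALs this script at an interval comfortably shorter than
--- `expirationSeconds`, so its lease never lapses; '{my-cool-stream}' and
--- 'consumer-1' are placeholders.
local leased = acquireStreamKeysBitmaps('{my-cool-stream}', 'consumer-1', 64, 30)
for _, kStream in ipairs(leased) do
    -- Read a batch from each acquired stream; ordering within a stream holds
    -- because only this consumer currently leases it
    local batch = redis.call('XRANGE', kStream, '-', '+', 'COUNT', 10) as {any}
    -- ... process the entries, then XDEL or XACK them ...
end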