Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@chalasr
Forked from chanks/script.lua
Created July 7, 2019 19:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save chalasr/22c1ac3bfe6eb02b163788f568d6bad6 to your computer and use it in GitHub Desktop.
Save chalasr/22c1ac3bfe6eb02b163788f568d6bad6 to your computer and use it in GitHub Desktop.
Lua script to delete/trim all processed messages from a Redis stream
-- The goal of this script is to trim messages that have been processed by
-- all extant groups from the a given Redis stream. It returns the number
-- of messages that were deleted from the stream, if any. I make no
-- guarantees about its performance, particularly if the stream is large
-- and not fully processed (so a simple XTRIM isn't possible).
-- First off, bail out early if the stream doesn't exist.
if redis.call("EXISTS", KEYS[1]) == 0 then
return false
end
-- To figure out what messages are deletable, we fetch the "last-
-- delivered-id" for each consumer group of the stream, and set the lowest
-- one of those ids as our upper bound. Next, we scan the pending lists
-- for each group, because we also don't want to delete any events that
-- are delivered but not acknowledged. The lowest unacknowledged id (if
-- any exists) then becomes our new upper bound.
-- "last-delivered-id" isn't mentioned in the Redis docs, for some reason,
-- but it's in there, as of 5.0.3.
-- In the (common?) case where there are no pending messages, and the
-- lowest last-delivered-id equals the most recent id on the stream, we
-- can just do a simpler (and much more efficient) XTRIM stream MAXLEN 0.
-- If we can't do that, we'll have to pull in all the message ids before
-- the lowest unacknowledged id, and XDEL them all.
-- First, use XINFO GROUPS to get all group names and the most recently
-- distributed ids.
local xinfo_groups = redis.call("XINFO", "GROUPS", KEYS[1])
local last_delivered_ids = {}
local groups = {}
for _, group_info_array in ipairs(xinfo_groups) do
-- Redis passes us a flattened array of key, value pairs, so before
-- anything else, convert it into a proper hash-style table so that it's
-- easier to use.
local group_info = {}
for i = 1, #group_info_array, 2 do
group_info[group_info_array[i]] = group_info_array[i+1]
end
table.insert(groups, group_info["name"])
table.insert(last_delivered_ids, group_info["last-delivered-id"])
end
local lowest_pending_ids = {}
for _, group_name in ipairs(groups) do
local pending = redis.call("XPENDING", KEYS[1], group_name)
local lowest_id = pending[2]
if not lowest_id == false then
table.insert(lowest_pending_ids, lowest_id)
end
end
local function string_id_to_table(s)
local t = {}
for k, v in string.gmatch(s, "(%d+)-(%d+)") do
table.insert(t, tonumber(k))
table.insert(t, tonumber(v))
end
return t
end
-- Returns true if a < b, or if a == b (which is important later).
local function compare_ids(a, b)
local a_t = string_id_to_table(a)
local b_t = string_id_to_table(b)
return ((a_t[1] <= b_t[1]) and (a_t[2] <= b_t[2]))
end
table.sort(last_delivered_ids, compare_ids)
table.sort(lowest_pending_ids, compare_ids)
-- Here's our XTRIM optimization.
if #lowest_pending_ids == 0 then
local stream_info_array = redis.call("XINFO", "STREAM", KEYS[1])
local stream_info = {}
for i = 1, #stream_info_array, 2 do
stream_info[stream_info_array[i]] = stream_info_array[i+1]
end
if last_delivered_ids[1] == stream_info["last-generated-id"] then
-- Yay!
return redis.call("XTRIM", KEYS[1], "MAXLEN", 0)
end
end
-- If we've gotten here, looks like we need to do a big XDEL, so find our
-- lower bound.
local lowest_id = last_delivered_ids[1]
-- We can include the lowest delivered id in the deletion, so long as it
-- isn't pending, which we'll check for next.
local protect_lowest_id = false
if #lowest_pending_ids > 0 then
-- We rely here on compare_ids returning true if the ids are equal.
if compare_ids(lowest_pending_ids[1], lowest_id) then
lowest_id = lowest_pending_ids[1]
protect_lowest_id = true
end
end
local messages = redis.call("XRANGE", KEYS[1], "-", lowest_id)
if #messages == 0 then
-- Nothing to delete.
return 0
end
local delete_command = {"XDEL", KEYS[1]}
for _,t in pairs(messages) do
local id = t[1]
if (lowest_id ~= id) or (not protect_lowest_id) then
table.insert(delete_command, id)
end
end
return redis.call(unpack(delete_command))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment