Skip to content

Instantly share code, notes, and snippets.

@ochaton
Created July 5, 2023 18:46
Show Gist options
  • Save ochaton/67a1778e289ef69b34b733f3ec8cdd8e to your computer and use it in GitHub Desktop.
Save ochaton/67a1778e289ef69b34b733f3ec8cdd8e to your computer and use it in GitHub Desktop.
Index scanner for Tarantool
local msgpack = require 'msgpack'
local base64_encode = require 'digest'.base64_encode
local base64_decode = require 'digest'.base64_decode
---@alias scan.cursor string
---Serializes the index key of `tuple` into an opaque cursor string.
---The key is built from the tuple fields listed in `index.parts` (in index
---order), packed with msgpack and then base64-encoded without padding or
---line wrapping.
---@param index boxIndex index whose `parts` describe the key fields
---@param tuple box.tuple tuple to take the key fields from
---@return string
local function encode_cursor(index, tuple)
    local key = {}
    for i, part in ipairs(index.parts) do
        key[i] = tuple[part.fieldno]
    end
    local packed = msgpack.encode(key)
    return base64_encode(packed, {nopad = true, nowrap = true})
end
---Restores the key stored in a cursor string.
---The cursor is base64-decoded and the resulting bytes are unpacked with
---msgpack; only the first decoded value is returned. Errors raised by the
---decoders propagate to the caller.
---@param cursor string
---@return box.tuple
local function decode_cursor(cursor)
    local packed = base64_decode(cursor)
    -- msgpack.decode may return extra values; keep only the decoded object
    local key = msgpack.decode(packed)
    return key
end
local req_ok, key_def = pcall(require, 'key_def')
if not req_ok then
    -- Cheap and nasty pure-Lua stand-in for the `key_def` module, used when
    -- the real module cannot be required.
    -- For backwards compatibility only `key_def.new` and the method
    -- `:compare_with_key` are supported.
    key_def = {}

    -- One shared metatable for every object created by key_def.new
    local key_def_mt = { __index = key_def }

    ---Three-way comparison of two non-nil field values.
    ---Booleans are ordered false < true (Lua's `<` cannot compare booleans);
    ---everything else relies on the native `<` / `>` operators.
    ---NOTE(review): strings are compared with Lua's `<`, which may differ
    ---from the index collation — confirm if that matters for your indexes.
    ---@param a any
    ---@param b any
    ---@return integer -1, 0 or 1
    local function compare_values(a, b)
        if type(a) == 'boolean' and type(b) == 'boolean' then
            if a == b then
                return 0
            end
            return a and 1 or -1
        end
        if a < b then
            return -1
        elseif a > b then
            return 1
        end
        return 0
    end

    ---Compares the key fields of `tuple` with a (possibly partial) key.
    ---Only the leading parts present in `key_tuple` take part in the
    ---comparison; the first absent (nil) key part ends it.
    ---A nil/NULL tuple field is ordered before any non-nil key part.
    ---@param tuple box.tuple
    ---@param key_tuple box.tuple
    ---@return integer -1 if tuple < key, 1 if tuple > key, 0 if the compared parts are equal
    function key_def:compare_with_key(tuple, key_tuple)
        for i = 1, #self.parts do
            local key = key_tuple[i]
            -- `== nil` (not `not key`) so that a `false` key part is still compared
            if key == nil then
                break
            end
            local value = tuple[self.parts[i].fieldno]
            if value == nil then
                -- nullable field: NULL goes first
                return -1
            end
            local cmp = compare_values(value, key)
            if cmp ~= 0 then
                return cmp
            end
        end
        return 0
    end

    ---Creates a key_def-like object from index parts.
    ---Only `fieldno`, `type` and `is_nullable` of every part are copied.
    ---@param parts IndexPart[]
    ---@return key_def_object
    function key_def.new(parts)
        local self = { parts = {} }
        for i, part in ipairs(parts) do
            self.parts[i] = {
                fieldno = part.fieldno,
                type = part.type,
                is_nullable = part.is_nullable,
            }
        end
        return setmetatable(self, key_def_mt)
    end
end
---@class scan.args
---@field index boxIndex unique index to scan
---@field limit integer? optional numeric limit in [1, 100] (default 100)
---@field cursor scan.cursor? optional cursor returned as `next_cursor` by a previous call
---@field prefix_eq any[]? optional prefix equality
---@field on_each fun(box.tuple): any? optional callback which is called on each resulting tuple

---@class scan.result
---@field count integer number of entries stored in `result`
---@field result box.tuple[] result (or values returned by `on_each`)
---@field truncated boolean true if result is truncated
---@field next_cursor scan.cursor? empty if not truncated

---Scans a unique index in ascending (GE) order and returns at most `limit`
---tuples, optionally restricted to tuples whose key starts with `prefix_eq`.
---One extra tuple is read ahead to find out whether the result is truncated;
---if it is, `next_cursor` can be passed back as `cursor` to continue.
---Raises an error (level 2) when `index` is missing or non-unique, when the
---cursor is malformed, or when `limit` is outside [1, 100].
---@param args scan.args
---@return scan.result
return function(args)
    local index = args.index
    if not index then
        error("Scan: field 'index' not given", 2)
    end
    if not index.unique then
        error("Scan: non-unique indexes are not supported", 2)
    end

    local iter_type = 'GE'
    local iter_opts = { iterator = iter_type }

    local cursor = args.cursor
    local start_from
    if index.tuple_pos then
        -- Tarantool ≥ 2.11: the cursor is a tuple position passed via `after`
        start_from = args.prefix_eq
        iter_opts.after = cursor
    else
        -- Tarantool < 2.11: the cursor is the encoded key of the next tuple
        if type(cursor) == 'string' then
            local ok, res = pcall(decode_cursor, cursor)
            -- a valid cursor always decodes to an array of key parts
            if not ok or type(res) ~= 'table' then
                error("Scan: malformed cursor is given", 2)
            end
            start_from = res
        elseif cursor ~= nil then
            error(("Scan: malformed cursor is given (required string, got %s)"):format(type(cursor)), 2)
        else
            start_from = args.prefix_eq
        end
    end

    local prefix_eq, kd
    if args.prefix_eq then
        prefix_eq = args.prefix_eq
        kd = key_def.new(index.parts)
    end

    local limit = tonumber(args.limit)
    if not limit then
        limit = 100
    end
    limit = math.floor(limit)
    if limit ~= limit then
        -- NaN passes both range checks below and would disable the limit
        error("Scan: limit must be a number", 2)
    elseif limit > 100 then
        error("Scan: limit is too large (must be ≤100)", 2)
    elseif limit < 1 then
        error("Scan: limit is too low (must be ≥ 1)", 2)
    end

    local on_each = args.on_each
    local result = table.new(limit, 0)
    local count = 0
    -- one extra tuple is fetched only to detect truncation
    local remaining = limit + 1
    local next_cursor, last_tuple

    for _, tuple in index:pairs(start_from, iter_opts) do
        remaining = remaining - 1

        -- if prefix_eq is given then
        -- we stop iterating as soon as a tuple no longer matches the prefix
        if kd and kd:compare_with_key(tuple, prefix_eq) ~= 0 then
            break
        end

        if remaining == 0 then
            -- look-ahead tuple: not returned, only marks the result as truncated
            next_cursor = tuple
            break
        end

        last_tuple = tuple
        count = count + 1
        if on_each then
            result[count] = on_each(tuple)
        else
            result[count] = tuple
        end
    end

    if next_cursor then
        if index.tuple_pos then
            -- `after` is strict (>), so the cursor must be the position
            -- of the last tuple that was actually returned to the client
            next_cursor = index:tuple_pos(last_tuple)
        else
            next_cursor = encode_cursor(index, next_cursor)
        end
    end

    return {
        count = count,
        result = result,
        truncated = next_cursor ~= nil,
        next_cursor = next_cursor,
    }
end
--[[
Example:
local scanner = require 'scan'
-- Assume we have such space:
box.schema.space.create('payments', {
format = {
{ name = 'payment_id', type = 'uuid' },
{ name = 'user', type = 'unsigned' },
{ name = 'timestamp', type = 'unsigned' },
{ name = 'amount', type = 'decimal' },
},
})
-- And following indexes:
box.space.payments:create_index('primary', { parts = {'payment_id'} })
box.space.payments:create_index('by_user', { parts = {'user', 'payment_id'} })
box.space.payments:create_index('by_user_timestamp', { parts = {'user', 'timestamp', 'payment_id'} })
-- And we want to fetch all payments for specified user ordered by date in Ascending Order
-- Then we use scanner like this
local records = scanner {
index = box.space.payments.index.by_user_timestamp,
prefix_eq = { 12312313 }, -- `12312313` is user_id
}
-- records will have following fields:
records = {
result = { ... }, -- all fetched records with user == `12312313` (this is prefix of our index)
count = 100, -- number of records scanned to build result
truncated = true|false, -- boolean flag which is true when there are some records left to fetch for this user
next_cursor = ".....", -- cursor which points to next valid record, client may pass it to get next portion of data
}
-- After that the client may fetch the next portion of data:
local next_records = scanner {
index = box.space.payments.index.by_user_timestamp, -- same as previous
prefix_eq = { 12312313 }, -- `12312313` is user_id -- same as previous
cursor = "...." -- cursor which was retrieved from previous request (field `next_cursor`)
}
-- Size of `result` is limited by variable limit (default=100, and cannot be increased)
-- User may decrease limit passing needed value:
local records = scanner {
index = box.space.payments.index.by_user_timestamp,
prefix_eq = { 12312313 }, -- `12312313` is user_id
limit = 50,
}
-- limit must be integer and in between interval [1, 100]
-- User may pass on_each callback to make some preprocessing
-- Then returned value from `on_each` callback will be returned
local records = scanner {
index = box.space.payments.index.by_user_timestamp,
prefix_eq = { 12312313 }, -- `12312313` is user_id
-- Casts box.tuple to kv-map (for java/golang and others)
on_each = function(tuple)
return tuple:tomap{ names_only = true }
end
}
]]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment