Skip to content

Instantly share code, notes, and snippets.

@ochaton
Created April 28, 2024 01:08
Show Gist options
  • Save ochaton/8892835557da5e8e6b19421bdc986763 to your computer and use it in GitHub Desktop.
Save ochaton/8892835557da5e8e6b19421bdc986763 to your computer and use it in GitHub Desktop.
PoC: cpu_limit method for luafun iterator inside Tarantool to allow automatic control of yields
--[[
The CPU limit works like a leaky bucket.
For example, we say that we want to limit this iterator to consume ≤10% of CPU.
Caller:
- ev(), real()
- gen_x(param_x, state_x)
- ev(), real()
-- We gain time to work when we sleep
-- We lose time of the work between subsequent calls
-- Given time quota for fiber is evaluated as % of max_ev_run_time
-- So basically, it is not possible for the fiber to be on-cpu more than max_ev_run_time (10*ms for now)
-- When the fiber or ev_run exhausts time_quota (yes, noisy neighbours make the fiber yield too)
-- the fiber is sent to sleep for the next time slot.
-- sleep time slot is evaluated in the following manner:
-- for each W time of work, the fiber needs to be yielded for at least W / % to regain its time quota.
-- The same approach, but slightly in a different way is implemented here:
-- fiber should sleep at most `max_ev_run_time` (basically should skip next ev loop).
-- more precisely, fiber need to sleep (100% - X%) * max_ev_run_time where X% is cpu quota given to fiber
-- but, in some cases, it is sent to sleep because of noisy neighbours, so we subtract from the sleep time the time_quota that is left for the fiber.
-- After each sleep time_quota increases for X% of the sleep but always limited to X% * max_ev_run_time.
]]
local fun = require 'fun'
-- Method table shared by luafun iterators: taken from the metatable of a
-- throwaway iterator so new methods can be attached to it further down.
local iterator_mt = debug.getmetatable(fun.ones())
local methods = iterator_mt.__index
--- Normalise any iterable accepted by fun.iter into a raw iterator triplet.
-- @param gen, param, state  whatever fun.iter accepts
-- @return the values produced by :unwrap() on the resulting iterator
local function rawiter(gen, param, state)
  local iterator = fun.iter(gen, param, state)
  return iterator:unwrap()
end
--- Adapt a one-argument operator into an iterator method.
-- The produced method takes (self, arg1) and forwards to
-- func(arg1, gen, param, state) using the triplet stored on `self`.
-- @param func  function(arg1, gen, param, state)
-- @return function(self, arg1)
local function method1(func)
  local function as_method(self, arg1)
    return func(arg1, self.gen, self.param, self.state)
  end
  return as_method
end
--- Adapt a one-argument operator into a module-level function.
-- The produced function takes (arg1, gen, param, state), normalises the
-- iterable through rawiter and forwards everything to `func`.
-- @param func  function(arg1, gen, param, state)
-- @return function(arg1, gen, param, state)
local function export1(func)
  local function as_function(arg1, gen, param, state)
    -- rawiter must stay the last argument so all of its results are passed on.
    return func(arg1, rawiter(gen, param, state))
  end
  return as_function
end
local fiber = require 'fiber'
local clock = require 'clock'
local log = require 'log'
-- Switch to a dedicated named logger when the log module exposes `new`;
-- otherwise keep using the module itself.
if log.new then
  log = log.new('fun.cpu_limit')
end
--- Event-loop timestamp as reported by fiber.time64().
-- Used by the callers in this file as a microsecond value.
local function ev_time_mks()
  local loop_now = fiber.time64()
  return loop_now
end
--- Wall-clock time: clock.realtime64() scaled down by 1e3.
-- Presumably nanoseconds converted to microseconds, going by the name.
local function realtime_mks()
  local wall_now = clock.realtime64()
  return wall_now / 1e3
end
--- Thread clock: clock.thread64() scaled down by 1e3.
-- Presumably nanoseconds converted to microseconds, going by the name.
local function thread_time_mks()
  local thread_now = clock.thread64()
  return thread_now / 1e3
end
--- Return the larger of two values.
-- `b` wins whenever `a > b` does not hold (ties included).
local function max(a, b)
  if not (a > b) then
    return b
  end
  return a
end
--- Default yield strategy: just put the current fiber to sleep.
-- @param time  sleep duration handed to fiber.sleep (the caller in this file
--              passes microseconds divided by 1e6)
-- Returns nothing.
local function yield_sleep(time)
  fiber.sleep(time)
end
--- Yield strategy for txn = 'commit'.
-- If a transaction is open, commit it before sleeping and begin a fresh one
-- afterwards; otherwise simply sleep.
-- @param time  sleep duration forwarded to yield_sleep
local yield_commit = function(time)
  -- box.cfg still being a function is treated as "box unavailable"
  -- (presumably: not configured yet — confirm); the txn API is skipped then.
  local in_txn = type(box.cfg) ~= 'function' and box.is_in_txn and box.is_in_txn()
  if not in_txn then
    yield_sleep(time)
    return
  end

  box.commit()
  yield_sleep(time)
  box.begin()
end
--- Yield strategy for txn = 'rollback'.
-- Rolls back the open transaction (if any) and then sleeps; nothing is
-- re-opened after the sleep.
-- @param time  sleep duration forwarded to yield_sleep
local yield_rollback = function(time)
  -- box.cfg still being a function is treated as "box unavailable"
  -- (presumably: not configured yet — confirm).
  local box_usable = type(box.cfg) ~= 'function'
  local in_txn = box_usable and box.is_in_txn and box.is_in_txn()
  if in_txn then
    box.rollback()
  end
  return yield_sleep(time)
end
--- Yield strategy for txn = 'raise'.
-- Refuses to yield while a transaction is open: raises an error instead.
-- Without an open transaction it simply sleeps.
-- @param time  sleep duration forwarded to yield_sleep
local yield_raise = function(time)
  -- box.cfg still being a function is treated as "box unavailable"
  -- (presumably: not configured yet — confirm).
  local box_usable = type(box.cfg) ~= 'function'
  local in_txn = box_usable and box.is_in_txn and box.is_in_txn()
  if in_txn then
    -- Level 3: blame the code two frames above this function.
    error("Transaction left openned", 3)
  end
  return yield_sleep(time)
end
-- Time units; the base unit of this module is the microsecond.
local mks = 1
local ms = mks * 1000
-- Reference length of one run: quotas are computed as a percentage of it and
-- it is the threshold compared against the wall clock before yielding.
local max_ev_run_mks = ms * 10
--- Post-process one step of the wrapped generator.
-- Receives the bookkeeping taken just before gen_x ran plus everything gen_x
-- returned, charges the time gen_x spent on CPU, and builds the next state.
-- @param quota_mks     quota remaining before gen_x was called
-- @param b_ev_mks      event-loop timestamp taken before gen_x was called
-- @param b_thread_mks  thread-clock reading taken before gen_x was called
-- @param state_x       new state returned by gen_x (nil means exhausted)
-- @param ...           values yielded by gen_x
-- @return nil when gen_x is exhausted; otherwise the new state table
--         {state_x, quota, ev timestamp, thread clock} followed by the values
local cpu_limit_gen_x = function(quota_mks, b_ev_mks, b_thread_mks, state_x, ...)
  if state_x == nil then return nil end

  local now_ev = ev_time_mks()
  local now_thread = thread_time_mks()
  local remaining = quota_mks
  if now_ev == b_ev_mks then
    -- Event-loop timestamp unchanged: gen_x did not yield, so the whole
    -- thread-clock delta is charged against the quota.
    remaining = remaining - tonumber(now_thread - b_thread_mks)
  end

  return { state_x, remaining, now_ev, now_thread }, ...
end
-- Generator function of the cpu_limit iterator.
-- On every step it (1) charges the time the caller spent on CPU since the
-- previous step, (2) sleeps via the configured callback when the quota is
-- used up or the wall clock has run more than max_ev_run_mks ahead of the
-- event-loop timestamp, and (3) advances the wrapped generator.
-- param: {max_quota_mks, sleep, gen_x, param_x}
-- state: {state_x, quota_mks, prev_ev_mks, prev_thread_mks}
-- Returns whatever cpu_limit_gen_x returns for the step of gen_x.
local cpu_limit_gen = function(param, state)
local max_quota_mks, sleep, gen_x, param_x = param[1], param[2], param[3], param[4]
local state_x, quota_mks, prev_ev_mks, prev_thread_mks = state[1], state[2], state[3], state[4]
local ev_mks = ev_time_mks()
local thread_mks
if prev_ev_mks == ev_mks then
-- Event-loop timestamp unchanged since the previous step:
-- the caller spent all this time on cpu
-- decrease quota
thread_mks = thread_time_mks()
quota_mks = quota_mks - tonumber(thread_mks - prev_thread_mks)
end
local wall_mks = realtime_mks()
if ((ev_mks + max_ev_run_mks) < wall_mks) or quota_mks <= 1 then
-- it's time to yield
-- The fiber needs to sleep for max_ev_run_mks minus the quota already
-- consumed (max_quota_mks - quota_mks); /1e6 turns microseconds into seconds.
-- NOTE(review): sleep_time goes negative once more than max_ev_run_mks of
-- quota has been consumed — confirm the sleep callbacks tolerate that.
local sleep_time = (max_ev_run_mks - (max_quota_mks - quota_mks))/1e6
-- local sleep_time = max(max_ev_run_time, max(max_time_mks, -quota_mks))/1e6
log.verbose("yielding(quota=%s, max_allowed=%s, sleep=%s)", quota_mks, max_quota_mks, sleep_time)
sleep(sleep_time)
-- Re-sample both clocks after waking up; they become the baseline for gen_x.
ev_mks = ev_time_mks()
thread_mks = thread_time_mks()
if ev_mks > wall_mks then
-- Refill the quota in proportion to the time slept (max_quota_mks per max_ev_run_mks).
-- yes, any time drift does not add time quota.
quota_mks = quota_mks + tonumber(ev_mks-wall_mks)*max_quota_mks / max_ev_run_mks
end
-- The quota never grows beyond its configured maximum.
if quota_mks > max_quota_mks then
quota_mks = max_quota_mks
end
end
-- The thread clock is sampled here only if neither branch above did it already.
return cpu_limit_gen_x(quota_mks, ev_mks, thread_mks or thread_time_mks(), gen_x(param_x, state_x))
end
--- Wrap a raw iterator so that iterating it stays within a CPU budget.
-- @param opts  either the CPU percentage itself (anything tonumber accepts),
--              or a table { cpu_limit = <percent>, txn = <mode> } where mode
--              is 'commit', 'rollback' or 'raise' and selects what happens to
--              an open transaction before the fiber sleeps
-- @param gen_x, param_x, state_x  the raw iterator triplet to wrap
-- @return the result of fun.wrap over cpu_limit_gen with its initial state
-- Raises when the txn mode is unknown or the percentage is not within (0;100).
local cpu_limit = function(opts, gen_x, param_x, state_x)
  local percent
  local sleep_with = yield_sleep

  if type(opts) ~= 'table' then
    percent = tonumber(opts)
  else
    percent = tonumber(opts.cpu_limit)

    local txn_modes = {
      commit = yield_commit,
      rollback = yield_rollback,
      raise = yield_raise,
    }
    local chosen = txn_modes[opts.txn]
    if chosen then
      sleep_with = chosen
    elseif opts.txn then
      error("malformed cpu_limit/txn: commit, rollback or raise are supported", 2)
    end
  end

  assert(
    percent and percent > 0 and percent < 100,
    "malformed cpu_limit given: should be positive number within (0;100)"
  )

  -- Full quota: the requested share of max_ev_run_mks.
  local max_quota_mks = tonumber(max_ev_run_mks * percent / 100)

  local gen_param = { max_quota_mks, sleep_with, gen_x, param_x }
  -- The iterator starts with a full quota and the current clock readings.
  local gen_state = { state_x, max_quota_mks, ev_time_mks(), thread_time_mks() }
  return fun.wrap(cpu_limit_gen, gen_param, gen_state)
end
-- Method form: iterator:cpu_limit(opts), reading the triplet from the iterator.
methods.cpu_limit = method1(cpu_limit)
-- Function form: fun.cpu_limit(opts, gen, param, state).
---@diagnostic disable-next-line: inject-field
fun.cpu_limit = export1(cpu_limit)
-- The module returns the patched luafun table itself.
return fun
--[[
fun.cpu_limit(10, ....)
fun.ones():cpu_limit(10)
fun.ones():cpu_limit({ cpu_limit = 3, txn='rollback' })
]]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment