Skip to content

Instantly share code, notes, and snippets.

@R-omk
Last active December 26, 2016 20:42
Show Gist options
  • Save R-omk/da0ecb6d6137ebb07a5c38200d3b6425 to your computer and use it in GitHub Desktop.
Save R-omk/da0ecb6d6137ebb07a5c38200d3b6425 to your computer and use it in GitHub Desktop.
queue long polling tarantool example
-- schema: key, group, weight, status, time
-- (tuple fields 1..5 in that order)
do
    -- The space is temporary; creation is skipped when it already exists.
    box.schema.space.create(rehab_space, { if_not_exists = true, temporary = true })

    -- Index definitions, created in this order:
    --   primary: field 1 (key), unique
    --   group:   field 2 (group), field 4 (status), field 3 (weight)
    --   time:    field 5 (time)
    local index_defs = {
        { name = 'primary', unique = true, parts = { 1, 'string' } },
        { name = 'group', unique = false, parts = { 2, 'unsigned', 4, 'unsigned', 3, 'unsigned' } },
        { name = 'time', unique = false, parts = { 5, 'number' } },
    }

    for _, def in ipairs(index_defs) do
        box.space[rehab_space]:create_index(def.name, {
            type = 'TREE',
            unique = def.unique,
            parts = def.parts,
            if_not_exists = true,
        })
    end
end
-----------------
--- Drop an item from the rehab queue.
-- Deletes the tuple with primary key `key` from `this.spaces.boxrehab`.
-- @param key primary-key value (tuple field 1) of the item to remove
local function rehab_release(key)
    local space = this.spaces.boxrehab
    space:delete({ key })
end
-- Registry of fibers owned by the rehab queue; entries are added under
-- string keys (e.g. '_repush') and under group ids.
this.fibers.rehab = {}
--- Store (insert or overwrite) a queue item in `this.spaces.boxrehab`.
-- The tuple layout is { key, group, weight, status, time }.
-- @param key    primary key of the item
-- @param group  group the item belongs to
-- @param weight item weight
-- @param status optional; defaults to CONST_REHAB_ITEM_STATUS_READY when nil
-- @param time   optional; when nil it defaults to the negated value of
--               (current fiber.time() + CONST_REHAB_TTL)
local function _rehab_push(key, group, weight, status, time)
    if status == nil then
        status = CONST_REHAB_ITEM_STATUS_READY
    end

    if time == nil then
        local now = fiber.time()
        time = -now - CONST_REHAB_TTL
    end

    local item = { key, group, weight, status, time }
    this.spaces.boxrehab:replace(item)
end
-- Background maintenance fiber for the rehab queue.
-- Every CONST_REHAB_TAKE_TIMEOUT seconds it performs two sweeps over the
-- `time` index:
--   1. "ttr" sweep: items whose positive time value is <= now are pushed back
--      with default status/time (positive time values are written by
--      _rehab_take when an item is handed out);
--   2. "ttl" sweep: items whose negative time value is >= -now are deleted
--      (negative time values are written by _rehab_push's default).
-- Failures of individual box operations are logged and do not terminate the
-- fiber; fiber.testcancel()/fiber.sleep() stay outside pcall so cancellation
-- still propagates.
this.fibers['rehab']['_repush'] = fiber.create(function()
    while true do
        fiber.testcancel()
        fiber.sleep(CONST_REHAB_TAKE_TIMEOUT)

        -- change by ttr
        -- Walk downwards from "now"; the first non-positive time value means
        -- only ready (negative-time) items remain in that direction.
        for _, tuple in this.spaces.boxrehab.index.time:pairs({ fiber.time() }, { iterator = box.index.LE }) do
            if tuple[5] <= 0 then break end
            logwarn('rehab: repush key', tuple[1], tuple[2])
            local st, res = pcall(_rehab_push, tuple[1], tuple[2], tuple[3])
            if not st then
                -- Could not return the item to the queue: report why and
                -- drop it so it is not retried forever.
                logwarn('rehab: repush fail ', res)
                local rel_st, rel_res = pcall(rehab_release, tuple[1])
                if not rel_st then
                    logwarn('rehab: release fail ', rel_res)
                end
            end
        end

        -- remove by ttl
        -- Walk upwards from "-now"; the first non-negative time value means
        -- the remaining items are not subject to TTL expiry.
        for _, tuple in this.spaces.boxrehab.index.time:pairs({ -fiber.time() }, { iterator = box.index.GE }) do
            if tuple[5] >= 0 then break end
            local del_st, del_res = pcall(function()
                this.spaces.boxrehab:delete({ tuple[1] })
            end)
            if not del_st then
                logwarn('rehab: ttl delete fail ', del_res)
            end
        end
    end
end)
-- Per-group registries, both keyed by group id:
--   rehab_group_chan[group] -> fiber channel used to hand keys to takers
--   rehab_group_cond[group] -> fiber cond used to wake the group's worker
local rehab_group_chan, rehab_group_cond = {}, {}
--- Value handed back when there is nothing to take.
-- @return the empty string
local function _rehab_empty_return()
    local nothing = ""
    return nothing
end
-- public
--- Take the next key for `group`, waiting up to CONST_REHAB_TAKE_TIMEOUT.
-- If no channel exists for the group, sleeps for the timeout and returns the
-- empty result; otherwise waits on the group's channel.
-- @param group group id to take from
-- @return the key read from the channel, or the value of
--         _rehab_empty_return() when nothing arrived in time
local function rehab_take(group)
    local chan = rehab_group_chan[group]

    if chan == nil then
        -- No channel registered for this group: wait out the poll interval.
        fiber.sleep(CONST_REHAB_TAKE_TIMEOUT)
        return _rehab_empty_return()
    end

    -- for fast wakeup
    -- rehab_group_cond[group]:signal()

    local key = chan:get(CONST_REHAB_TAKE_TIMEOUT)
    if key ~= nil then
        return key
    end
    return _rehab_empty_return()
end
--- Pick one ready item of `group` and mark it as taken.
-- Selects a single tuple from the `group` index (parts: group, status,
-- weight) with status CONST_REHAB_ITEM_STATUS_READY using a reverse-equality
-- iterator, then stores it back with status CONST_REHAB_ITEM_STATUS_TAKEN and
-- time = now + CONST_REHAB_TTR.
-- If marking fails, the reason is logged, the item is released (deleted) and
-- nothing is returned.
-- @param group group id to take from
-- @return the tuple as it was BEFORE being marked taken, or nothing when the
--         group has no ready item or marking failed
local function _rehab_take(group)
    local time = fiber.time()
    -- The status part of the key uses the same constant _rehab_push stores,
    -- rather than a literal, so the two cannot drift apart.
    local tuples = this.spaces.boxrehab.index.group:select(
        { group, CONST_REHAB_ITEM_STATUS_READY },
        { limit = 1, iterator = box.index.REQ }) -- highest weight primarily
    local tuple = tuples[1]
    if tuple == nil then
        return
    end

    -- Both building the updated tuple and writing it happen inside pcall, so
    -- a failure in either step is handled here instead of escaping.
    local st, res = pcall(function()
        this.spaces.boxrehab:replace(tuple:update({
            { '=', 4, CONST_REHAB_ITEM_STATUS_TAKEN },
            { '=', 5, time + CONST_REHAB_TTR }
        }))
    end)
    if not st then
        logwarn('_rehab_take fail ', res)
        local rel_st, rel_res = pcall(rehab_release, tuple[1])
        if not rel_st then
            logwarn('_rehab_take release fail ', rel_res)
        end
        return
    end
    return tuple
end
--- Worker loop for one group: feeds taken keys to readers waiting on `chan`.
-- Each round waits on `cond` for at most CONST_REHAB_TAKE_TIMEOUT / 4, then,
-- while the channel has readers, takes items via _rehab_take and puts their
-- key into the channel. A key that could not be delivered within
-- CONST_REHAB_TAKE_TIMEOUT is pushed back (errors from that push are ignored).
-- Never returns normally; it ends only when fiber.testcancel() raises.
-- @param group group id served by this worker
-- @param cond  object with :wait(timeout), used to wake the worker
-- @param chan  object with :has_readers() and :put(value, timeout)
local function _rehab_fiber(group, cond, chan)
    repeat
        fiber.testcancel()
        cond:wait(CONST_REHAB_TAKE_TIMEOUT / 4)

        while chan:has_readers() do
            local item = _rehab_take(group)
            if item == nil then
                -- nothing ready for this group: go back to waiting
                break
            end

            local delivered = chan:put(item[1], CONST_REHAB_TAKE_TIMEOUT)
            if not delivered then
                logwarn('_rehab_push repush')
                pcall(_rehab_push, item[1], item[2], item[3])
            end
        end
    until false
end
--- Public entry point: enqueue `key` into `group`.
-- On a successful store, lazily creates the group's cond, channel and worker
-- fiber (on first use) and signals the cond. On failure only a warning is
-- logged.
-- @param key    primary key of the item
-- @param group  group id
-- @param weight optional; defaults to 0 when nil
local function rehab_push(key, group, weight)
    if weight == nil then
        weight = 0
    end

    local ok, err = pcall(_rehab_push, key, group, weight)
    if not ok then
        logwarn('_rehab_push fail ', err)
        return
    end

    if rehab_group_cond[group] == nil then
        -- First successful push for this group: set up its wakeup cond,
        -- delivery channel and worker fiber.
        local cond = fiber.cond()
        rehab_group_cond[group] = cond
        local chan = fiber.channel()
        rehab_group_chan[group] = chan
        this.fibers['rehab'][group] = fiber.create(_rehab_fiber, group, cond, chan)
    end

    rehab_group_cond[group]:signal()
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment