Last active
December 26, 2016 20:42
-
-
Save R-omk/da0ecb6d6137ebb07a5c38200d3b6425 to your computer and use it in GitHub Desktop.
queue long polling tarantool example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- schema: key, group, weight, status, time | |
box.schema.space.create(rehab_space, { if_not_exists = true, temporary = true }) | |
box.space[rehab_space]:create_index('primary', { type = 'TREE', unique = true, parts = { 1, 'string' }, if_not_exists = true }) | |
box.space[rehab_space]:create_index('group', { type = 'TREE', unique = false, parts = { 2, 'unsigned', 4, 'unsigned', 3, 'unsigned' }, if_not_exists = true }) | |
box.space[rehab_space]:create_index('time', { type = 'TREE', unique = false, parts = { 5, 'number' }, if_not_exists = true }) | |
----------------- | |
local function rehab_release(key) | |
this.spaces.boxrehab:delete({ key }) | |
end | |
this.fibers['rehab'] = {} | |
local function _rehab_push(key, group, weight, status, time) | |
if status == nil then status = CONST_REHAB_ITEM_STATUS_READY end | |
if time == nil then | |
time = -fiber.time() - CONST_REHAB_TTL | |
end | |
this.spaces.boxrehab:replace({ key, group, weight, status, time }) | |
end | |
this.fibers['rehab']['_repush'] = fiber.create(function() | |
while true do | |
fiber.testcancel() | |
fiber.sleep(CONST_REHAB_TAKE_TIMEOUT) | |
-- change by ttr | |
for _, tuple in this.spaces.boxrehab.index.time:pairs({ fiber.time() }, { iterator = box.index.LE }) do | |
if tuple[5] <= 0 then break end | |
logwarn('rehab: repush key', tuple[1], tuple[2]) | |
local st, res = pcall(_rehab_push, tuple[1], tuple[2], tuple[3]) | |
if not st then | |
rehab_release(tuple[1]) | |
end | |
end | |
-- remove by ttl | |
for _, tuple in this.spaces.boxrehab.index.time:pairs({ -fiber.time() }, { iterator = box.index.GE }) do | |
if tuple[5] >= 0 then break end | |
this.spaces.boxrehab:delete({ tuple[1] }) | |
end | |
end | |
end) | |
local rehab_group_chan = {} | |
local rehab_group_cond = {} | |
local function _rehab_empty_return() | |
return "" | |
end | |
-- public | |
local function rehab_take(group) | |
if rehab_group_chan[group] == nil then | |
fiber.sleep(CONST_REHAB_TAKE_TIMEOUT) | |
return _rehab_empty_return() | |
else | |
-- for fast wakeup | |
-- rehab_group_cond[group]:signal() | |
end | |
local key = rehab_group_chan[group]:get(CONST_REHAB_TAKE_TIMEOUT) | |
if key == nil then | |
return _rehab_empty_return() | |
end | |
return key | |
end | |
local function _rehab_take(group) | |
local time = fiber.time() | |
local tuples = this.spaces.boxrehab.index.group:select({ group, 0 }, { limit = 1, iterator = box.index.REQ }) -- highest weight primarily | |
if tuples[1] ~= nil then | |
local tuple = tuples[1] | |
local st, res = pcall(function(tuple) | |
this.spaces.boxrehab:replace(tuple) | |
end, tuple:update({ | |
{ '=', 4, CONST_REHAB_ITEM_STATUS_TAKEN }, | |
{ '=', 5, time + CONST_REHAB_TTR } | |
})) | |
if not st then | |
rehab_release(tuple[1]) | |
return | |
end | |
return tuple | |
end | |
return | |
end | |
local function _rehab_fiber(group, cond, chan) | |
while true do | |
fiber.testcancel() | |
cond:wait(CONST_REHAB_TAKE_TIMEOUT / 4) | |
while chan:has_readers() do | |
local tuple = _rehab_take(group) | |
if tuple == nil then break end --go to wait | |
local putres = chan:put(tuple[1], CONST_REHAB_TAKE_TIMEOUT) | |
if not putres then | |
logwarn('_rehab_push repush') | |
pcall(function() | |
_rehab_push(tuple[1], tuple[2], tuple[3]) | |
end) | |
end | |
end | |
end | |
end | |
local function rehab_push(key, group, weight) | |
if weight == nil then weight = 0 end | |
local st, res | |
st, res = pcall(_rehab_push, key, group, weight) | |
if st then | |
if rehab_group_cond[group] == nil then | |
rehab_group_cond[group] = fiber.cond() | |
rehab_group_chan[group] = fiber.channel() | |
this.fibers['rehab'][group] = fiber.create(_rehab_fiber, group, rehab_group_cond[group], rehab_group_chan[group]) | |
end | |
rehab_group_cond[group]:signal() | |
else | |
logwarn('_rehab_push fail ', res) | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment