|
-- away_old_backup |
|
-- Copyright (C) 2020 thislight |
|
-- This program is free software: you can redistribute it and/or modify |
|
-- it under the terms of the GNU General Public License as published by |
|
-- the Free Software Foundation, either version 3 of the License, or |
|
-- any later version. |
|
-- This program is distributed in the hope that it will be useful, |
|
-- but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
-- GNU General Public License for more details. |
|
-- You should have received a copy of the GNU General Public License |
|
-- along with this program. If not, see <https://www.gnu.org/licenses/>. |
|
|
|
local co = coroutine |
|
|
|
local scheduler = { |
|
threads = {}, |
|
services = {}, |
|
signals = {}, |
|
current_thread_index = 0, |
|
_stopflag = false |
|
} |
|
|
|
function scheduler:clone_to(to) |
|
to = to or {} |
|
for k, v in pairs(self) do to[k] = self[v] end |
|
return to |
|
end |
|
|
|
function scheduler:next_thread() |
|
self.current_thread_index = self.current_thread_index + 1 |
|
if #self.threads > self.current_thread_index then |
|
self.current_thread_index = 0 |
|
return nil |
|
else |
|
return self.threads[self.current_thread_index] |
|
end |
|
end |
|
|
|
function scheduler:current_thread() |
|
return self.threads[self.current_thread_index] |
|
end |
|
|
|
function scheduler:jumpin_thread(thread, signal) |
|
if co.status(thread) ~= 'dead' then |
|
local stat, new_signal = co.resume(thread, signal) |
|
if stat and new_signal then |
|
self:push_signal(new_signal) |
|
end |
|
end |
|
end |
|
|
|
function scheduler:add_service(name, service) |
|
assert(self:has_service(name), |
|
"scheduler could not have two or more services with same name") |
|
local service_thread, stop = service:init() |
|
self.services[name] = { |
|
thread = service_thread, |
|
stop = stop or function() end, |
|
signals = {} |
|
} |
|
end |
|
|
|
function scheduler:has_service(name) return self.services[name] ~= nil end |
|
|
|
function scheduler:_loopstep_service() |
|
for k, serviced in pairs(self.services) do |
|
co.resume(serviced.thread, scheduler, serviced) |
|
if co.status(serviced) == "dead" then |
|
serviced.stop() |
|
self.services[k] = nil |
|
end |
|
end |
|
end |
|
|
|
function scheduler:_loopstep_signals() |
|
while #self.signals > 0 do |
|
local sigd = self:pop_signald() |
|
if sigd.target_type == 'service' then |
|
local serviced = self.services[sigd.service] |
|
if serviced then |
|
table.insert(serviced.signals, sigd) |
|
else |
|
error("service " .. tostring(sigd.service) .. " not found") |
|
end |
|
elseif sigd.target_type == 'thread' then |
|
if sigd.target_thread then |
|
self:jumpin_thread(sigd.target_thread, sigd.signal) |
|
end |
|
elseif sigd.target_type == 'mtask' then |
|
xpcall(sigd.mtask, function(stat, msg) |
|
if not stat then |
|
if debug then |
|
local debuginfo = debug.getinfo(sigd.mtask, "nS") |
|
error(string.format( |
|
"error happen while scheduler running microtask %s (defined in %s): %s", |
|
debuginfo.name, debuginfo.linedefined, msg)) |
|
else |
|
error( |
|
"error happen while scheduler running mircotask: " .. |
|
msg) |
|
end |
|
end |
|
end) |
|
end |
|
end |
|
end |
|
|
|
function scheduler:_loopstep_remove_dead_thread() |
|
local new_thread_list = {} |
|
for i, thread in ipairs(self.threads) do |
|
if co.status(thread) ~= "dead" then |
|
table.insert(new_thread_list, thread) |
|
end |
|
end |
|
self.threads = new_thread_list |
|
end |
|
|
|
function scheduler:_loopstep() |
|
self:_loopstep_signals() |
|
self:_loopstep_service() |
|
self:_loopstep_remove_dead_thread() |
|
end |
|
|
|
function scheduler:push_signal(signal) |
|
local sigd = {signal = signal, target_type = signal.target_type or 'thread'} |
|
if signal.target_type == 'thread' then |
|
sigd.target_thread = signal.target_thread or self.threads[self.target] |
|
elseif signal.target_type == 'service' then |
|
sigd.service = signal.service |
|
elseif signal.target_type == 'mtask' then |
|
sigd.mtask = signal.mtask |
|
end |
|
table.insert(self.signals, sigd) |
|
end |
|
|
|
function scheduler:pop_signald() return table.remove(self.signals, 1) end |
|
|
|
function scheduler:kill_service(name) |
|
self:push_signal{ |
|
target_type = 'service', |
|
service = name, |
|
from_scheduler = true, |
|
should = 'die', |
|
back_signal = { |
|
target_thread = co.wrap(function() |
|
self.services[name].stop() |
|
end) |
|
} |
|
} |
|
end |
|
|
|
function scheduler:runforever() |
|
while not self._stopflag do self:_loopstep() end |
|
self:_stop() |
|
end |
|
|
|
function scheduler:stop() self._stopflag = true end |
|
|
|
function scheduler:_stop() |
|
for name, _ in pairs(self.services) do scheduler:kill_service(name) end |
|
self.threads = {} |
|
self.services = {} |
|
self.signals = {} |
|
self.current_thread_index = 0 |
|
self._stopflag = false |
|
end |
|
|
|
function scheduler:mircotask(f) |
|
self:push_signal{target_type = 'mtask', mtask = f} |
|
end |
|
|
|
function scheduler:add_thread(thread) |
|
table.insert(self.threads, thread) |
|
return #self.threads |
|
end |
|
|
|
local away_internal_serv = {} |
|
|
|
function away_internal_serv:init() |
|
local thread = co.create(self.body) |
|
co.resume(thread, self) |
|
return thread |
|
end |
|
|
|
function away_internal_serv.body(self) |
|
while true do |
|
local scheduler, serviced = co.yield() |
|
for i, sigd in ipairs(serviced.signals) do |
|
if sigd.command and sigd.command ~= 'body' and sigd.command ~= |
|
'init' then |
|
local handler = self[sigd.command] |
|
if handler then handler(self, scheduler, sigd) end |
|
end |
|
end |
|
serviced.signals = {} |
|
end |
|
end |
|
|
|
function away_internal_serv:wake_thread(scheduler, sigd) |
|
local thread = sigd.signal.target_co |
|
scheduler.push_signal { |
|
target_type = 'mtask', |
|
mtask = function() scheduler:jumpin_thread(thread, sigd.singal) end |
|
} |
|
end |
|
|
|
function scheduler:init() scheduler:add_service('away', away_internal_serv) end |
|
|
|
function scheduler:wake_thread_by_index(i, extra) |
|
self:wake_thread(self.threads[i], extra) |
|
end |
|
|
|
function scheduler:wake_thread(th, extra) |
|
local request_sig = { |
|
target_type = 'service', |
|
service = 'away', |
|
command = 'wake_thread', |
|
target_thread = th |
|
} |
|
if extra then for k, v in pairs(extra) do request_sig[k] = v end end |
|
self:push_signal(request_sig) |
|
end |
|
|
|
local promise_service = {} |
|
|
|
function promise_service:init() |
|
local thread = co.create(promise_service.body) |
|
co.resume(self) |
|
return thread |
|
end |
|
|
|
function promise_service:body() |
|
while true do |
|
local scheduler, serviced = co.yield() |
|
while #serviced.signals > 0 do |
|
local signald = table.remove(serviced.signals, 1) |
|
local signal = signald.signal |
|
if signal.command == 'schedule_callbacks' then |
|
local promiz = signal.promise |
|
for _, v in ipairs(promiz.callbacks) do |
|
scheduler:push_signal{ |
|
target_type = 'mtask', |
|
mtask = function() |
|
v(promiz.value) |
|
end |
|
} |
|
end |
|
for _, th in ipairs(promiz.waiting_threads) do |
|
scheduler:wake_thread(th, {promise = promiz}) |
|
end |
|
elseif signal.command == 'schedule_error_callbacks' then |
|
local promiz = signal.promise |
|
for _, v in ipairs(promiz.callbacks) do |
|
scheduler:push_signal{ |
|
target_type = 'mtask', |
|
mtask = function() v(promiz.err) end |
|
} |
|
end |
|
for _, th in ipairs(promiz.waiting_threads) do |
|
scheduler:wake_thread(th, {promise = promiz}) |
|
end |
|
end |
|
end |
|
end |
|
end |
|
|
|
local promise = {} |
|
|
|
function promise:clone_to(new_t) |
|
for k, v in pairs(self) do new_t[k] = v end |
|
return new_t |
|
end |
|
|
|
function promise:create(schd) |
|
return self:clone_to{ |
|
scheduler = schd or scheduler, |
|
callbacks = {}, |
|
error_callbacks = {}, |
|
waiting_threads = {} |
|
} |
|
end |
|
|
|
function promise:done(value) |
|
self.value = value |
|
self.scheduler:push_signal{ |
|
target_type = 'service', |
|
service = 'promise', |
|
command = 'schedule_callbacks', |
|
promise = self |
|
} |
|
return self |
|
end |
|
|
|
function promise:error(e) |
|
self.err = e |
|
self.scheduler:push_signal{ |
|
target_type = 'service', |
|
service = 'promise', |
|
command = 'schedule_error_callbacks', |
|
promise = self |
|
} |
|
return self |
|
end |
|
|
|
function promise:onfi(callback) |
|
local new_promise = promise:create(self.scheduler) |
|
table.insert(self.callbacks, new_promise:bind(callback)) |
|
return new_promise |
|
end |
|
|
|
function promise:bind(callback) |
|
return function(value) |
|
local stat, result = pcall(callback, value) |
|
if stat then |
|
self:done(result) |
|
else |
|
self:error(result) |
|
end |
|
end |
|
end |
|
|
|
function promise:onerr(error_callback) |
|
local new_promise = promise:create(self.scheduler) |
|
table.insert(self.error_callbacks, new_promise:bind(error_callback)) |
|
return new_promise |
|
end |
|
|
|
function promise:isfulfilled() return (self.value or self.err) end |
|
|
|
function promise:isdone() return self:isfulfilled() and self.value end |
|
|
|
function promise:iserr() return self:isfulfilled() and self.err end |
|
|
|
function promise:wait() -- This function only works correctly in threads, not in services |
|
local stat, result = self:pwait() |
|
if stat then |
|
return result |
|
else |
|
error(result) |
|
end |
|
end |
|
|
|
function promise:pwait() |
|
table.insert(self.waiting_threads, self.scheduler:current_thread()) |
|
local waiting = true |
|
while waiting do |
|
local signal = co.yield() |
|
if signal.promise == self then waiting = false end |
|
end |
|
return self:isdone(), (self.value or self.err) |
|
end |
|
|
|
return {scheduler = scheduler, promise = promise} |