Created
August 22, 2021 06:18
-
-
Save shasharoman/0d1a5aaa72a6a052246a089c2326c9e5 to your computer and use it in GitHub Desktop.
openresty-dynamic-upstream
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
local balancer = require 'ngx.balancer' | |
local upstream = require 'ngx.upstream' | |
local dyupcl = require 'dyupcl' | |
balancer.set_timeouts(1, 1, 60) | |
local name = balancer.get_last_failure() | |
if not name then | |
local ok, err = balancer.set_more_tries(1) | |
if not ok then | |
ngx.log(ngx.ERR, err) | |
ngx.exit(ngx.HTTP_INTERNAL_SERVER_ERROR) | |
return | |
end | |
end | |
local res, err = dyupcl.pick(upstream.current_upstream_name()) | |
if not res then | |
ngx.log(ngx.ERR, err) | |
ngx.exit(ngx.HTTP_INTERNAL_SERVER_ERROR) | |
return | |
end | |
local ok, err = balancer.set_current_peer(res.host, res.port) | |
if not ok then | |
ngx.log(ngx.ERR, err) | |
ngx.exit(ngx.HTTP_INTERNAL_SERVER_ERROR) | |
return | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- dyupcl means dynamic upstream cluster | |
local _M = {} | |
local json = require 'cjson' | |
local redis = require 'resty.redis' | |
local C = { | |
SERVERS = 'servers', | |
CNT = 'cnt', | |
NAMES = 'names' | |
} | |
-- {_names: {xxx, yyy}, _servers_xxx: [{host, port}], _servers_yyy: [{host, port}], _cnt_xxx, _cnt_yyy, `name:host:port`: ''} | |
local shm = nil | |
local function key(label, v) | |
return '_' .. label .. (v and '_' .. v or '') | |
end | |
local function init_client(host, port, password, channel) | |
local client = redis:new() | |
client:set_timeouts(1000, 1000, 3600 * 1000) | |
local ok, err = client:connect(host, port) | |
if not ok then | |
return err | |
end | |
if password and password ~= '' then | |
local res, err = client:auth(password) | |
if not res then | |
return err | |
end | |
end | |
local res, err = client:subscribe(channel) | |
if not res then | |
return err | |
end | |
return nil, client | |
end | |
local function listen(premature, opts) | |
if premature or ngx.worker.exiting() then | |
return | |
end | |
local err, client = init_client(opts.host, opts.port, opts.password, opts.channel) | |
if err then | |
ngx.log(ngx.ERR, err) | |
end | |
while true do | |
if client then | |
local res, err = client:read_reply() | |
if not res or res[1] ~= 'message' or res[2] ~= opts.channel then | |
ngx.log(ngx.ERR, 'read reply err', err) | |
client:close() | |
client = nil | |
else | |
local msg = res[3] | |
-- !name:host:port means `name:host:port` offline | |
-- ! is 33 | |
-- FIXME: offline donot work if only one server | |
if string.byte(msg) == 33 then | |
shm:delete(string.sub(msg, 2)) | |
else | |
shm:set(msg, '', opts.ttl) | |
end | |
local keys, err = shm:get_keys() | |
local name_servers = {} | |
for k, v in pairs(keys) do | |
-- exclude extra keys(start with '_') | |
if string.byte(v) ~= 95 then | |
local n, h, p = v:match('([^:]+):([^:]+):([^:]+)') | |
if n then | |
name_servers[n] = name_servers[n] or {} | |
table.insert(name_servers[n], { | |
host = h, | |
port = p | |
}) | |
end | |
end | |
end | |
for k, v in pairs(name_servers) do | |
table.sort(v, function(x, y) return x.host .. x.port < y.host .. y.port end) | |
shm:set(key(C.SERVERS, k), json.encode(v), opts.ttl) | |
end | |
end | |
else | |
ngx.log(ngx.ERR, 'reinit new redis client') | |
ngx.sleep(1) | |
err, client = init_client(opts.host, opts.port, opts.password, opts.channel) | |
if err then | |
ngx.log(ngx.ERR, err) | |
end | |
end | |
end | |
end | |
function _M.pick(name) | |
local servers = shm:get(key(C.SERVERS, name)) | |
if not servers then | |
return nil, 'no available server' | |
end | |
servers = json.decode(servers) | |
local len = table.getn(servers) | |
local cnt = shm:incr(key(C.CNT, name), 1, 0) | |
cnt = cnt or 0 | |
return servers[cnt % len + 1] | |
end | |
function _M.status() | |
local names = shm:get(key(C.NAMES)) | |
if not names then | |
return '' | |
end | |
names = json.decode(names) | |
local res = '' | |
for k, v in pairs(names) do | |
local cnt = shm:get(key(C.CNT, v)) or 0 | |
res = res .. v .. ':' .. cnt .. '\n' | |
local servers = shm:get(key(C.SERVERS, v)) | |
if servers then | |
servers = json.decode(servers) | |
for k, v in pairs(servers) do | |
res = res .. v.host .. ':' .. v.port .. '\n' | |
end | |
end | |
res = res .. '\n' | |
end | |
return res | |
end | |
function _M.init(opts) | |
shm = ngx.shared[opts.shm or 'dyupcl'] | |
if ngx.worker.id() ~= 0 then | |
return | |
end | |
shm:set(key(C.NAMES), json.encode(opts.names or {})) | |
ngx.timer.at(0, listen, { | |
host = opts.host or '127.0.0.1', | |
port = opts.port or 6379, | |
password = opts.password or '', | |
channel = opts.channel or 'fw:msvc:pulse', | |
ttl = opts.ttl or 30 | |
}) | |
end | |
return _M |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
local dyupcl = require 'dyupcl' | |
dyupcl.init({ | |
shm = 'dyupcl', | |
names = {'dev'}, | |
host = '127.0.0.1', | |
password = '', | |
port = 6379, | |
channel = 'fw:msvc:pulse', | |
ttl = 30 | |
}) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
http { | |
lua_package_path "/usr/local/openresty/lualib/?.lua;/usr/local/openresty/nginx/conf/lua/?.lua;;"; | |
# for dyupcl | |
lua_shared_dict dyupcl 2m; | |
init_worker_by_lua_file conf/lua/init_worker.lua; | |
upstream dev { | |
server 0.0.0.1; | |
balancer_by_lua_file conf/lua/balancer.lua; | |
} | |
server { | |
listen 80; | |
server_name dev.com; | |
location / { | |
proxy_pass http://dev; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment