Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
openresty-dynamic-upstream
local balancer = require 'ngx.balancer'
local upstream = require 'ngx.upstream'
local dyupcl = require 'dyupcl'
-- Balancer-phase script: asks dyupcl for a backend of the current upstream
-- and installs it as the peer for this attempt.

-- Connect / send / read timeouts for the upstream connection.
balancer.set_timeouts(1, 1, 60)

-- Logs the reason and finishes the request with a 500.
-- Callers still `return` afterwards so no further balancer calls are made.
local function abort(reason)
  ngx.log(ngx.ERR, reason)
  ngx.exit(ngx.HTTP_INTERNAL_SERVER_ERROR)
end

-- No previous failure means this is the first attempt for the request,
-- so allow one extra try.
local last_failure = balancer.get_last_failure()
if not last_failure then
  local allowed, tries_err = balancer.set_more_tries(1)
  if not allowed then
    abort(tries_err)
    return
  end
end

local server, pick_err = dyupcl.pick(upstream.current_upstream_name())
if not server then
  abort(pick_err)
  return
end

local peer_set, peer_err = balancer.set_current_peer(server.host, server.port)
if not peer_set then
  abort(peer_err)
  return
end
-- dyupcl stands for "dynamic upstream cluster".
local _M = {}
local json = require 'cjson'
local redis = require 'resty.redis'
-- Labels passed to key() to build the bookkeeping keys kept in the shared dict.
local C = {
SERVERS = 'servers',
CNT = 'cnt',
NAMES = 'names'
}
-- Shared dict layout (xxx / yyy are upstream names):
--   _names            JSON list of upstream names, e.g. {xxx, yyy}
--   _servers_xxx      JSON list of {host, port} entries for upstream xxx
--   _cnt_xxx          counter incremented on every pick for upstream xxx
--   `name:host:port`  marker key with an empty value, one per announced server
-- Handle to the shared dict; assigned in _M.init.
local shm = nil
--- Builds the name of a bookkeeping key in the shared dict.
-- @param label key category (one of the values in C)
-- @param v optional suffix, typically an upstream name
-- @return '_<label>' when v is nil/false, otherwise '_<label>_<v>'
local function key(label, v)
  local name = '_' .. label
  if v then
    name = name .. '_' .. v
  end
  return name
end
--- Opens a Redis connection and subscribes it to a pub/sub channel.
-- The return convention is error-first: a single error value on failure,
-- `nil, client` on success.
-- @param host Redis host
-- @param port Redis port
-- @param password optional password; nil or '' skips AUTH
-- @param channel channel name to subscribe to
-- @return err on failure, or nil plus the subscribed client
local function init_client(host, port, password, channel)
  local client = redis:new()
  -- Connect / send / read timeouts in milliseconds. The read timeout is one
  -- hour so an idle subscription is not torn down between messages.
  client:set_timeouts(1000, 1000, 3600 * 1000)

  local connected, connect_err = client:connect(host, port)
  if not connected then
    return connect_err
  end

  if password and password ~= '' then
    local authed, auth_err = client:auth(password)
    if not authed then
      -- Release the socket: the caller retries in a loop and would otherwise
      -- leave one open connection behind per failed attempt.
      client:close()
      return auth_err
    end
  end

  local subscribed, subscribe_err = client:subscribe(channel)
  if not subscribed then
    client:close()
    return subscribe_err
  end

  return nil, client
end
-- Rebuilds every `_servers_<name>` list from the `name:host:port` marker keys
-- currently stored in the shared dict.
-- @param ttl expiry applied to each rewritten server list
-- @return table mapping upstream name -> sorted list of {host, port}
local function rebuild_servers(ttl)
  -- 0 lifts the default cap of 1024 keys, which would silently drop servers.
  local keys = shm:get_keys(0)
  local name_servers = {}
  for _, k in ipairs(keys) do
    -- Skip bookkeeping keys, which start with '_' (byte 95).
    if string.byte(k) ~= 95 then
      local n, h, p = k:match('([^:]+):([^:]+):([^:]+)')
      if n then
        local list = name_servers[n]
        if not list then
          list = {}
          name_servers[n] = list
        end
        list[#list + 1] = { host = h, port = p }
      end
    end
  end
  for n, list in pairs(name_servers) do
    -- Stable ordering so the round-robin index in pick maps consistently.
    table.sort(list, function(x, y) return x.host .. x.port < y.host .. y.port end)
    shm:set(key(C.SERVERS, n), json.encode(list), ttl)
  end
  return name_servers
end

-- Applies one pub/sub message to the shared dict and refreshes the lists.
-- `name:host:port` announces a server; `!name:host:port` takes it offline.
local function apply_message(msg, ttl)
  local member
  if string.byte(msg) == 33 then -- 33 is '!'
    member = string.sub(msg, 2)
    shm:delete(member)
  else
    member = msg
    shm:set(member, '', ttl)
  end

  local name_servers = rebuild_servers(ttl)

  -- rebuild_servers only rewrites lists that still have members. When the
  -- last server of an upstream disappears its stale list would stay in place
  -- and pick would keep returning the dead server, so drop it explicitly.
  local name = member:match('([^:]+):([^:]+):([^:]+)')
  if name and not name_servers[name] then
    shm:delete(key(C.SERVERS, name))
  end
end

--- Timer callback: subscribes to Redis and keeps the server lists in sync.
-- Scheduled by _M.init through ngx.timer.at. Reconnects once per second after
-- any read or connect failure and stops when the worker starts exiting.
-- NOTE(review): a pending read_reply still blocks until a message arrives or
-- the read timeout fires, so shutdown is only noticed between replies.
-- @param premature true when the timer fires because the worker is shutting down
-- @param opts table with host, port, password, channel and ttl
local function listen(premature, opts)
  if premature or ngx.worker.exiting() then
    return
  end

  local err, client = init_client(opts.host, opts.port, opts.password, opts.channel)
  if err then
    ngx.log(ngx.ERR, err)
  end

  while not ngx.worker.exiting() do
    if client then
      local res, read_err = client:read_reply()
      if not res or res[1] ~= 'message' or res[2] ~= opts.channel then
        ngx.log(ngx.ERR, 'read reply err', read_err)
        client:close()
        client = nil
      else
        apply_message(res[3], opts.ttl)
      end
    else
      ngx.log(ngx.ERR, 'reinit new redis client')
      ngx.sleep(1)
      err, client = init_client(opts.host, opts.port, opts.password, opts.channel)
      if err then
        ngx.log(ngx.ERR, err)
      end
    end
  end

  -- Worker is exiting: release the subscription socket.
  if client then
    client:close()
  end
end
--- Picks the next server of an upstream in round-robin order.
-- The rotation counter lives in the shared dict, so it is shared by every
-- worker using that dict.
-- @param name upstream name
-- @return a {host, port} table, or nil plus an error message when the
--         upstream has no servers
function _M.pick(name)
  local encoded = shm:get(key(C.SERVERS, name))
  if not encoded then
    return nil, 'no available server'
  end

  local servers = json.decode(encoded)
  local len = #servers
  if len == 0 then
    -- Guard against `cnt % 0`, which would yield nil without any error text.
    return nil, 'no available server'
  end

  -- incr initialises the counter to 0 when it does not exist yet; fall back
  -- to 0 if the increment itself fails.
  local cnt = shm:incr(key(C.CNT, name), 1, 0) or 0
  return servers[cnt % len + 1]
end
--- Renders a plain-text report of every configured upstream.
-- For each name: a `name:count` line, one `host:port` line per known server,
-- then a blank line.
-- @return the report, or '' when no names are stored
function _M.status()
  local encoded_names = shm:get(key(C.NAMES))
  if not encoded_names then
    return ''
  end

  local lines = {}
  for _, name in pairs(json.decode(encoded_names)) do
    local cnt = shm:get(key(C.CNT, name)) or 0
    lines[#lines + 1] = name .. ':' .. cnt .. '\n'

    local encoded_servers = shm:get(key(C.SERVERS, name))
    if encoded_servers then
      for _, server in pairs(json.decode(encoded_servers)) do
        lines[#lines + 1] = server.host .. ':' .. server.port .. '\n'
      end
    end

    lines[#lines + 1] = '\n'
  end
  return table.concat(lines)
end
--- Binds the module to its shared dict and starts the Redis listener.
-- Every worker binds the dict; only worker 0 stores the upstream names and
-- schedules the listener timer.
-- @param opts optional table:
--   shm      shared dict name (default 'dyupcl')
--   names    list of upstream names reported by status (default {})
--   host     Redis host (default '127.0.0.1')
--   port     Redis port (default 6379)
--   password Redis password (default '', meaning no AUTH)
--   channel  pub/sub channel (default 'fw:msvc:pulse')
--   ttl      expiry for server entries (default 30)
function _M.init(opts)
  opts = opts or {}

  local dict_name = opts.shm or 'dyupcl'
  shm = ngx.shared[dict_name]
  if not shm then
    -- Fail with a clear message instead of a later nil-index error.
    error('lua_shared_dict "' .. tostring(dict_name) .. '" is not defined', 2)
  end

  if ngx.worker.id() ~= 0 then
    return
  end

  local stored, store_err = shm:set(key(C.NAMES), json.encode(opts.names or {}))
  if not stored then
    ngx.log(ngx.ERR, 'failed to store upstream names: ', store_err)
  end

  local scheduled, timer_err = ngx.timer.at(0, listen, {
    host = opts.host or '127.0.0.1',
    port = opts.port or 6379,
    password = opts.password or '',
    channel = opts.channel or 'fw:msvc:pulse',
    ttl = opts.ttl or 30
  })
  if not scheduled then
    ngx.log(ngx.ERR, 'failed to create listen timer: ', timer_err)
  end
end
return _M
local dyupcl = require 'dyupcl'
-- Worker start-up: configure dyupcl and let it start its Redis listener.
local options = {
  -- Shared dict declared with lua_shared_dict in the nginx config.
  shm = 'dyupcl',
  -- Upstream names listed by dyupcl.status().
  names = { 'dev' },
  -- Redis connection and pub/sub channel.
  host = '127.0.0.1',
  port = 6379,
  password = '',
  channel = 'fw:msvc:pulse',
  -- Expiry applied to the server entries kept in the shared dict.
  ttl = 30,
}

dyupcl.init(options)
# Example nginx configuration wiring dyupcl into an upstream block.
http {
# Search path for Lua modules; conf/lua is where `require 'dyupcl'` is
# presumably expected to resolve -- confirm against the deployed layout.
lua_package_path "/usr/local/openresty/lualib/?.lua;/usr/local/openresty/nginx/conf/lua/?.lua;;";
# Shared dict used by dyupcl; the name must match the `shm` option given to dyupcl.init.
lua_shared_dict dyupcl 2m;
# Runs once per worker; presumably the script that calls dyupcl.init.
init_worker_by_lua_file conf/lua/init_worker.lua;
# The upstream name is what the balancer script passes to dyupcl.pick.
upstream dev {
# Placeholder address: the real peer is chosen per request by the balancer script
# through set_current_peer.
server 0.0.0.1;
balancer_by_lua_file conf/lua/balancer.lua;
}
server {
listen 80;
server_name dev.com;
location / {
# Proxy every request to the dynamically balanced upstream.
proxy_pass http://dev;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment