Skip to content

Instantly share code, notes, and snippets.

@DavadDi
Created September 13, 2019 14:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save DavadDi/748398f2de54635011eff689dda641de to your computer and use it in GitHub Desktop.
Save DavadDi/748398f2de54635011eff689dda641de to your computer and use it in GitHub Desktop.
-- k8s.io/ingress-nginx/rootfs/etc/nginx/lua/
local ngx_balancer = require("ngx.balancer")
local cjson = require("cjson.safe")
local util = require("util")
local dns_util = require("util.dns")
local configuration = require("configuration")
local round_robin = require("balancer.round_robin")
local chash = require("balancer.chash")
local chashsubset = require("balancer.chashsubset")
local sticky = require("balancer.sticky")
local ewma = require("balancer.ewma")
-- Interval, in seconds, between two runs of the periodic backend sync.
-- A worker therefore sees a new list of upstream peers after
-- <the delay until the controller POSTed the backend object to the Nginx endpoint>
-- + BACKENDS_SYNC_INTERVAL.
local BACKENDS_SYNC_INTERVAL = 1

-- Algorithm used when a backend names none, or names an unsupported one.
local DEFAULT_LB_ALG = "round_robin"

-- Load-balancing algorithm name -> implementing module.
local IMPLEMENTATIONS = {
  ["round_robin"] = round_robin,
  ["chash"] = chash,
  ["chashsubset"] = chashsubset,
  ["sticky"] = sticky,
  ["ewma"] = ewma,
}

-- Public module table returned at the end of the file.
local _M = {}

-- Backend name -> balancer instance currently serving that backend.
local balancers = {}
--- Pick the load-balancer implementation for a backend.
-- Precedence: cookie session affinity selects "sticky"; otherwise an
-- "upstream-hash-by" setting selects "chashsubset" (when
-- "upstream-hash-by-subset" is set) or "chash"; otherwise the backend's
-- "load-balance" value, or DEFAULT_LB_ALG when that is absent.
-- An unknown algorithm name is logged and replaced by DEFAULT_LB_ALG.
-- @param backend backend definition table
-- @return the implementation module from IMPLEMENTATIONS
local function get_implementation(backend)
  local affinity = backend["sessionAffinityConfig"]
  local hash_config = backend["upstreamHashByConfig"]

  local name
  if affinity and affinity["name"] == "cookie" then
    name = "sticky"
  elseif hash_config and hash_config["upstream-hash-by"] then
    name = hash_config["upstream-hash-by-subset"] and "chashsubset" or "chash"
  else
    name = backend["load-balance"] or DEFAULT_LB_ALG
  end

  local implementation = IMPLEMENTATIONS[name]
  if implementation then
    return implementation
  end

  ngx.log(ngx.WARN, string.format("%s is not supported, falling back to %s", backend["load-balance"], DEFAULT_LB_ALG))
  return IMPLEMENTATIONS[DEFAULT_LB_ALG]
end
--- Build a copy of a backend whose endpoints are expanded through DNS.
-- Each endpoint address is passed to dns_util.resolve, and every value it
-- returns becomes its own endpoint that keeps the original endpoint's port.
-- The backend passed in is left untouched; the work is done on a deep copy.
-- @param original_backend backend definition table
-- @return a deep copy of the backend with the expanded endpoint list
local function resolve_external_names(original_backend)
  local resolved_backend = util.deepcopy(original_backend)
  local resolved_endpoints = {}

  for _, endpoint in ipairs(resolved_backend.endpoints) do
    for _, ip in ipairs(dns_util.resolve(endpoint.address)) do
      resolved_endpoints[#resolved_endpoints + 1] = { address = ip, port = endpoint.port }
    end
  end

  resolved_backend.endpoints = resolved_endpoints
  return resolved_backend
end
--- Wrap every endpoint address that is not a dotted-quad IPv4 in "[...]".
-- Addresses that look like IPv4 (four dot-separated digit groups) and
-- addresses that are already bracketed are passed through unchanged.
-- The input list and its endpoint tables are never modified: an endpoint whose
-- address needs brackets is shallow-copied before the address is rewritten.
-- @param endpoints sequence of tables with a string `address` field
-- @return a new sequence of endpoints, in the same order
local function format_ipv6_endpoints(endpoints)
  local formatted_endpoints = {}

  for _, endpoint in ipairs(endpoints) do
    local address = endpoint.address
    -- The dots are escaped on purpose: a bare "." in a Lua pattern matches any
    -- character, which would let strings such as "1a2b3c4" pass as IPv4.
    local is_ipv4 = address:match("^%d+%.%d+%.%d+%.%d+$") ~= nil
    -- Skipping bracketed input keeps the function idempotent.
    local is_bracketed = address:match("^%[.*%]$") ~= nil

    local formatted_endpoint = endpoint
    if not is_ipv4 and not is_bracketed then
      -- Copy instead of aliasing so the caller's endpoint table is not mutated.
      formatted_endpoint = {}
      for key, value in pairs(endpoint) do
        formatted_endpoint[key] = value
      end
      formatted_endpoint.address = string.format("[%s]", address)
    end

    formatted_endpoints[#formatted_endpoints + 1] = formatted_endpoint
  end

  return formatted_endpoints
end
--- Reconcile the balancer instance of one backend with its latest definition.
-- * A backend without endpoints has its balancer removed.
-- * Endpoints of an "ExternalName" service are resolved through DNS, and all
--   endpoint addresses are bracket-formatted, before any balancer sees them.
-- * A backend without a balancer, or whose load-balancing algorithm changed,
--   gets a fresh instance of the selected implementation.
-- * Otherwise the existing balancer is updated through its sync method.
-- @param backend backend definition table
local function sync_backend(backend)
  -- the backend has no endpoints: drop its balancer
  if not backend.endpoints or #backend.endpoints == 0 then
    ngx.log(ngx.INFO, string.format("there is no endpoint for backend %s. Removing...", backend.name))
    balancers[backend.name] = nil
    return
  end

  -- Normalise the endpoints first, so that a newly created balancer is built
  -- from the same resolved and formatted endpoint list that sync receives.
  -- Doing this only on the sync path would leave a fresh instance with raw
  -- endpoints until the next periodic sync.
  local service_type = backend.service and backend.service.spec and backend.service.spec["type"]
  if service_type == "ExternalName" then
    -- external names have to be resolved to addresses
    backend = resolve_external_names(backend)
  end
  backend.endpoints = format_ipv6_endpoints(backend.endpoints)

  local implementation = get_implementation(backend)
  local balancer = balancers[backend.name]
  if not balancer then
    balancers[backend.name] = implementation:new(backend)
    return
  end

  -- every implementation is the metatable of its instances (see .new(...) functions)
  -- here we check if `balancer` is the instance of `implementation`
  -- if it is not then we deduce LB algorithm has changed for the backend
  if getmetatable(balancer) ~= implementation then
    ngx.log(ngx.INFO,
      string.format("LB algorithm changed from %s to %s, resetting the instance",
        tostring(balancer.name), tostring(implementation.name)))
    balancers[backend.name] = implementation:new(backend)
    return
  end

  -- same algorithm as before: hand the updated backend to the existing balancer
  balancer:sync(backend)
end
--- Rebuild the `balancers` table from the current backends configuration.
-- With no backends data at all, every balancer is dropped. Data that fails to
-- decode is logged and the existing balancers are left as they are. Otherwise
-- each decoded backend is synced and balancers of backends missing from the
-- data are removed.
local function sync_backends()
  local raw_data = configuration.get_backends_data()
  if not raw_data then
    balancers = {}
    return
  end

  local decoded_backends, decode_err = cjson.decode(raw_data)
  if not decoded_backends then
    ngx.log(ngx.ERR, "could not parse backends data: ", decode_err)
    return
  end

  -- Balancers that are still present after syncing the backends of this run,
  -- keyed by backend name.
  local still_present = {}
  for _, backend in ipairs(decoded_backends) do
    sync_backend(backend)
    local name = backend.name
    still_present[name] = balancers[name]
  end

  -- Drop every balancer whose backend was not part of this run.
  for name in pairs(balancers) do
    if not still_present[name] then
      balancers[name] = nil
    end
  end
end
--- Decide whether the current request should go to the alternative backend.
-- Only the first entry of `balancer.alternative_backends` is considered.
-- The decision is taken from the alternative balancer's traffic shaping
-- policy, in this order:
--   1. request header: equal to `headerValue` when one is configured,
--      otherwise the literal values "always" / "never";
--   2. cookie: the literal values "always" / "never";
--   3. weight: math.random(100) <= weight.
-- @param balancer the primary balancer of the request
-- @return true to route to the alternative backend, false otherwise
local function route_to_alternative_balancer(balancer)
  local alternatives = balancer.alternative_backends
  if not alternatives then
    return false
  end

  -- TODO: support traffic shaping for n > 1 alternative backends
  local alternative_name = alternatives[1]
  if not alternative_name then
    ngx.log(ngx.ERR, "empty alternative backend")
    return false
  end

  local alternative = balancers[alternative_name]
  if not alternative then
    ngx.log(ngx.ERR, "no alternative balancer for backend: " .. tostring(alternative_name))
    return false
  end

  local policy = alternative.traffic_shaping_policy
  if not policy then
    ngx.log(ngx.ERR, "traffic shaping policy is not set for balanacer of backend: " .. tostring(alternative_name))
    return false
  end

  -- The header name has "-" replaced by "_" before it is looked up under the
  -- "http_" variable prefix.
  local header_var = "http_" .. util.replace_special_char(policy.header, "-", "_")
  local header_value = ngx.var[header_var]
  if header_value then
    local expected_value = policy.headerValue
    if expected_value and #expected_value > 0 then
      -- A non-matching value falls through to the cookie and weight checks.
      if expected_value == header_value then
        return true
      end
    else
      if header_value == "always" then
        return true
      end
      if header_value == "never" then
        return false
      end
    end
  end

  local cookie_value = ngx.var["cookie_" .. policy.cookie]
  if cookie_value == "always" then
    return true
  end
  if cookie_value == "never" then
    return false
  end

  return math.random(100) <= policy.weight
end
--- Return the balancer that serves the current request.
-- The result is cached in ngx.ctx.balancer, so the selection runs once per
-- request context. The balancer is looked up by ngx.var.proxy_upstream_name.
-- When route_to_alternative_balancer says so, the first alternative backend's
-- balancer is used instead and its name is stored in
-- ngx.var.proxy_alternative_upstream_name.
-- @return the selected balancer, or nothing when the upstream has no balancer
local function get_balancer()
  local cached = ngx.ctx.balancer
  if cached then
    return cached
  end

  local balancer = balancers[ngx.var.proxy_upstream_name]
  if not balancer then
    return
  end

  -- Switch to the alternative backend when the routing decision asks for it.
  if route_to_alternative_balancer(balancer) then
    local alternative_backend_name = balancer.alternative_backends[1]
    ngx.var.proxy_alternative_upstream_name = alternative_backend_name
    balancer = balancers[alternative_backend_name]
  end

  ngx.ctx.balancer = balancer
  return balancer
end
--- Worker start-up hook: sync the backends once, then schedule the
-- recurring sync every BACKENDS_SYNC_INTERVAL seconds.
function _M.init_worker()
  -- when worker starts, sync backends without delay
  sync_backends()

  local _, timer_err = ngx.timer.every(BACKENDS_SYNC_INTERVAL, sync_backends)
  if not timer_err then
    return
  end

  ngx.log(ngx.ERR, string.format("error when setting up timer.every for sync_backends: %s", tostring(timer_err)))
end
--- Rewrite-phase hook: end the request with 503 Service Unavailable when no
-- balancer is available for it; do nothing otherwise.
function _M.rewrite()
  if get_balancer() then
    return
  end

  ngx.status = ngx.HTTP_SERVICE_UNAVAILABLE
  return ngx.exit(ngx.status)
end
--- Balancer-phase hook: ask the request's balancer for a peer and set it as
-- the current upstream peer, allowing one more try.
-- Returns silently when there is no balancer; logs a warning when the balancer
-- yields no peer and an error when the peer cannot be set.
function _M.balance()
  local balancer = get_balancer()
  if not balancer then
    return
  end

  local peer = balancer:balance()
  if not peer then
    ngx.log(ngx.WARN, "no peer was returned, balancer: " .. balancer.name)
    return
  end

  ngx_balancer.set_more_tries(1)

  local peer_set, set_err = ngx_balancer.set_current_peer(peer)
  if peer_set then
    return
  end

  ngx.log(ngx.ERR, string.format("error while setting current upstream peer %s: %s", peer, set_err))
end
--- Log-phase hook: invoke the balancer's after_balance method, if the
-- request has a balancer and that balancer defines one.
function _M.log()
  local balancer = get_balancer()
  if balancer and balancer.after_balance then
    balancer:after_balance()
  end
end
-- When the _TEST global is set, expose the private helpers on the module
-- table as well.
if _TEST then
  local private_helpers = {
    get_implementation = get_implementation,
    sync_backend = sync_backend,
    route_to_alternative_balancer = route_to_alternative_balancer,
    get_balancer = get_balancer,
  }
  for helper_name, helper in pairs(private_helpers) do
    _M[helper_name] = helper
  end
end

return _M
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment