Skip to content

Instantly share code, notes, and snippets.

@Mons
Created July 29, 2015 23:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save Mons/42fcf9cec738ba746224 to your computer and use it in GitHub Desktop.
Save Mons/42fcf9cec738ba746224 to your computer and use it in GitHub Desktop.
Connection pool draft
local obj = require('obj')
local fiber = require('fiber')
local log = require('log')
local remote = require('net.box')
local pool = obj.class({},'pool')
function pool:_init(cfg)
local zones = {}
self.total = 0
self.timeout = self.timeout or 1
for _,srv in ipairs(cfg.servers) do
local login = srv.login or cfg.login
local password = srv.password or cfg.password
local uri = login .. ':' .. password .. '@' .. srv.uri
local zid = srv.zone or cfg.zone or 'default'
local node = {
peer = srv.uri,
uri = uri,
zone = zid,
state = 'inactive',
}
if not zones[zid] then
zones[zid] = {
id = zid,
total = 0,
active = {}, -- active and responsive connected nodes
inactive = {}, -- disconnected nodes
deferred = {}, -- tcp connected but unresponsive nodes
}
end
local zone = zones[zid]
zone.total = zone.total + 1
self.total = self.total + 1
table.insert(zone.inactive,node)
end
self.zones = zones
self.name = cfg.name or 'default'
end
function pool:counts()
if self._counts then return self._counts end
local active = 0
local inactive = 0
local deferred = 0
for _,z in pairs(self.zones) do
active = active + #z.active
inactive = inactive + #z.inactive
deferred = deferred + #z.deferred
end
self._counts = {
active = active;
inactive = inactive;
deferred = deferred;
}
return self._counts
end
function pool:_move_node(zone,node,state1,state2)
local found = false
log.info("move node %s from %s to %s", node.peer, state1, state2)
for _,v in pairs(zone[state1]) do
if v == node then
table.remove(zone[state1],_)
found = true
break
end
end
if not found then
log.error("Node %s not dound in state %s.",node.peer,state1)
end
table.insert(zone[state2],node)
node.state = state2
end
function pool:node_state_change(zone,node,state)
local prevstate = node.state
self._counts = nil
if state == 'active' then
self:_move_node(zone,node,prevstate,state)
self:on_connected_one(node)
if #zone.active == zone.total then
self:on_connected_zone(zone)
end
if self:counts().active == self.total then
self:on_connected()
end
else -- deferred or inactive
self:_move_node(zone,node,prevstate,state)
-- moving from deferred to inactive we don't alert with callbacks
if prevstate == 'active' then
self:on_disconnect_one(node)
if #zone.active == 0 then
self:on_disconnect_zone(zone)
end
if self:counts().active == 0 then
self:on_disconnect()
end
end
end
end
function pool:connect()
if self.__connecting then return end
self.__connecting = true
self:on_init()
for zid,zone in pairs(self.zones) do
for _,node in pairs(zone.inactive) do
fiber.create(function()
fiber.name(self.name..':'..node.peer)
node.conn = remote:new( node.uri, { reconnect_after = 1/3, timeout = 1 } )
local state
local conn_generation = 0
while true do
state = node.conn:_wait_state({active = true})
local r,e = pcall(node.conn.eval,node.conn,"return box.info.server.uuid")
if r and e then
local uuid = e
if node.uuid and uuid ~= node.uuid then
log.warn("server %s changed it's uuid %s -> %s",node.peer,node.uuid,uuid)
end
node.uuid = uuid
conn_generation = conn_generation + 1
--- TODO: if self then ...
log.info("connected %s, uuid = %s",node.peer,uuid)
self:node_state_change(zone,node,'active')
--- start pinger
fiber.create(function()
local gen = conn_generation
local last_state_ok = true
while gen == conn_generation do
--- TODO: replace ping with node status (rw/ro)
local r,e = pcall(node.conn.ping,node.conn:timeout(self.timeout))
if r and e then
if not last_state_ok and gen == conn_generation then
log.info("node %s become online by ping",node.peer)
last_state_ok = true
self:node_state_change(zone,node,'active')
end
else
if last_state_ok and gen == conn_generation then
log.warn("node %s become offline by ping: %s",node.peer,e)
last_state_ok = false
self:node_state_change(zone,node,'deferred')
end
end
fiber.sleep(1)
end
end)
state = node.conn:_wait_state({error = true, closed = true})
self:node_state_change(zone,node,'inactive')
else
log.warn("uuid request failed for %s: %s",node.peer,e)
end
end
end)
end
end
--
end
function pool:on_connected_one (node)
log.info('on_connected_one %s : %s',node.peer,node.uuid)
end
function pool:on_connected_zone (zone)
log.info('on_connected_zone %s',zone.id)
end
function pool:on_connected ()
log.info('on_connected all')
end
function pool:on_connfail( node )
log.info('on_connfail ???')
end
function pool:on_disconnect_one (node)
log.info('on_disconnect_one %s : %s',node.peer,node.uuid)
end
function pool:on_disconnect_zone (zone)
log.info('on_disconnect_zone %s',zone.id)
end
function pool:on_disconnect ()
log.info('on_disconnect all')
end
function pool:on_init ()
end
function pool.heartbeat()
local self = _G['pool']
-- ...
end
_G['pool'] = pool
return pool
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment