--- Cassandra cluster client module.
-- Cluster module for OpenResty.
-- @module resty.cassandra.cluster
-- @author thibaultcha
-- @release 1.3.2
-- Modified from https://github.com/thibaultcha/lua-cassandra/blob/master/lib/resty/cassandra/cluster.lua
-- Adds some extra debug logging used during development

local resty_lock = require 'resty.lock'
local cassandra = require 'cassandra'
local cql = require 'cassandra.cql'

local update_time = ngx.update_time
local cql_errors = cql.errors
local requests = cql.requests
local tonumber = tonumber
local concat = table.concat
local shared = ngx.shared
local assert = assert
local pairs = pairs
local fmt = string.format
local sub = string.sub
local find = string.find
local now = ngx.now
local type = type
local log = ngx.log
local ERR = ngx.ERR
local WARN = ngx.WARN
local DEBUG = ngx.DEBUG
local NOTICE = ngx.NOTICE

local empty_t = {}
local _log_prefix = '[lua-cassandra] '
local _rec_key = 'host:rec:'
local _prepared_key = 'prepared:id:'
local _init_key = 'init:status'
local _protocol_version_key = 'protocol:version:'
local _bind_all_address = '0.0.0.0'
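
-- Current time in milliseconds: ngx.now() returns seconds with millisecond
-- precision, so the shm timestamps (unhealthy_at) and reconnection delays
-- below are all compared on a millisecond scale.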
local function get_now()
  return now() * 1000
end

-----------------------------------------
-- Hosts status+info stored in shm
-----------------------------------------

local function set_peer(self, host, up, reconn_delay, unhealthy_at,
                        data_center, release_version)
  data_center = data_center or ''
  release_version = release_version or ''

  -- host status
  local ok, err = self.shm:safe_set(host, up)
  if not ok then
    return nil, 'could not set host details in shm: '..err
  end

  -- host info
  local marshalled = fmt("%d:%d:%d:%s%s", reconn_delay, unhealthy_at,
                         #data_center, data_center, release_version)

  ok, err = self.shm:safe_set(_rec_key..host, marshalled)
  if not ok then
    return nil, 'could not set host details in shm: '..err
  end

  return true
end
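
-- Example of the record marshalled above (illustrative values):
--   set_peer(self, "10.0.0.1", true, 0, 0, "dc1", "3.11.2")
-- writes two shm entries:
--   "10.0.0.1"          -> true               (host status)
--   "host:rec:10.0.0.1" -> "0:0:3:dc13.11.2"  (reconn_delay, unhealthy_at and
--                          the data_center length, colon-separated, followed
--                          by data_center and release_version concatenated)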

local function add_peer(self, host, data_center)
  return set_peer(self, host, true, 0, 0, data_center, "")
end

local function get_peer(self, host, status)
  local marshalled, err = self.shm:get(_rec_key .. host)
  if err then
    return nil, 'could not get host details in shm: '..err
  elseif marshalled == nil then
    return nil, 'no host details for '..host
  elseif type(marshalled) ~= 'string' then
    return nil, 'corrupted shm'
  end

  if status == nil then
    status, err = self.shm:get(host)
    if err then return nil, 'could not get host status in shm: '..err end
  end

  local sep_1 = find(marshalled, ":", 1, true)
  local sep_2 = find(marshalled, ":", sep_1 + 1, true)
  local sep_3 = find(marshalled, ":", sep_2 + 1, true)

  local reconn_delay = sub(marshalled, 1, sep_1 - 1)
  local unhealthy_at = sub(marshalled, sep_1 + 1, sep_2 - 1)

  local data_center_len = sub(marshalled, sep_2 + 1, sep_3 - 1)
  local data_center_last = sep_3 + tonumber(data_center_len)
  local data_center = sub(marshalled, sep_3 + 1, data_center_last)
  local release_version = sub(marshalled, data_center_last + 1)

  return {
    up = status,
    host = host,
    data_center = data_center ~= '' and data_center or nil,
    release_version = release_version ~= '' and release_version or nil,
    reconn_delay = tonumber(reconn_delay),
    unhealthy_at = tonumber(unhealthy_at)
  }
end

local function get_peers(self)
  local peers = {}
  local keys = self.shm:get_keys() -- defaults to a max of 1024 keys
  -- we shall have a relatively small number of keys, but in any case this
  -- function is not to be called in hot paths anyway.

  for i = 1, #keys do
    if sub(keys[i], 1, #_rec_key) == _rec_key then
      local host = sub(keys[i], #_rec_key + 1)
      local peer, err = get_peer(self, host)
      if not peer then return nil, err end
      peers[#peers+1] = peer
    end
  end

  if #peers > 0 then
    return peers
  end
end

local function delete_peer(self, host)
  self.shm:delete(_rec_key .. host) -- details
  self.shm:delete(host) -- status bool
end

local function set_peer_down(self, host)
  if self.logging then
    log(WARN, _log_prefix, 'setting host at ', host, ' DOWN')
  end

  local peer = get_peer(self, host, false)
  peer = peer or empty_t -- this can be called from refresh() so no host in shm yet

  return set_peer(self, host, false, self.reconn_policy:next_delay(host), get_now(),
                  peer.data_center, peer.release_version)
end

local function set_peer_up(self, host)
  if self.logging then
    log(NOTICE, _log_prefix, 'setting host at ', host, ' UP')
  end
  self.reconn_policy:reset(host)

  local peer = get_peer(self, host, true)
  peer = peer or empty_t -- this can be called from refresh() so no host in shm yet

  return set_peer(self, host, true, 0, 0,
                  peer.data_center, peer.release_version)
end

local function can_try_peer(self, host)
  local up, err = self.shm:get(host)
  if up then return up
  elseif err then return nil, err
  else
    -- reconnection policy steps in before making a decision
    local peer_rec, err = get_peer(self, host, up)
    if not peer_rec then return nil, err end

    return get_now() - peer_rec.unhealthy_at >= peer_rec.reconn_delay, nil, true
  end
end
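
-- can_try_peer returns one of three shapes:
--   true             the host is marked up in the shm
--   nil, err         the shm read failed
--   bool, nil, true  the host is marked down; the boolean says whether its
--                    reconnection delay has elapsed, and the third value
--                    flags the attempt as a retry (see check_peer_health)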

----------------------------
-- utils
----------------------------

local function spawn_peer(host, port, keyspace, opts)
  opts = opts or {}
  opts.host = host
  opts.port = port
  opts.keyspace = keyspace
  return cassandra.new(opts)
end

local function check_peer_health(self, host, coordinator_options, retry)
  coordinator_options = coordinator_options or empty_t

  local keyspace
  if not coordinator_options.no_keyspace then
    keyspace = coordinator_options.keyspace or self.keyspace
  end

  local peer, err = spawn_peer(host, self.default_port, keyspace, self.peers_opts)
  if not peer then return nil, err
  else
    peer:settimeout(self.timeout_connect)
    local ok, err, maybe_down = peer:connect()
    if ok then
      -- host is healthy
      if retry then
        -- node seems healthy after being down, back up!
        local ok, err = set_peer_up(self, host)
        if not ok then return nil, 'error setting host back up: '..err end
      end

      peer:settimeout(self.timeout_read)

      return peer
    elseif maybe_down then
      -- host is not (or still not) responsive
      local ok, shm_err = set_peer_down(self, host)
      if not ok then return nil, 'error setting host down: '..shm_err end

      return nil, 'host seems unhealthy, considering it down ('..err..')'
    else
      return nil, err
    end
  end
end
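
-- check_peer_health either returns a connected peer (with its read timeout
-- already set) or nil and an error. As a side effect it keeps the shm status
-- current: a successful connect on a retry marks the host back up, and a
-- connect failure that looks like an outage marks it down.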

-----------
-- Cluster
-----------

local _Cluster = {
  _VERSION = '1.3.2',
}

_Cluster.__index = _Cluster

--- New cluster options.
-- Options taken by `new` upon cluster creation.
-- @field shm Name of the lua_shared_dict to use for this cluster's
-- information. (`string`, default: `cassandra`)
-- @field contact_points Array of addresses for this cluster's
-- contact points. (`table`, default: `{"127.0.0.1"}`)
-- @field default_port The port on which all nodes of the cluster are
-- listening. (`number`, default: `9042`)
-- @field keyspace Keyspace to use for this cluster. (`string`, optional)
-- @field timeout_connect The timeout value when connecting to a node, in ms.
-- (`number`, default: `1000`)
-- @field timeout_read The timeout value when reading from a node, in ms.
-- (`number`, default: `2000`)
-- @field retry_on_timeout Specifies if the request should be retried on the
-- next coordinator (as per the load balancing policy)
-- if it timed out. (`boolean`, default: `true`)
-- @field max_schema_consensus_wait Maximum waiting time allowed when executing
-- DDL queries before timing out, in ms.
-- (`number`, default: `10000`)
-- @field lock_timeout Timeout value of lua-resty-lock used for the `refresh`
-- and prepared statement mutexes, in seconds.
-- (`number`, optional)
-- @field silent Disables all logging (of any log_level) from this cluster.
-- (`boolean`, default: `false`)
-- @field lb_policy A load balancing policy created from one of the modules
-- under `resty.cassandra.policies.lb.*`.
-- (`lb policy`, default: `lb.rr` round robin)
-- @field reconn_policy A reconnection policy created from one of the modules
-- under `resty.cassandra.policies.reconnection.*`.
-- (`reconn policy`, default: `reconnection.exp` (exponential),
-- 1000ms base, 60000ms max)
-- @field retry_policy A retry policy created from one of the modules
-- under `resty.cassandra.policies.retry.*`.
-- (`retry policy`, default: `retry.simple`, 3 retries)
-- @field ssl Determines if the created cluster should connect using SSL.
-- (`boolean`, default: `false`)
-- @field verify Enable server certificate validation if `ssl` is enabled.
-- (`boolean`, default: `false`)
-- @field auth Authentication handler, created from the
-- `cassandra.auth_providers` table. (optional)
-- @table `cluster_options`

--- Create a new Cluster client.
-- Takes a table of `cluster_options`. Does not connect automatically.
-- On the first request to the cluster, the module will attempt to connect to
-- one of the specified `contact_points`, and retrieve the full list of nodes
-- belonging to this cluster. Once this list is retrieved, the load balancing
-- policy will start selecting nodes to act as coordinators for future
-- requests.
--
-- @usage
-- local Cluster = require "resty.cassandra.cluster"
-- local cluster = Cluster.new {
--   shm = "cassandra_shared_dict",
--   contact_points = {"10.0.0.1", "10.0.0.2"},
--   keyspace = "my_keyspace",
--   default_port = 9042,
--   timeout_connect = 3000
-- }
--
-- @param[type=table] opts Options for the created cluster client.
-- @treturn table `cluster`: A table holding clustering operations capabilities
-- or nil if failure.
-- @treturn string `err`: String describing the error if failure.
function _Cluster.new(opts)
  opts = opts or empty_t
  if type(opts) ~= 'table' then
    return nil, 'opts must be a table'
  end

  local peers_opts = {}
  local lock_opts = {}
  local dict_name = opts.shm or 'cassandra'
  if type(dict_name) ~= 'string' then
    return nil, 'shm must be a string'
  elseif not shared[dict_name] then
    return nil, 'no shared dict '..dict_name
  end

  for k, v in pairs(opts) do
    if k == 'keyspace' then
      if type(v) ~= 'string' then
        return nil, 'keyspace must be a string'
      end
    elseif k == 'ssl' then
      if type(v) ~= 'boolean' then
        return nil, 'ssl must be a boolean'
      end
      peers_opts.ssl = v
    elseif k == 'verify' then
      if type(v) ~= 'boolean' then
        return nil, 'verify must be a boolean'
      end
      peers_opts.verify = v
    elseif k == 'cafile' then
      if type(v) ~= 'string' then
        return nil, 'cafile must be a string'
      end
      peers_opts.cafile = v
    elseif k == 'auth' then
      if type(v) ~= 'table' then
        return nil, 'auth seems not to be an auth provider'
      end
      peers_opts.auth = v
    elseif k == 'default_port' then
      if type(v) ~= 'number' then
        return nil, 'default_port must be a number'
      end
    elseif k == 'contact_points' then
      if type(v) ~= 'table' then
        return nil, 'contact_points must be a table'
      end
    elseif k == 'timeout_read' then
      if type(v) ~= 'number' then
        return nil, 'timeout_read must be a number'
      end
    elseif k == 'timeout_connect' then
      if type(v) ~= 'number' then
        return nil, 'timeout_connect must be a number'
      end
    elseif k == 'max_schema_consensus_wait' then
      if type(v) ~= 'number' then
        return nil, 'max_schema_consensus_wait must be a number'
      end
    elseif k == 'retry_on_timeout' then
      if type(v) ~= 'boolean' then
        return nil, 'retry_on_timeout must be a boolean'
      end
    elseif k == 'lock_timeout' then
      if type(v) ~= 'number' then
        return nil, 'lock_timeout must be a number'
      end
      lock_opts.timeout = v
    elseif k == 'silent' then
      if type(v) ~= 'boolean' then
        return nil, 'silent must be a boolean'
      end
    end
  end

  return setmetatable({
    shm = shared[dict_name],
    dict_name = dict_name,
    prepared_ids = {},
    peers_opts = peers_opts,
    keyspace = opts.keyspace,
    default_port = opts.default_port or 9042,
    contact_points = opts.contact_points or {'127.0.0.1'},
    timeout_read = opts.timeout_read or 2000,
    timeout_connect = opts.timeout_connect or 1000,
    retry_on_timeout = opts.retry_on_timeout == nil and true or opts.retry_on_timeout,
    max_schema_consensus_wait = opts.max_schema_consensus_wait or 10000,
    lock_opts = lock_opts,
    logging = not opts.silent,
    lb_policy = opts.lb_policy
                or require('resty.cassandra.policies.lb.rr').new(),
    reconn_policy = opts.reconn_policy
                or require('resty.cassandra.policies.reconnection.exp').new(1000, 60000),
    retry_policy = opts.retry_policy
                or require('resty.cassandra.policies.retry.simple').new(3),
  }, _Cluster)
end
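
-- A minimal sketch of the nginx configuration this constructor expects (dict
-- name and size illustrative): the shm named by `opts.shm` must be declared
-- in the http block, e.g.
--
--   http {
--     lua_shared_dict cassandra 1m;
--   }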

local function no_host_available_error(errors)
  local buf = {'all hosts tried for query failed'}

  for address, err in pairs(errors) do
    buf[#buf+1] = address..': '..err
  end

  return concat(buf, '. ')
end

local function first_coordinator(self)
  local errors = {}
  local cp = self.contact_points

  for i = 1, #cp do
    local peer, err = check_peer_health(self, cp[i], {
      no_keyspace = true,
    })
    if not peer then
      errors[cp[i]] = err
    else
      return peer
    end
  end

  return nil, no_host_available_error(errors)
end

local function next_coordinator(self, coordinator_options)
  local errors = {}
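  -- debug-only pass added in this modified version: log each peer the LB
  -- policy yields before the actual selection loop below runs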
  for _, peer_rec in self.lb_policy:iter() do
    log(DEBUG, _log_prefix, "LB iterator peer in next_coordinator: ", peer_rec.host)
  end

  for _, peer_rec in self.lb_policy:iter() do
    local ok, err, retry = can_try_peer(self, peer_rec.host)
    if ok then
      local peer, err = check_peer_health(self, peer_rec.host, coordinator_options, retry)
      if peer then
        if self.logging then
          log(DEBUG, _log_prefix, 'load balancing policy chose host at ', peer.host)
        end
        return peer
      else
        errors[peer_rec.host] = err
      end
    elseif err then
      return nil, err
    else
      errors[peer_rec.host] = 'host still considered down'
    end
  end

  return nil, no_host_available_error(errors)
end

--- Refresh the list of nodes in the cluster.
-- Queries one of the specified `contact_points` to retrieve the list of
-- available nodes in the cluster, and updates the configured policies.
-- This method is automatically called upon the first query made to the
-- cluster (from `execute`, `batch` or `iterate`), but needs to be manually
-- called if further updates are required.
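--
-- @usage
-- -- illustrative manual refresh, e.g. after a topology change:
-- local ok, err = cluster:refresh()
-- if not ok then
--   ngx.log(ngx.ERR, "could not refresh cluster: ", err)
-- end
--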
-- @treturn boolean `ok`: `true` if success, `nil` if failure.
-- @treturn string `err`: String describing the error if failure.
function _Cluster:refresh()
  log(DEBUG, _log_prefix, "REFRESH ENTERED")
  local old_peers, err = get_peers(self)
  if err then return nil, err
  elseif old_peers then
    -- we first need to flush the existing peers from the shm,
    -- so that our lock can work properly. we keep old peers in
    -- our local for later.
    for i = 1, #old_peers do
      local host = old_peers[i].host
      old_peers[host] = old_peers[i] -- alias as a hash
      self.shm:delete(_rec_key .. host) -- details
      self.shm:delete(host) -- status bool
    end
  else
    old_peers = {} -- empty shm
  end

  local lock = resty_lock:new(self.dict_name, self.lock_opts)
  local elapsed, err = lock:lock('refresh')
  if not elapsed then return nil, 'failed to acquire refresh lock: '..err end
  -- did someone else already fetch the hosts?
  local peers, err = get_peers(self)
  if err then return nil, err
  elseif not peers then
    -- we are the first ones to get there
    local coordinator, err = first_coordinator(self)
    if not coordinator then return nil, err end

    local local_rows, err = coordinator:execute [[
      SELECT data_center,listen_address,release_version FROM system.local
    ]]
    if not local_rows then return nil, err end

    assert(local_rows[1] ~= nil, 'local host could not be found')

    local rows, err = coordinator:execute [[
      SELECT peer,data_center,rpc_address,release_version FROM system.peers
    ]]
    if not rows then return nil, err end

    coordinator:setkeepalive()

    rows[#rows+1] = { -- local host
      rpc_address = local_rows[1].listen_address,
      data_center = local_rows[1].data_center,
      release_version = local_rows[1].release_version
    }

    for i = 1, #rows do
      local row = rows[i]
      local host = row.rpc_address
log(DEBUG, _log_prefix, "refresh processing host", host) | |
      if not host then
        log(ERR, _log_prefix, 'no rpc_address found for host ', row.peer,
                              ' in ', coordinator.host, '\'s peers system ',
                              'table. ', row.peer, ' will be ignored.')
      else
        if host == _bind_all_address then
          log(WARN, _log_prefix, 'found host with 0.0.0.0 as rpc_address, ',
                                 'using listen_address ', row.peer, ' to ',
                                 'contact it instead. If this is ',
                                 'incorrect you should avoid using 0.0.0.0 ',
                                 'server-side.')
          host = row.peer
        end

        local old_peer = old_peers[host]
        local reconn_delay, unhealthy_at = 0, 0
        local up = true
        if old_peer then
          up = old_peer.up
          reconn_delay = old_peer.reconn_delay
          unhealthy_at = old_peer.unhealthy_at
        end

        local ok, err = set_peer(self, host, up, reconn_delay, unhealthy_at,
                                 rows[i].data_center, rows[i].release_version)
        if not ok then return nil, err end
      end
    end

    peers, err = get_peers(self)
    if err then return nil, err end

    local ok, err = self.shm:safe_set(_protocol_version_key, coordinator.protocol_version)
    if not ok then return nil, 'could not set protocol_version in shm: '..err end
  end

  local ok, err = lock:unlock()
  if not ok then return nil, 'failed to unlock refresh lock: '..err end

  local protocol_version, err = self.shm:get(_protocol_version_key)
  if not protocol_version then return nil, 'could not get protocol_version: '..err end

  -- setting protocol_version early so we don't always attempt a connection
  -- with an incompatible one, triggering more round trips
  self.peers_opts.protocol_version = protocol_version

  --if self.init then
  --  for _, peer_rec in self.lb_policy:iter() do
  --    log(DEBUG, _log_prefix, "LB iterator peer prior to post-refresh init: ", peer_rec.host)
  --  end
  --end

  -- initiate the load balancing policy
  self.lb_policy:init(peers)

  for i = 1, #self.lb_policy.peers do
    log(DEBUG, _log_prefix, "LB peer array member following refresh: ", self.lb_policy.peers[i].host)
  end

  for _, peer_rec in self.lb_policy:iter() do
    log(DEBUG, _log_prefix, "LB iterator peer following post-refresh init: ", peer_rec.host)
  end

  -- cluster is ready to be queried
  self.init = true
  -- local ok, err = self.shm:safe_set(_init_key, true)
  -- if not ok then
  --   return nil, 'could not set init details in shm: '..err
  -- end

  return true
end

-- function _Cluster:mark_unready()
--   local ok, err = self.shm:safe_set(_init_key, false)
--   if not ok then
--     return nil, 'could not set init details in shm: '..err
--   end
--   log(DEBUG, _log_prefix, 'marked unready, shm init value is ', self.shm:get(_init_key))
--
--   return true
-- end

--------------------
-- queries execution
--------------------

local function check_schema_consensus(coordinator)
  local local_res, err = coordinator:execute('SELECT schema_version FROM system.local')
  if not local_res then return nil, err end

  local peers_res, err = coordinator:execute('SELECT schema_version FROM system.peers')
  if not peers_res then return nil, err end

  if #peers_res > 0 and #local_res > 0 then
    for i = 1, #peers_res do
      if peers_res[i].schema_version ~= local_res[1].schema_version then
        return nil
      end
    end
  end

  return local_res[1].schema_version
end

local function wait_schema_consensus(self, coordinator, timeout)
  timeout = timeout or self.max_schema_consensus_wait

  local peers, err = get_peers(self)
  if err then return nil, err
  elseif not peers then return nil, 'no peers in shm'
  elseif #peers < 2 then return true end

  update_time()

  local ok, err, tdiff
  local tstart = get_now()

  repeat
    -- disabled because this method is currently used outside of an
    -- ngx_lua compatible context by production applications.
    -- no fallback implemented yet.
    --ngx.sleep(0.5)
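    -- NOTE: with the sleep disabled, this loop busy-waits, re-querying the
    -- schema versions as fast as the coordinator answers until consensus is
    -- reached or the timeout elapses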
    update_time()
    ok, err = check_schema_consensus(coordinator)
    tdiff = get_now() - tstart
  until ok or err or tdiff >= timeout

  if ok then
    return ok
  elseif err then
    return nil, err
  else
    return nil, 'timeout'
  end
end

local function prepare(self, coordinator, query)
  if self.logging then
    log(DEBUG, _log_prefix, 'preparing ', query, ' on host ', coordinator.host)
  end

  -- we are the ones preparing the query
  local res, err = coordinator:prepare(query)
  if not res then return nil, 'could not prepare query: '..err end

  return res.query_id
end
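
-- get_or_prepare resolves a query id through three levels of caching:
--   1. this worker's Lua-land table (self.prepared_ids)
--   2. the shared dict, visible to all workers of this nginx instance
--   3. an actual PREPARE on the coordinator, performed under a resty.lock
--      mutex with a second shm check so that only one worker prepares any
--      given query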

local function get_or_prepare(self, coordinator, query)
  -- worker memory check
  local query_id = self.prepared_ids[query]
  if not query_id then
    -- worker cache miss
    -- shm cache?
    local shm = self.shm
    local key = _prepared_key .. query
    local err
    query_id, err = shm:get(key)
    if err then return nil, 'could not get query id from shm: '..err
    elseif not query_id then
      -- shm cache miss
      -- query not prepared yet, must prepare in mutex
      local lock = resty_lock:new(self.dict_name, self.lock_opts)
      local elapsed, err = lock:lock('prepare:' .. query)
      if not elapsed then return nil, 'failed to acquire lock: '..err end

      -- someone else prepared query?
      query_id, err = shm:get(key)
      if err then return nil, 'could not get query id from shm: '..err
      elseif not query_id then
        query_id, err = prepare(self, coordinator, query)
        if not query_id then return nil, err end

        local ok, err = shm:safe_set(key, query_id)
        if not ok then
          if err == 'no memory' then
            log(WARN, _log_prefix, 'could not set query id in shm: ',
                                   'running out of memory, please increase the ',
                                   self.dict_name, ' dict size')
          else
            return nil, 'could not set query id in shm: '..err
          end
        end
      end

      local ok, err = lock:unlock()
      if not ok then return nil, 'failed to unlock: '..err end
    end

    -- set worker cache
    self.prepared_ids[query] = query_id
  end

  return query_id
end

local send_request

function _Cluster:send_retry(request, ...)
  local coordinator, err = next_coordinator(self)
  if not coordinator then return nil, err end

  if self.logging then
    log(NOTICE, _log_prefix, 'retrying request on host at ', coordinator.host,
                             ' reason: ', ...)
  end

  request.retries = request.retries + 1

  return send_request(self, coordinator, request)
end

local function prepare_and_retry(self, coordinator, request)
  if request.queries then
    -- prepared batch
    if self.logging then
      log(NOTICE, _log_prefix, 'some requests from this batch were not prepared on host ',
                               coordinator.host, ', preparing and retrying')
    end
    for i = 1, #request.queries do
      local query_id, err = prepare(self, coordinator, request.queries[i][1])
      if not query_id then return nil, err end
      request.queries[i][3] = query_id
    end
  else
    -- prepared query
    if self.logging then
      log(NOTICE, _log_prefix, request.query, ' was not prepared on host ',
                               coordinator.host, ', preparing and retrying')
    end
    local query_id, err = prepare(self, coordinator, request.query)
    if not query_id then return nil, err end
    request.query_id = query_id
  end

  return send_request(self, coordinator, request)
end
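
-- handle_error decides what to do with a failed request:
--   * UNPREPARED: re-prepare on the same coordinator and retry there
--   * OVERLOADED, IS_BOOTSTRAPPING, TRUNCATE_ERROR: retry on the next peer
--   * UNAVAILABLE_EXCEPTION, READ_TIMEOUT, WRITE_TIMEOUT: defer to the
--     configured retry policy
--   * socket 'timeout': retry on the next peer if retry_on_timeout is set
--   * anything else: mark the coordinator down and retry elsewhere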

local function handle_error(self, err, cql_code, coordinator, request)
  if cql_code and cql_code == cql_errors.UNPREPARED then
    return prepare_and_retry(self, coordinator, request)
  end

  -- failure, need to try another coordinator
  coordinator:setkeepalive()

  if cql_code then
    local retry
    if cql_code == cql_errors.OVERLOADED or
       cql_code == cql_errors.IS_BOOTSTRAPPING or
       cql_code == cql_errors.TRUNCATE_ERROR then
      retry = true
    elseif cql_code == cql_errors.UNAVAILABLE_EXCEPTION then
      retry = self.retry_policy:on_unavailable(request)
    elseif cql_code == cql_errors.READ_TIMEOUT then
      retry = self.retry_policy:on_read_timeout(request)
    elseif cql_code == cql_errors.WRITE_TIMEOUT then
      retry = self.retry_policy:on_write_timeout(request)
    end

    if retry then
      return self:send_retry(request, 'CQL code: ', cql_code)
    end
  elseif err == 'timeout' then
    if self.retry_on_timeout then
      return self:send_retry(request, 'timeout')
    end
  else
    -- host seems down?
    local ok, err = set_peer_down(self, coordinator.host)
    if not ok then return nil, err end

    return self:send_retry(request, 'coordinator seems down')
  end

  return nil, err, cql_code
end

send_request = function(self, coordinator, request)
  local res, err, cql_code = coordinator:send(request)
  if not res then
    return handle_error(self, err, cql_code, coordinator, request)
  elseif res.warnings and self.logging then
    -- protocol v4 can return warnings to the client
    for i = 1, #res.warnings do
      log(WARN, _log_prefix, res.warnings[i])
    end
  end

  if res.type == 'SCHEMA_CHANGE' then
    local schema_version, err = wait_schema_consensus(self, coordinator)
    if not schema_version then
      coordinator:setkeepalive()
      return nil, 'could not check schema consensus: '..err
    end

    res.schema_version = schema_version
  end

  coordinator:setkeepalive()

  return res
end

do
  local get_request_opts = cassandra.get_request_opts
  local page_iterator = cassandra.page_iterator
  local query_req = requests.query.new
  local batch_req = requests.batch.new
  local prep_req = requests.execute_prepared.new

  --- Coordinator options.
  -- Options to pass to coordinators chosen by the load balancing policy
  -- on `execute`/`batch`/`iterate`.
  -- @field keyspace Keyspace to use for the current request connection.
  -- (`string`, optional)
  -- @field no_keyspace Does not set a keyspace for the current request
  -- connection.
  -- (`boolean`, default: `false`)
  -- @table `coordinator_options`

  --- Execute a query.
  -- Sends a request to the coordinator chosen by the configured load
  -- balancing policy. The policy always chooses nodes that are considered
  -- healthy, and eventually reconnects to unhealthy nodes as per the
  -- configured reconnection policy.
  -- Requests that fail because of timeouts can be retried on the next
  -- available node if `retry_on_timeout` is enabled, and failed requests
  -- can be retried as defined by the configured retry policy.
  --
  -- @usage
  -- local Cluster = require "resty.cassandra.cluster"
  -- local cluster, err = Cluster.new()
  -- if not cluster then
  --   ngx.log(ngx.ERR, "could not create cluster: ", err)
  --   ngx.exit(500)
  -- end
  --
  -- local rows, err = cluster:execute("SELECT * FROM users WHERE age = ?", {
  --   21
  -- }, {
  --   page_size = 100
  -- })
  -- if not rows then
  --   ngx.log(ngx.ERR, "could not retrieve users: ", err)
  --   ngx.exit(500)
  -- end
  --
  -- ngx.say("page size: ", #rows, " next page: ", rows.meta.paging_state)
  --
  -- @param[type=string] query CQL query to execute.
  -- @param[type=table] args (optional) Arguments to bind to the query.
  -- @param[type=table] options (optional) Options from `query_options`.
  -- @param[type=table] coordinator_options (optional) Options from `coordinator_options`
  -- for this query.
  -- @treturn table `res`: Table holding the query result if success, `nil` if failure.
  -- @treturn string `err`: String describing the error if failure.
  -- @treturn number `cql_err`: If a server-side error occurred, the CQL error code.
  function _Cluster:execute(query, args, options, coordinator_options)
    -- local init, err = self.shm:get(_init_key)
    -- log(DEBUG, _log_prefix, 'executing: init shm value is ', init)
    if not self.init then
      log(DEBUG, _log_prefix, 'not initialized, refreshing')
      local ok, err = self:refresh()
      if not ok then return nil, 'could not refresh cluster: '..err end
    end

    coordinator_options = coordinator_options or empty_t

    local coordinator, err = next_coordinator(self, coordinator_options)
    if not coordinator then return nil, err end

    local request
    local opts = get_request_opts(options)

    if opts.prepared then
      local query_id, err = get_or_prepare(self, coordinator, query)
      if not query_id then return nil, err end
      request = prep_req(query_id, args, opts, query)
    else
      request = query_req(query, args, opts)
    end

    return send_request(self, coordinator, request)
  end

  --- Execute a batch.
  -- Sends a request to execute the given batch. Load balancing, reconnection,
  -- and retry policies act the same as described for `execute`.
  --
  -- @usage
  -- local Cluster = require "resty.cassandra.cluster"
  -- local cluster, err = Cluster.new()
  -- if not cluster then
  --   ngx.log(ngx.ERR, "could not create cluster: ", err)
  --   ngx.exit(500)
  -- end
  --
  -- local res, err = cluster:batch({
  --   {"INSERT INTO things(id, n) VALUES(?, 1)", {123}},
  --   {"UPDATE things SET n = 2 WHERE id = ?", {123}},
  --   {"UPDATE things SET n = 3 WHERE id = ?", {123}}
  -- }, {
  --   logged = false
  -- })
  -- if not res then
  --   ngx.log(ngx.ERR, "could not execute batch: ", err)
  --   ngx.exit(500)
  -- end
  --
  -- @param[type=table] queries CQL queries to execute.
  -- @param[type=table] options (optional) Options from `query_options`.
  -- @param[type=table] coordinator_options (optional) Options from `coordinator_options`
  -- for this query.
  -- @treturn table `res`: Table holding the query result if success, `nil` if failure.
  -- @treturn string `err`: String describing the error if failure.
  -- @treturn number `cql_err`: If a server-side error occurred, the CQL error code.
  function _Cluster:batch(queries, options, coordinator_options)
    -- refresh() in this modified version only sets the worker-local
    -- self.init flag (the shm _init_key write is commented out), so check
    -- that flag here as execute() does
    if not self.init then
      local ok, err = self:refresh()
      if not ok then return nil, 'could not refresh cluster: '..err end
    end

    coordinator_options = coordinator_options or empty_t

    local coordinator, err = next_coordinator(self, coordinator_options)
    if not coordinator then return nil, err end

    local opts = get_request_opts(options)

    if opts.prepared then
      for i = 1, #queries do
        local query_id, err = get_or_prepare(self, coordinator, queries[i][1])
        if not query_id then return nil, err end
        queries[i][3] = query_id
      end
    end

    return send_request(self, coordinator, batch_req(queries, opts))
  end

  --- Lua iterator for auto-pagination.
  -- Perform auto-pagination for a query when used as a Lua iterator.
  -- Load balancing, reconnection, and retry policies act the same as described
  -- for `execute`.
  --
  -- @usage
  -- local Cluster = require "resty.cassandra.cluster"
  -- local cluster, err = Cluster.new()
  -- if not cluster then
  --   ngx.log(ngx.ERR, "could not create cluster: ", err)
  --   ngx.exit(500)
  -- end
  --
  -- for rows, err, page in cluster:iterate("SELECT * FROM users") do
  --   if err then
  --     ngx.log(ngx.ERR, "could not retrieve page: ", err)
  --     ngx.exit(500)
  --   end
  --   ngx.say("page ", page, " has ", #rows, " rows")
  -- end
  --
  -- @param[type=string] query CQL query to execute.
  -- @param[type=table] args (optional) Arguments to bind to the query.
  -- @param[type=table] options (optional) Options from `query_options`
  -- for this query.
  function _Cluster:iterate(query, args, options)
    return page_iterator(self, query, args, options)
  end
end

_Cluster.set_peer = set_peer
_Cluster.get_peer = get_peer
_Cluster.add_peer = add_peer
_Cluster.get_peers = get_peers
_Cluster.delete_peer = delete_peer
_Cluster.set_peer_up = set_peer_up
_Cluster.can_try_peer = can_try_peer
_Cluster.handle_error = handle_error
_Cluster.set_peer_down = set_peer_down
_Cluster.get_or_prepare = get_or_prepare
_Cluster.next_coordinator = next_coordinator
_Cluster.first_coordinator = first_coordinator
_Cluster.wait_schema_consensus = wait_schema_consensus
_Cluster.check_schema_consensus = check_schema_consensus

return _Cluster