Skip to content

Instantly share code, notes, and snippets.

@AMD-NICK
Last active November 23, 2023 04:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save AMD-NICK/77c2252c9800acd83a19a277adca1b75 to your computer and use it in GitHub Desktop.
Save AMD-NICK/77c2252c9800acd83a19a277adca1b75 to your computer and use it in GitHub Desktop.
[LUA] redis query tests (нигде не публиковалось, просто публичная заметка)
-- Uses nothing but luasocket.
-- Checks whether several write operations can be performed first and the
-- read operations only afterwards (the same AUTH command is sent twice and
-- the two replies are read after both sends).
local auth_buff = {
  "*2\r\n",
  "$4\r\nAUTH\r\n",
  "$14\r\nqwertyuiopasdf\r\n",
}
-- REDIS_HOST is parsed as "host" or "host:port". Fail with a readable message
-- instead of "attempt to index a nil value" when the variable is missing.
local redis_host = assert(os.getenv("REDIS_HOST"), "REDIS_HOST environment variable is not set")
local host, port = redis_host:match("([^:]+):?(%d*)")
assert(host, "REDIS_HOST must look like 'host' or 'host:port'")
-- The port capture is "" when only a host is given and tonumber("") is nil,
-- so fall back to 6379 rather than handing nil to connect().
port = tonumber(port) or 6379
local socket = require("socket")
local skt = socket.tcp()
print("settimeout", skt:settimeout(3))
print("connect", skt:connect(host, port))
-- print("setoption", skt:setoption('tcp-nodelay', true)) -- never noticed it affecting anything
print("auth send", skt:send(table.concat(auth_buff)))
print("auth send", skt:send(table.concat(auth_buff)))
print("auth recv", skt:receive("*l"))
print("auth recv", skt:receive("*l"))
-- print("auth recv", skt:receive("*l"))
-- Release the socket explicitly once the experiment is over.
skt:close()
-- Checked how fast 100k write operations can be executed on plain luasocket.
-- It did not work out: sock:write does not block the code, and without a timer at the end not all commands managed to complete.
local REDIS = require("misc.redis_query")
--- Builds a connection via REDIS.create from environment variables.
-- REDIS_HOST is parsed as "host" or "host:port"; REDIS_PASS is passed through
-- as the password (it may be unset).
-- Raises an error when REDIS_HOST is not set.
-- @return the value returned by REDIS.create for these options
function REDIS.getcon()
  local redis_host = os.getenv("REDIS_HOST")
  if not redis_host then
    -- Without this guard the failure is an opaque "attempt to index a nil value".
    error("REDIS_HOST environment variable is not set", 2)
  end
  local host, port = redis_host:match("([^:]+):?(%d*)")
  return REDIS.create({
    host = host,
    port = tonumber(port), -- nil when REDIS_HOST carries no port part
    password = os.getenv("REDIS_PASS"),
    -- copas_wrap = true,
    timeout = 5, -- connect timeout
    -- tcp_nodelay = false,
    max_retries_per_request = 10,
    retry_delay = 5,
  })
end
local conn = REDIS.getcon()
-- redq.wrap stores the underlying redis-lua client in the `redis_client`
-- field. `conn.client` is not a field of the wrapper, so it would be resolved
-- by the wrapper's __index instead of yielding the client table.
local client = conn.redis_client
local fp = require("fn").fp -- https://github.com/FPtje/GModFunctional/blob/de2047455b898c821a912eef31c8307878c4f089/fn.lua#L18
-- Low-level request/response helpers bound to the client.
local send_command = fp{client.requests.multibulk, client}
local read_respons = fp{client.response.read, client} -- exposed https://github.com/nrk/redis-lua/blob/880dda904909adfed0ad79cdd80317c6dd1a005a/src/redis.lua#L314
send_command("AUTH", "qwertyuiopasdf")
print("auth ok?", read_respons())
send_command("GET", "fuck3")
print("initial_value", read_respons())
local now = require("socket").gettime
local time_start = now()
-- Fire 100k INCR requests without reading any of the replies.
for _ = 1, 100000 do
  send_command("INCR", "fuck3")
end
print("time", now() - time_start)
-- print("read", read_respons())
require("socket").sleep(3) -- without this not everything manages to get sent
-- Redis wrapper that makes it possible to run queries asynchronously.
local socket = require("socket")
local redis = require("redis") -- https://github.com/nrk/redis-lua/blob/version-2.0/src/redis.lua
-- Module table; it is returned at the end of this file.
local redq = {}
-- Metatable that redq.wrap attaches to every wrapped client.
local MT = {}
--- Lookup handler for keys that are missing on a wrapped client.
-- Names listed in redis.commands yield a method-style function that forwards
-- the call to redq.run_method_safe. Any other name is logged and read
-- straight from the underlying redis client.
-- @param wrapped_client table created by redq.wrap
-- @param method_name key being looked up
function MT.__index(wrapped_client, method_name)
  local target = wrapped_client.redis_client

  if not redis.commands[method_name] then
    print("Attempting to access a field instead of the method", method_name)
    return target[method_name]
  end

  -- The first argument is the receiver supplied by the `:` call syntax; it is
  -- ignored because the wrapped client is already captured here.
  local function forward(_, ...)
    return redq.run_method_safe(wrapped_client, method_name, ...)
  end
  return forward
end
--- Calls a redis client method; on failure waits, reconnects and tries again.
-- On success the retry counter is reset and the first return value of the
-- method is returned.
-- On failure:
--   * if opts.reconnect_condition is set and returns a falsy value for the
--     error, the error is rethrown immediately;
--   * if the retry counter has reached opts.max_retries_per_request
--     (default 3), the counter is reset and the error is rethrown;
--   * otherwise the counter is incremented, the code waits for the retry
--     delay, attempts redq.reconnect and calls itself again.
-- Errors are rethrown with level 0 so the original error value is passed on
-- unchanged (no extra position prefix is added to string messages).
-- @param wrapped_client table created by redq.wrap
-- @param method_name name of the method on the underlying redis client
-- @param ... arguments forwarded to that method
function redq.run_method_safe(wrapped_client, method_name, ...)
  local redis_client = wrapped_client.redis_client
  local method_func = redis_client[method_name]
  -- wrapped_client.opts.socket:settimeout(1)
  -- local args = {...}
  -- print("args", wrapped_client.opts.socket, args[1], args[2], args[3], args[4])
  local ok, res = pcall(method_func, redis_client, ...)
  if ok then
    wrapped_client._retry_attemps = 0
    return res
  end

  local opts = wrapped_client.opts
  -- print("COMMAND ERROR _retry_attemps, err", wrapped_client._retry_attemps, res)
  -- No condition configured means every failure is treated as retryable.
  local want_reconnect = opts.reconnect_condition == nil
    or opts.reconnect_condition(res)
  if not want_reconnect then error(res, 0) end

  local max_retries = opts.max_retries_per_request or 3
  local retry_attempt = wrapped_client._retry_attemps
  if retry_attempt >= max_retries then
    wrapped_client._retry_attemps = 0
    error(res, 0)
  end
  wrapped_client._retry_attemps = retry_attempt + 1

  -- retry_delay may be absent (1 second), a number, or a function of the
  -- attempt index; a function that returns nil/false falls back to 3 seconds.
  local retry_delay = opts.retry_delay
  if not retry_delay then
    retry_delay = 1
  elseif type(retry_delay) == "function" then
    retry_delay = retry_delay(retry_attempt)
  end
  retry_delay = retry_delay or 3

  if opts.copas_wrap then
    require("copas").pause(retry_delay) -- only thread (coro)
  else
    socket.sleep(retry_delay) -- whole process
  end

  local reconnect_ok, err = pcall(redq.reconnect, redis_client, opts)
  -- tostring: the caught error value is not guaranteed to be a string, and
  -- concatenating e.g. a table here would raise from inside the retry path.
  print("Redis Wrapper: " .. (reconnect_ok and "reconnected" or "reconnect failed: " .. tostring(err)))
  -- Retry even if the reconnect failed; the attempt counter bounds the recursion.
  return redq.run_method_safe(wrapped_client, method_name, ...)
end
--- Wraps a redis client so that missing keys are resolved through MT.
-- @param redis_client underlying redis client object
-- @param opts options table kept alongside the client
-- @return table holding the client, the options and a retry counter set to 0
function redq.wrap(redis_client, opts)
  local state = {
    redis_client = redis_client,
    opts = opts,
    _retry_attemps = 0,
  }
  return setmetatable(state, MT)
end
--- Creates a copas-wrapped TCP socket and connects it.
-- Raises an error (after closing the socket) when the connection attempt
-- fails, instead of returning a socket that is not connected.
-- @param host host to connect to
-- @param port port to connect to
-- @param timeout connect timeout passed to settimeouts (send/recv are left nil)
-- @param tcp_nodelay value for the "tcp-nodelay" socket option
-- @return the connected, copas-wrapped socket
function redq.copas_socket(host, port, timeout, tcp_nodelay)
  local sock = require("copas").wrap(socket.tcp())
  -- Configure the timeouts before connecting: set afterwards, the connect
  -- timeout cannot influence the connect call that has already finished.
  sock:settimeouts(timeout, nil, nil) -- conn, send, recv
  local ok, err = sock:connect(host, port)
  if not ok then
    sock:close()
    error(("could not connect to %s:%s [%s]"):format(
      tostring(host), tostring(port), tostring(err)), 0)
  end
  sock:setoption("tcp-nodelay", tcp_nodelay)
  return sock
end
--- Replaces the socket of an existing redis client with a fresh connection.
-- The current socket is closed first. With opts.copas_wrap a new copas socket
-- is created; otherwise a temporary client is created via redis.connect and
-- its socket is adopted. Afterwards AUTH and SELECT are issued again when
-- opts.password / opts.db are set.
-- Raises when opts.socket is set on the non-copas path.
-- @param redis_client redis client whose network.socket is replaced
-- @param opts connection options
function redq.reconnect(redis_client, opts)
  local network = redis_client.network
  network.socket:close()

  if opts.copas_wrap then
    network.socket = redq.copas_socket(opts.host, opts.port, opts.timeout, opts.tcp_nodelay)
  elseif opts.socket then
    error("we can't recreate custom socket")
  else
    local fresh_client = redis.connect(opts)
    network.socket = fresh_client.network.socket
  end

  local password = opts.password
  if password then
    redis_client:auth(password)
  end

  local db = opts.db
  if db then
    redis_client:select(db)
  end
end
--- Opens a redis connection described by opts.
-- With opts.copas_wrap a copas socket is created first and handed to
-- redis.connect through opts.socket. AUTH and SELECT are issued when
-- opts.password / opts.db are set. opts.socket is cleared before returning.
-- @param opts connection options (modified in place: the socket field)
-- @return the redis client returned by redis.connect
function redq.connect(opts)
  if opts.copas_wrap then
    local copas_sock = redq.copas_socket(opts.host, opts.port, opts.timeout, opts.tcp_nodelay)
    opts.socket = copas_sock
  end

  local redis_client = redis.connect(opts) -- has .network.socket field

  if opts.password then
    redis_client:auth(opts.password)
  end
  if opts.db then
    redis_client:select(opts.db)
  end

  opts.socket = nil -- we don't need it anymore
  return redis_client
end
--- Creates a wrapped redis client.
-- Accepted call forms:
--   redq.create(sock_string)      -- stored as opts.sock
--   redq.create(port [, host])    -- host keeps its default when omitted
--   redq.create(options_table)    -- merged over the defaults below
-- The caller's options table is copied, not modified.
-- @return wrapped client produced by redq.wrap
function redq.create(...)
  local opts = {
    host = "127.0.0.1",
    port = 6379,
    -- sock = "redis.sock",
    tcp_nodelay = true,
    copas_wrap = false,
    -- socket = socket.tcp(),
    -- timeout = 10, -- connect timeout
    -- password = "pass",
    -- db = 0,
    -- reconnect_condition = function(err) return err:match("timeout") end,
    -- max_retries_per_request = 3,
    -- retry_delay = function(times) return times * 1 end, -- in seconds
  }

  local first, second = ...
  local first_type = type(first)
  if first_type == "string" then
    opts.sock = first
  elseif first_type == "number" then
    opts.port = first
    -- Only override the host when one was actually given; assigning nil here
    -- would wipe the default host out of the stored options.
    if second ~= nil then
      opts.host = second
    end
  elseif first_type == "table" then
    for k, v in pairs(first) do -- merge with overrides
      opts[k] = v
    end
  end

  local redis_client = redq.connect(opts)
  return redq.wrap(redis_client, opts)
end
-- Export the module table.
return redq
-- Reads replies from the socket in a separate thread, independently of the
-- write operations.
-- REDIS_HOST is parsed as "host" or "host:port"; fail with a readable message
-- when it is missing instead of "attempt to index a nil value".
local redis_host = assert(os.getenv("REDIS_HOST"), "REDIS_HOST environment variable is not set")
local host, port = redis_host:match("([^:]+):?(%d*)")
local copas = require("copas")
copas.loop(function()
  local REDIS = require("misc.redis_query")
  local conn = REDIS.create({
    host = host,
    port = tonumber(port),
    password = os.getenv("REDIS_PASS"),
    copas_wrap = true,
    timeout = 5,
    -- tcp_nodelay = false,
  })
  -- redq.wrap stores the underlying redis-lua client in `redis_client`;
  -- `conn.client` is not a field of the wrapper.
  local client = conn.redis_client
  local send_command = function(...) return client.requests.multibulk(client, ...) end
  local read_respons = function() return client.response.read(client) end
  -- send_command("AUTH", "qwertyuiopasdf")
  -- print("auth ok?", read_respons())
  local INCR_KEY = "test15"
  send_command("GET", INCR_KEY)
  print("initial_value", read_respons())

  -- FIFO of reply handlers kept as a table plus head/tail indices, so taking
  -- the oldest handler does not shift the whole array on every reply.
  local callbacks = {}
  local cb_head, cb_tail = 1, 0
  local need_incr = 100000
  local responses_received = 0
  local now = require("socket").gettime
  local time_start = now()

  -- Every INCR uses the same handler, so one function is shared instead of
  -- allocating a closure per request.
  local function on_reply(val)
    responses_received = responses_received + 1
    if responses_received % 1000 == 0 then
      print("responses_received, val", responses_received, val)
    end
    if responses_received == need_incr then
      print("time", now() - time_start)
    end
  end

  for i = 1, need_incr do
    copas.addnamedthread("i_" .. i, function()
      -- Queue the handler before sending so the reader can never see a reply
      -- for which no handler has been registered yet.
      cb_tail = cb_tail + 1
      callbacks[cb_tail] = on_reply
      send_command("INCR", INCR_KEY)
    end)
  end

  copas.addnamedthread("mainloop", function()
    -- Stop once every expected reply has been handled; an unconditional loop
    -- keeps reading forever and ends up calling a nil handler.
    while responses_received < need_incr do
      local ok, res = pcall(read_respons)
      if not ok then
        print("res err", res)
        break -- reading failed; do not dispatch the error as a reply
      end
      local cb = callbacks[cb_head]
      if cb then
        callbacks[cb_head] = nil
        cb_head = cb_head + 1
        cb(res)
      else
        print("res without callback", res)
      end
    end
  end)
end)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment