Skip to content

Instantly share code, notes, and snippets.

@sassembla
Created July 29, 2015 02:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sassembla/f775afb66c65a0b49e10 to your computer and use it in GitHub Desktop.
Save sassembla/f775afb66c65a0b49e10 to your computer and use it in GitHub Desktop.
-- Disque client for lua.
local sub = string.sub
local byte = string.byte
local tcp = ngx.socket.tcp
local concat = table.concat
local null = ngx.null
local pairs = pairs
local unpack = unpack
local setmetatable = setmetatable
local tonumber = tonumber
local error = error
-- Prefer LuaJIT's table.new for pre-sized tables; when it cannot be loaded,
-- fall back to a plain constructor that ignores the size hints.
local ok, new_tab = pcall(require, "table.new")
if not (ok and type(new_tab) == "function") then
    new_tab = function (narr, nrec)
        return {}
    end
end

-- Module table; the second argument is a size hint for its hash part.
local _M = new_tab(0, 155)
_M._VERSION = '0.1'

-- Commands that are exposed as same-named methods on the client.
local commands = {
    "addjob",
    "getjob",
    "fastack",
}

-- Client objects look up missing fields (their methods) in _M.
local mt = { __index = _M }
-- Create a client object wrapping a new TCP cosocket.
-- Returns the client, or nil plus the error when no socket could be created.
function _M.new(self)
    local sock, err = tcp()
    if sock then
        return setmetatable({ sock = sock }, mt)
    end
    return nil, err
end
-- Forward `timeout` to the underlying socket's settimeout().
-- Returns nil, "not initialized" when the client has no socket.
function _M.set_timeout(self, timeout)
    local sock = self.sock
    if sock then
        return sock:settimeout(timeout)
    end
    return nil, "not initialized"
end
-- Connect the underlying socket; all arguments are passed through unchanged
-- to sock:connect(). The subscribed flag is reset before connecting.
-- Returns nil, "not initialized" when the client has no socket.
function _M.connect(self, ...)
    local sock = self.sock
    if sock then
        self.subscribed = nil
        return sock:connect(...)
    end
    return nil, "not initialized"
end
-- Hand the socket to sock:setkeepalive(), passing all arguments through.
-- Refused with nil, "subscribed state" while the subscribed flag is set, and
-- with nil, "not initialized" when the client has no socket.
function _M.set_keepalive(self, ...)
    local sock = self.sock
    if not sock then
        return nil, "not initialized"
    elseif self.subscribed then
        return nil, "subscribed state"
    end
    return sock:setkeepalive(...)
end
-- Return whatever sock:getreusedtimes() reports for the underlying socket,
-- or nil, "not initialized" when the client has no socket.
function _M.get_reused_times(self)
    local sock = self.sock
    if sock then
        return sock:getreusedtimes()
    end
    return nil, "not initialized"
end
-- Close the underlying socket and return the result of sock:close(),
-- or nil, "not initialized" when the client has no socket.
-- Kept as a local as well so other functions in this file can call it directly.
local function close(self)
    local sock = self.sock
    if sock then
        return sock:close()
    end
    return nil, "not initialized"
end
_M.close = close
-- Read and decode one Redis-protocol reply from `sock`.
--
-- The first byte of the reply line selects the decoding:
--   '$' bulk        -> the payload string, or ngx.null for a negative length
--   '+' status      -> the status text
--   '*' multi-bulk  -> an array of decoded replies (a nested error reply is
--                      stored as {false, message}), or ngx.null for a
--                      negative count
--   ':' integer     -> the number
--   '-' error       -> false, message
-- On an I/O failure or a malformed reply it returns nil, err.
local function _read_reply(self, sock)
    local line, err = sock:receive()
    if not line then
        -- a timed-out socket is closed unless the subscribed flag is set
        if err == "timeout" and not self.subscribed then
            sock:close()
        end
        return nil, err
    end

    local prefix = byte(line)

    if prefix == 36 then -- char '$'
        local size = tonumber(sub(line, 2))
        if not size then
            -- a non-numeric length would otherwise throw on the comparison below
            return nil, "bad bulk reply length: \"" .. line .. "\""
        end
        if size < 0 then
            return null
        end

        local data, err = sock:receive(size)
        if not data then
            if err == "timeout" then
                sock:close()
            end
            return nil, err
        end

        local dummy, err = sock:receive(2) -- ignore CRLF
        if not dummy then
            -- same timeout handling as for the payload read above
            if err == "timeout" then
                sock:close()
            end
            return nil, err
        end

        return data

    elseif prefix == 43 then -- char '+'
        return sub(line, 2)

    elseif prefix == 42 then -- char '*'
        local n = tonumber(sub(line, 2))
        if not n then
            return nil, "bad multi-bulk reply count: \"" .. line .. "\""
        end
        if n < 0 then
            return null
        end

        local vals = new_tab(n, 0)
        local nvals = 0
        for i = 1, n do
            local res, err = _read_reply(self, sock)
            if res then
                nvals = nvals + 1
                vals[nvals] = res
            elseif res == nil then
                return nil, err
            else
                -- be a valid redis error value
                nvals = nvals + 1
                vals[nvals] = {false, err}
            end
        end
        return vals

    elseif prefix == 58 then -- char ':'
        local num = tonumber(sub(line, 2))
        if not num then
            return nil, "bad integer reply: \"" .. line .. "\""
        end
        return num

    elseif prefix == 45 then -- char '-'
        return false, sub(line, 2)

    else
        -- prefix is nil for an empty line, hence tostring()
        return nil, "unknown prefix: \"" .. tostring(prefix) .. "\""
    end
end
-- Serialize an argument array into a single multi-bulk request string.
-- The argument count is taken with `#args`. A nil/false entry is written as
-- "$-1\r\n"; any other non-string entry is converted with tostring() first.
local function _gen_req(args)
    local count = #args
    local parts = new_tab(count + 1, 0)
    parts[1] = "*" .. count .. "\r\n"

    for idx = 1, count do
        local value = args[idx]
        local chunk
        if value then
            if type(value) ~= "string" then
                value = tostring(value)
            end
            chunk = "$" .. #value .. "\r\n" .. value .. "\r\n"
        else
            chunk = "$-1\r\n"
        end
        parts[idx + 1] = chunk
    end

    -- it is faster to do string concatenation on the Lua land
    return concat(parts)
end
-- Build a request from the varargs (command name first) and either queue it
-- (when a pipeline is open, in which case nothing is returned) or send it and
-- return the decoded reply. Returns nil, err on a missing socket or a failed
-- send.
local function _do_cmd(self, ...)
    local sock = self.sock
    if not sock then
        return nil, "not initialized"
    end

    local payload = _gen_req({...})

    local pending = self._reqs
    if pending then
        -- pipeline mode: only record the request
        pending[#pending + 1] = payload
        return
    end

    local sent, err = sock:send(payload)
    if not sent then
        return nil, err
    end

    return _read_reply(self, sock)
end
-- Clear the subscribed flag when `res` is an "unsubscribe"/"punsubscribe"
-- reply whose third element is 0. Any other value is ignored.
local function _check_subscribed(self, res)
    if type(res) ~= "table" then
        return
    end

    local kind = res[1]
    local is_unsub = kind == "unsubscribe" or kind == "punsubscribe"
    if is_unsub and res[3] == 0 then
        self.subscribed = nil
    end
end
-- Read one reply outside the request/response cycle. Only allowed while the
-- subscribed flag is set; otherwise returns nil, "not subscribed" (or
-- nil, "not initialized" without a socket).
function _M.read_reply(self)
    local sock = self.sock
    if not sock then
        return nil, "not initialized"
    elseif not self.subscribed then
        return nil, "not subscribed"
    end

    local reply, err = _read_reply(self, sock)
    -- an unsubscribe reply may end the subscribed state
    _check_subscribed(self, reply)
    return reply, err
end
-- Install one method per entry of `commands`; each method sends its own name
-- as the command, followed by the caller's arguments.
for idx = 1, #commands do
    local name = commands[idx]
    _M[name] = function (self, ...)
        return _do_cmd(self, name, ...)
    end
end
-- Send "hmset" for `hashname`. With exactly one extra argument, that argument
-- is iterated with pairs() and flattened into key, value, key, value, ...;
-- otherwise the extra arguments are sent as given.
function _M.hmset(self, hashname, ...)
    local args = {...}
    if #args ~= 1 then
        -- backwards compatibility
        return _do_cmd(self, "hmset", hashname, ...)
    end

    local fields = args[1]

    -- first pass: count the slots so the array can be pre-sized
    local slots = 0
    for _ in pairs(fields) do
        slots = slots + 2
    end

    -- second pass: lay the pairs out flat
    local flat = new_tab(slots, 0)
    local pos = 0
    for field, value in pairs(fields) do
        flat[pos + 1] = field
        flat[pos + 2] = value
        pos = pos + 2
    end

    return _do_cmd(self, "hmset", hashname, unpack(flat))
end
-- Enter pipeline mode: subsequent commands are queued instead of sent.
-- `n` is an optional size hint for the queue (defaults to 4).
function _M.init_pipeline(self, n)
    local capacity = n or 4
    self._reqs = new_tab(capacity, 0)
end
-- Leave pipeline mode by dropping the request queue; anything queued since
-- init_pipeline is discarded without being sent.
function _M.cancel_pipeline(self)
self._reqs = nil
end
-- Send every queued request in one write and read one reply per request.
-- Returns the array of replies, with an error reply stored as {false, message}.
-- Returns nil, err when there is no open pipeline, no socket, the send fails,
-- or reading a reply fails. The pipeline is closed in every case once it has
-- been found.
function _M.commit_pipeline(self)
    local queued = self._reqs
    if not queued then
        return nil, "no pipeline"
    end
    self._reqs = nil

    local sock = self.sock
    if not sock then
        return nil, "not initialized"
    end

    local sent, send_err = sock:send(queued)
    if not sent then
        return nil, send_err
    end

    local total = #queued
    local results = new_tab(total, 0)
    for idx = 1, total do
        local res, err = _read_reply(self, sock)
        if res == nil then
            if err == "timeout" then
                close(self)
            end
            return nil, err
        end
        -- res is false for an error reply: be a valid redis error value
        results[idx] = res or {false, err}
    end

    return results
end
-- Turn a flat array {k1, v1, k2, v2, ...} into a table {k1 = v1, k2 = v2, ...}.
function _M.array_to_hash(self, t)
    local count = #t
    local hash = new_tab(0, count / 2)

    local idx = 1
    while idx <= count do
        hash[t[idx]] = t[idx + 1]
        idx = idx + 2
    end

    return hash
end
-- Register extra commands at runtime: for every name given, install a method
-- on the module that sends that name as the command.
function _M.add_commands(...)
    local names = {...}
    for idx = 1, #names do
        local name = names[idx]
        _M[name] = function (self, ...)
            return _do_cmd(self, name, ...)
        end
    end
end
return _M
@sassembla
Copy link
Author

-- All of these are declared local so that this request handler does not
-- create global variables; later code in this chunk still sees them.

-- First argument of every addjob call below.
local IDENTIFIER_CONTEXT = "tocontext"
-- identifier-client = UUID. e.g. AD112CD4-3A23-4E49-B562-E07A360DD836 len is 36.

-- State codes embedded in the "st<state>con<connectionId>" job payloads.
local STATE_CONNECT = 1
local STATE_MESSAGE = 2
local STATE_DISCONNECT_1 = 3 -- sent when the websocket object reports `fatal`
local STATE_DISCONNECT_2 = 4 -- sent when a "close" frame is received

-- Address passed to connect() for both Disque connections.
local ip = "127.0.0.1"
local port = 7711

-- entrypoint for WebSocket client connection.

-- setup Disque get-add
local disque = require "disque.disque"
local uuid = require "uuid.uuid"

local connectionId = uuid.getUUID()

-- Connection used by the receive loop (getjob / fastack).
-- Declared local so the handler does not create a global.
local receiveJobConn, receiveNewErr = disque:new()
if not receiveJobConn then
    ngx.log(ngx.ERR, "connection:", connectionId, " failed to generate receiver client: ", receiveNewErr)
    return
end

local ok, err = receiveJobConn:connect(ip, port)
if not ok then
    ngx.log(ngx.ERR, "connection:", connectionId, " failed to generate receiver client: ", err)
    return
end

-- one hour, in milliseconds
receiveJobConn:set_timeout(1000 * 60 * 60)

-- Connection used to post jobs (addjob).
local addJobCon, addNewErr = disque:new()
if not addJobCon then
    ngx.log(ngx.ERR, "connection:", connectionId, " failed to generate addJob client: ", addNewErr)
    receiveJobConn:close()
    return
end

local ok, err = addJobCon:connect(ip, port)
if not ok then
    ngx.log(ngx.ERR, "connection:", connectionId, " failed to generate addJob client: ", err)
    -- do not leave the already-connected receiver socket behind
    receiveJobConn:close()
    return
end

-- setup websocket client
local wsServer = require "ws.websocketServer"

-- Declared local so the handler does not create globals; the functions
-- defined further down in this chunk still see `wb`.
local wb, wErr = wsServer:new{
    timeout = 10000000,
    max_payload_len = 65535
}

if not wb then
    ngx.log(ngx.ERR, "connection:", connectionId, " failed to new websocket: ", wErr)
    return
end

-- Split `inputstr` into the runs of characters that are not in `sep`.
-- `sep` is placed verbatim inside a Lua pattern character class, so it may
-- hold several characters or a class such as "%s" (the default when nil).
-- Empty fields are not produced. Returns an array of the pieces.
function split(inputstr, sep)
    local delimiter = sep
    if delimiter == nil then
        delimiter = "%s"
    end

    local pattern = "([^" .. delimiter .. "]+)"
    local pieces = {}
    for piece in string.gmatch(inputstr, pattern) do
        pieces[#pieces + 1] = piece
    end
    return pieces
end

-- Handles the WebSocket connection: announces it, relays incoming text
-- frames as Disque jobs, answers control frames, and closes the connection
-- when the serving loop ends.
function connectWebSocket()
    -- start receive; keep the handle so the thread can be stopped on exit
    local receiveThread = ngx.thread.spawn(receive)

    -- send connected
    local jsonData = "st"..STATE_CONNECT.."con"..connectionId
    -- json:encode({connectionId = connectionId, state = STATE_CONNECT})
    addJobCon:addjob(IDENTIFIER_CONTEXT, jsonData, 0)

    -- start websocket serving
    while true do
        local recv_data, typ, err = wb:recv_frame()

        -- checked after recv_frame(), so `err` here is the receive error
        if wb.fatal then
            local jsonData = "st"..STATE_DISCONNECT_1.."con"..connectionId
            addJobCon:addjob(IDENTIFIER_CONTEXT, jsonData, 0)
            ngx.log(ngx.ERR, "connection:", connectionId, " failed to send ping1: ", err)
            break
        end

        -- nothing was received: ping the client
        if not recv_data then
            local bytes, err = wb:send_ping()
            if not bytes then
                ngx.log(ngx.ERR, "connection:", connectionId, " failed to send ping2: ", err)
                break
            end
        end

        if typ == "close" then
            local jsonData = "st"..STATE_DISCONNECT_2.."con"..connectionId
            addJobCon:addjob(IDENTIFIER_CONTEXT, jsonData, 0)

            -- start close.
            break
        elseif typ == "ping" then
            local bytes, err = wb:send_pong()
            if not bytes then
                ngx.log(ngx.ERR, "connection:", connectionId, " failed to send pong: ", err)
                break
            end
        elseif typ == "pong" then
            ngx.log(ngx.INFO, "client ponged")

        elseif typ == "text" then
            -- post message to central.
            local jsonData = "st"..STATE_MESSAGE.."con"..connectionId..recv_data
            addJobCon:addjob(IDENTIFIER_CONTEXT, jsonData, 0)
        end
    end

    -- The receive thread is normally parked in a blocking getjob, and the
    -- request handler does not finish while a spawned light thread is alive.
    -- Stop it before closing. The result is ignored because the thread may
    -- already have ended on its own.
    if receiveThread then
        ngx.thread.kill(receiveThread)
    end

    wb:send_close()
    ngx.log(ngx.ERR, "connection:", connectionId, " connection closed")
end

-- receive loop
-- waiting data from central.
-- Receive loop: fetch jobs addressed to this connection, acknowledge each
-- one, and forward its body to the WebSocket client. The loop ends on a
-- getjob failure, an unexpected reply shape, a closed WebSocket, or a failed
-- send.
function receive ()
    while true do
        local res, err = receiveJobConn:getjob("from", connectionId)

        if not res then
            ngx.log(ngx.ERR, "err:", err)
            break
        end

        if res == ngx.null then
            -- null reply: no job was delivered, so simply ask again
        elseif type(res) ~= "table" or type(res[1]) ~= "table" then
            -- a truthy reply that is not a job list would throw when indexed below
            ngx.log(ngx.ERR, "connection:", connectionId, " unexpected getjob reply type: ", type(res))
            break
        else
            -- first job of the reply; index 2 is used as the message id and
            -- index 3 as the data to send
            local datas = res[1]
            local messageId = datas[2]
            local sendingData = datas[3]

            -- fastack to disque
            local ackRes, ackErr = receiveJobConn:fastack(messageId)
            if not ackRes then
                ngx.log(ngx.ERR, "disque, ackに失敗したケース connection:", connectionId, " ackErr:", ackErr)
            end

            -- connection check
            if not wb:is_connecting() then
                ngx.log(ngx.ERR, "disque, 未解決の、クライアント-ws切断時にすべきこと、、切断情報の通知。connection:", connectionId, " Disque unsubscribed by websocket closed.")
                break
            end

            -- send data to client
            local bytes, err = wb:send_text(sendingData)

            if not bytes then
                ngx.log(ngx.ERR, "disque, 未解決の、送付失敗時にすべきこと。 connection:", connectionId, " failed to send text to client. err:", err)
                break
            end
        end
    end
end

-- Look for `item` among the values of `tbl`.
-- Returns the key of a matching entry, or false when no value matches.
function contains(tbl, item)
    local found = false
    for key, value in pairs(tbl) do
        if value == item then
            found = key
            break
        end
    end
    return found
end

-- Run the WebSocket session; this call returns once the serving loop has
-- ended and the close frame has been sent.
connectWebSocket()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment