Skip to content

Instantly share code, notes, and snippets.

@dvv
Created March 10, 2015 15:34
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dvv/24041b788dcb7c8bb048 to your computer and use it in GitHub Desktop.
Save dvv/24041b788dcb7c8bb048 to your computer and use it in GitHub Desktop.
Nginx redis comet
------------------------------------------------------------------------------
-- Redis publication
--
-- LICENCE: http://opensource.org/licenses/MIT
-- Vladimir Dronnikov <dronnikov@gmail.com>
------------------------------------------------------------------------------
local db = assert(require("resty.redis"):new())
db:set_timeout(1000) -- 1000 sec
assert(db:connect("127.0.0.1", 6379))
db:publish("input", ngx.var.arg_input)
ngx.say("")
#
# GET /pub?input=<topic>=<payload> -> publish input <topic>=<payload>
#
location /pub {
default_type text/plain;
#lua_code_cache off;
content_by_lua_file /scripts/pub.lua;
}
#
# GET /sub -> EventSource: data: <topic>=<payload>
#
location /sub {
lua_socket_log_errors off;
lua_check_client_abort on;
#lua_code_cache off;
content_by_lua_file /scripts/sub.lua;
}
------------------------------------------------------------------------------
-- EventSource from redis subscriptions
--
-- LICENCE: http://opensource.org/licenses/MIT
-- Vladimir Dronnikov <dronnikov@gmail.com>
------------------------------------------------------------------------------
-- cache
local table = table
local db = assert(require("resty.redis"):new())
db:set_timeout(1000000) -- 1000 sec
assert(db:connect("127.0.0.1", 6379))
local function format_chunk(message) -- for chunked transfer-encoding
return ("%x\r\n%s\r\n"):format(#message, message)
end
local sock
local function push(k, v)
-- NB: we escape % \r \n \x00
-- FIXME: \0 is not allowed?!
v = v:gsub("[%%\r\n%z]", function(c) return ("%%%02x"):format(c:byte(1)) end)
--local s = format_chunk(("event:notify\r\ndata:%s=%s\r\n\r\n"):format(k, v))
local s = format_chunk(("data: %s=%s\r\n\r\n"):format(k, v))
local n, err = sock:send(s)
return n == #s
end
sock = ngx.req.socket(true)
local function cleanup()
db:close()
ngx.exit(499)
end
ngx.on_abort(cleanup)
sock:settimeout(86400000)
-- send headers
sock:send(table.concat({
"HTTP/1.1 200 OK",
"Content-Type: text/event-stream",
"Connection: keep-alive",
"Transfer-Encoding: chunked",
"",
"", -- NB: Opera needs extra CRLF
}, "\r\n"))
-- push current state
local state, err = db:hgetall("state")
if not state then
ngx.exit(500)
return
end
for i = 1, #state / 2 do
push(state[2*i-1], state[2*i])
end
-- push further events
local ok, err = db:psubscribe("*")
if not ok then
ngx.exit(500)
return
end
while true do
local msg, err = db:read_reply()
if not msg then
-- error handling
break
end
if not push(msg[3], msg[4]) then
break
end
end
sock:send(format_chunk(""))
sock:close()
cleanup()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment