Created
March 10, 2015 15:34
-
-
Save dvv/24041b788dcb7c8bb048 to your computer and use it in GitHub Desktop.
Nginx redis comet
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
------------------------------------------------------------------------------ | |
-- Redis publication | |
-- | |
-- LICENCE: http://opensource.org/licenses/MIT | |
-- Vladimir Dronnikov <dronnikov@gmail.com> | |
------------------------------------------------------------------------------ | |
local db = assert(require("resty.redis"):new()) | |
db:set_timeout(1000) -- 1000 sec | |
assert(db:connect("127.0.0.1", 6379)) | |
db:publish("input", ngx.var.arg_input) | |
ngx.say("") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# | |
# GET /pub?input=<topic>=<payload> -> publish input <topic>=<payload> | |
# | |
location /pub { | |
default_type text/plain; | |
#lua_code_cache off; | |
content_by_lua_file /scripts/pub.lua; | |
} | |
# | |
# GET /sub -> EventSource: data: <topic>=<payload> | |
# | |
location /sub { | |
lua_socket_log_errors off; | |
lua_check_client_abort on; | |
#lua_code_cache off; | |
content_by_lua_file /scripts/sub.lua; | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
------------------------------------------------------------------------------ | |
-- EventSource from redis subscriptions | |
-- | |
-- LICENCE: http://opensource.org/licenses/MIT | |
-- Vladimir Dronnikov <dronnikov@gmail.com> | |
------------------------------------------------------------------------------ | |
-- cache | |
local table = table | |
local db = assert(require("resty.redis"):new()) | |
db:set_timeout(1000000) -- 1000 sec | |
assert(db:connect("127.0.0.1", 6379)) | |
local function format_chunk(message) -- for chunked transfer-encoding | |
return ("%x\r\n%s\r\n"):format(#message, message) | |
end | |
local sock | |
local function push(k, v) | |
-- NB: we escape % \r \n \x00 | |
-- FIXME: \0 is not allowed?! | |
v = v:gsub("[%%\r\n%z]", function(c) return ("%%%02x"):format(c:byte(1)) end) | |
--local s = format_chunk(("event:notify\r\ndata:%s=%s\r\n\r\n"):format(k, v)) | |
local s = format_chunk(("data: %s=%s\r\n\r\n"):format(k, v)) | |
local n, err = sock:send(s) | |
return n == #s | |
end | |
sock = ngx.req.socket(true) | |
local function cleanup() | |
db:close() | |
ngx.exit(499) | |
end | |
ngx.on_abort(cleanup) | |
sock:settimeout(86400000) | |
-- send headers | |
sock:send(table.concat({ | |
"HTTP/1.1 200 OK", | |
"Content-Type: text/event-stream", | |
"Connection: keep-alive", | |
"Transfer-Encoding: chunked", | |
"", | |
"", -- NB: Opera needs extra CRLF | |
}, "\r\n")) | |
-- push current state | |
local state, err = db:hgetall("state") | |
if not state then | |
ngx.exit(500) | |
return | |
end | |
for i = 1, #state / 2 do | |
push(state[2*i-1], state[2*i]) | |
end | |
-- push further events | |
local ok, err = db:psubscribe("*") | |
if not ok then | |
ngx.exit(500) | |
return | |
end | |
while true do | |
local msg, err = db:read_reply() | |
if not msg then | |
-- error handling | |
break | |
end | |
if not push(msg[3], msg[4]) then | |
break | |
end | |
end | |
sock:send(format_chunk("")) | |
sock:close() | |
cleanup() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment