Skip to content

Instantly share code, notes, and snippets.

@daurnimator
Last active February 20, 2023 05:56
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save daurnimator/f1c7965b47a5658b88300403645541aa to your computer and use it in GitHub Desktop.
Save daurnimator/f1c7965b47a5658b88300403645541aa to your computer and use it in GitHub Desktop.
Tarantool + cqueues for lua-http
-- This code monkey patches cqueues primitives to allow for transparent use of cqueues inside of tarantool
local cqueues = require "cqueues"
local fiber = require "fiber"
local socket = require "socket" -- https://tarantool.org/en/doc/1.7/reference/reference_lua/socket.html (not luasocket)
-- Interpose cqueues' :step() so that a controller can be stepped from a tarantool fiber.
--   * If cqueues.running() reports an active controller, the current fiber is
--     yielded once and the call is forwarded to the original :step() with the
--     caller's timeout untouched.
--   * Otherwise the fiber waits in socket.iowait() on the controller's
--     pollfd/events, bounded by the smaller of the controller's own timeout
--     and the requested one, and the original :step() is then run with a
--     zero timeout.
local old_step
old_step = cqueues.interpose("step", function(self, timeout)
	if not cqueues.running() then
		-- Fall back to math.huge when the controller reports no timeout of its own
		local wait = self:timeout() or math.huge
		if timeout then
			wait = math.min(wait, timeout)
		end
		socket.iowait(self:pollfd(), self:events(), wait)
		return old_step(self, 0.0)
	end
	fiber.yield()
	return old_step(self, timeout)
end)
#!/usr/bin/env tarantool
require "tarantool_cqueues"
local fiber = require "fiber"
local http_headers = require "http.headers"
local http_server = require "http.server"
local http_util = require "http.util"
-- Stream handler: logs the request line to stdout and answers with a
-- plain-text "Hello world!" (headers only when the method is HEAD).
--   myserver: the server the stream belongs to (unused)
--   stream:   the stream to read the request from and write the response to
-- Raises (via assert) if reading headers, logging, or writing fails.
local function reply(myserver, stream) -- luacheck: ignore 212
	local req_headers = assert(stream:get_headers())
	local req_method = req_headers:get ":method"
	local is_head = (req_method == "HEAD")

	-- One access-log style line per request
	local log_line = string.format('[%s] "%s %s HTTP/%g" "%s" "%s"\n',
		os.date("%d/%b/%Y:%H:%M:%S %z"),
		req_method or "",
		req_headers:get(":path") or "",
		stream.connection.version,
		req_headers:get("referer") or "-",
		req_headers:get("user-agent") or "-"
	)
	assert(io.stdout:write(log_line))

	local res_headers = http_headers.new()
	res_headers:append(":status", "200")
	res_headers:append("content-type", "text/plain")

	-- For HEAD the headers are the whole response, so the stream ends with them
	assert(stream:write_headers(res_headers, is_head))
	if is_head then
		return
	end

	-- Body is a single chunk that also ends the stream
	assert(stream:write_chunk("Hello world!\n", true))
end
-- Create the HTTP server on 127.0.0.1:8000; every stream is handed to `reply`
-- and failures are reported on stderr as "<op> on <context> failed[: <err>]".
local myserver = assert(http_server.listen({
	host = "127.0.0.1",
	port = 8000,
	onstream = reply,
	onerror = function(self, context, op, err)
		local description = op .. " on " .. tostring(context) .. " failed"
		-- The error detail is only appended when an error value was supplied
		local detail = err and (": " .. tostring(err)) or ""
		assert(io.stderr:write(description, detail, "\n"))
	end,
}))
-- Replace :add_stream so that each stream's onstream handler runs in its own
-- tarantool fiber rather than in a new cqueues coroutine.
--   stream: the stream to hand to self.onstream
-- The stream is always shut down once the handler finishes; if the handler
-- raised, the server's current error handler is invoked with op "onstream".
function myserver:add_stream(stream)
	local server = self
	local function run_handler()
		-- Yield once before doing any work so the handler is resumed from the
		-- main loop rather than from within whoever called :add_stream
		fiber.yield()
		local ok, err = http_util.yieldable_pcall(server.onstream, server, stream)
		stream:shutdown()
		if ok then
			return
		end
		server:onerror()(server, stream, "onstream", err)
	end
	fiber.create(run_handler)
end
-- Drive the server's loop from a dedicated tarantool fiber;
-- a failure reported by :loop() is raised inside that fiber.
local function serve()
	assert(myserver:loop())
end
fiber.create(serve)
-- Companion fiber: prints "HELLO <n>" for n = 1..100, sleeping 0.1s after each
-- line, to demonstrate that the http server does not block other fibers.
local function ticker()
	local count = 0
	while count < 100 do
		count = count + 1
		print("HELLO ", count)
		fiber.sleep(0.1)
	end
end
fiber.create(ticker)
#!/usr/bin/env tarantool
require "tarantool_cqueues"
local fiber = require "fiber"
package.loaded["http.client"] = nil -- tarantool has a namespace clash
local websocket = require "http.websocket"
-- Websocket client fiber: connects to the feed, subscribes to BTC-USD,
-- prints the first five messages received and then closes the connection.
-- Any failing step raises (via assert) inside this fiber.
local function run_client()
	local feed_uri = "wss://ws-feed.gdax.com"
	local subscribe_msg = [[{"type": "subscribe", "product_id": "BTC-USD"}]]

	local ws = websocket.new_from_uri(feed_uri)
	assert(ws:connect())
	assert(ws:send(subscribe_msg))

	local remaining = 5
	while remaining > 0 do
		-- Keep only the first value from :receive() so print gets a single argument
		local message = assert(ws:receive())
		print(message)
		remaining = remaining - 1
	end

	assert(ws:close())
end
fiber.create(run_client)
-- Companion fiber: prints "HELLO <n>" for n = 1..100, sleeping 0.1s after each
-- line, to demonstrate that the websocket client does not block other fibers.
local function ticker()
	local count = 0
	while count < 100 do
		count = count + 1
		print("HELLO ", count)
		fiber.sleep(0.1)
	end
end
fiber.create(ticker)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment