Skip to content

Instantly share code, notes, and snippets.

@Deco
Created November 6, 2012 17:39
Show Gist options
  • Star 14 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save Deco/4026258 to your computer and use it in GitHub Desktop.
Save Deco/4026258 to your computer and use it in GitHub Desktop.
Multi-threaded LuaSocket with Lua Lanes example
--[[ socketlanes.lua
Multi-threaded LuaSocket with Lua Lanes example
===============================================
Depends on the following LuaSocket 2.0.2 patch:
http://www.net-core.org/dl/luasocket-2.0.2-acceptfd.patch
(via http://www.net-core.org/39/lua/patching-luasocket-to-make-it-compatible)
(provided at end of file)
Tested using Lua Lanes 3.4.0 (github, 2012-25-10)
Copyright (C) 2012 Declan White
Permission is hereby granted, free of charge, to any person obtaining a
copy of this software and associated documentation files (the "Software"),
to deal in the Software without restriction, including without limitation
the rights to use, copy, modify, merge, publish, distribute, sublicense,
and/or sell copies of the Software, and to permit persons to whom the
Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
]]
--[[
debug.sethook(
function(ev, evdata)
local info = debug.getinfo(2, 'Sn')
if ev == 'line' then
print(tostring(info.source)..":"..(tonumber(evdata) or "?"))
end
end,
'l'
)
]]
-- Search the bundled module directory first, but keep the interpreter's
-- default search paths as a fallback instead of discarding them.
package.path = "./lua/modules/?.lua;" .. package.path
-- Native modules: `.dll` (Windows) or `.so` (elsewhere) in the bundled directory.
package.cpath = "./lua/modules/?.dll;./lua/modules/?.so;" .. package.cpath
function main(...)
-- Load Lanes and, on versions that expose it, run its configuration step.
local lanes = require'lanes'
--os.execute("pause")
if lanes.configure then lanes.configure() end -- depends on version
-- Cached formatter; captured as an upvalue by `printf_sync`.
local string_format = string.format
-- Linda-backed lock used by `printf_sync`: it is called with 1 before
-- writing and with -1 afterwards.
local printf_sync_linda = lanes.linda()
local printf_sync_lockfunc = lanes.genlock(printf_sync_linda, 'printf_sync_lock', 1)
--- Write one formatted line to stdout while holding the shared print lock.
-- Arguments that are neither strings nor numbers are converted with
-- `tostring` first, so nil, booleans, tables and userdata can be used with
-- `%s`/`%q`.
-- @param fmt  `string.format` format string
-- @param ...  values substituted into `fmt`
local function printf_sync(fmt, ...)
    local argcount = select('#', ...)
    local arglist = {...}
    for arg_i = 1, argcount do
        local arg = arglist[arg_i]
        if type(arg) ~= 'string' and type(arg) ~= 'number' then
            arglist[arg_i] = tostring(arg)
        end
    end
    -- Format BEFORE taking the lock: if `fmt` and the arguments do not match,
    -- `string.format` raises, and raising while the lock is held would leave
    -- it held and block every later `printf_sync` call.
    local line = string_format(fmt, unpack(arglist, 1, argcount))
    printf_sync_lockfunc(1)
    io.stdout:flush()
    --print(fmt, unpack(arglist))
    io.stdout:write(line)
    io.stdout:write("\n")
    io.stdout:flush()
    printf_sync_lockfunc(-1)
end
-- Shared state for the example service.
local myservice = {}
-- Linda used to exchange messages between the main code and the listen handler.
myservice.listenlinda = lanes.linda()
-- Records of the client handlers that have been launched.
myservice.clientlist = {}
--- Lane body that services a single accepted connection.
-- Reads the connection one byte at a time until a NUL byte arrives or the
-- receive fails, logs whatever was collected, then closes the socket.
-- @param client_sock_fd  file descriptor of the accepted connection
function myservice.clienthandler(client_sock_fd)
    -- Keep what gets copied into this lane to a minimum: apart from the
    -- argument, avoid touching anything defined "outside".
    local socket = require'socket'
    -- Wrap the file descriptor in a socket object
    -- (fd = file descriptor; should be cross platform)
    local conn = socket.tcp(client_sock_fd)
    local peer = tostring(conn:getpeername())
    printf_sync("ClientHandler %q(%s): Initialised!", peer, client_sock_fd)

    local chunks = {}
    while true do
        local byte, err = conn:receive(1)
        if byte == nil then
            printf_sync("ClientHandler %q(%s): Receive failure (%q)", peer, client_sock_fd, err)
            break
        end
        -- A NUL byte terminates the message.
        if byte == '\0' then
            break
        end
        chunks[#chunks + 1] = byte
    end

    printf_sync("ClientHandler %q(%s): Buffer = %q", peer, client_sock_fd, table.concat(chunks))
    conn:close()
    printf_sync("ClientHandler %q(%s): Closed", peer, client_sock_fd)
end
do
    -- Libraries opened in each client handler lane: everything under LuaJIT,
    -- otherwise an explicit list ('package' has `require`).
    local clienthandler_libs = jit and '*' or 'package,io,math,string,table,debug'
    myservice.clienthandler_lanegen = lanes.gen(clienthandler_libs, myservice.clienthandler)
end
--- Lane body that owns the listening socket.
-- Binds port 1234, announces 'listenhandler_ready' on the linda, then serves
-- the messages posted to it:
--   * 'listenhandler_poll' - try a non-blocking accept and, if a connection is
--     pending, post its file descriptor under 'newclient_sock_fd';
--   * 'listenhandler_halt' - leave the loop.
-- @param listenlinda  linda shared with the code that launched this lane
-- @return true once the loop has been halted
myservice.listenhandler = function(listenlinda)
    local socket = require'socket'
    printf_sync("ListenHandler: Binding")
    local listensock = assert(socket.bind('*', 1234))
    -- Register cleanup for the listening socket with Lanes.
    set_finalizer(function(err)
        listensock:close()
    end)
    listensock:settimeout(0) -- non-blocking
    printf_sync("ListenHandler: Bound")
    listenlinda:send(nil, 'listenhandler_ready', true)
    while true do -- (should probably implement a way of exiting this cleanly)
        local lindakey, lindaval = listenlinda:receive(nil, 'listenhandler_halt', 'listenhandler_poll')
        if lindakey == 'listenhandler_poll' then
            local newclient_sock_fd, accept_err = listensock:acceptfd()
            if newclient_sock_fd ~= nil then
                -- Although it'd be nice, we can't construct a socket object for this new
                -- connection here (e.g. `socket.tcp(newclient_sock_fd)`). If we did, the
                -- __gc metamethod would close it when this lane exits, meaning any
                -- clienthandler lanes still running wouldn't be able to finish.
                -- Without a socket object the peer name is unknown, so it is logged as "nil".
                local newclient_peername = nil
                printf_sync("ListenHandler: Client %q(%s) connected", tostring(newclient_peername), newclient_sock_fd)
                listenlinda:send(nil, 'newclient_sock_fd', newclient_sock_fd)
                printf_sync("ListenHandler: Client %q(%s) sent to linda", tostring(newclient_peername), newclient_sock_fd)
            elseif accept_err ~= 'timeout' then
                -- "timeout" just means nothing was pending on the non-blocking
                -- socket; anything else is a real failure worth reporting.
                printf_sync("ListenHandler: Accept failure (%q)", tostring(accept_err))
            end
        elseif lindakey == 'listenhandler_halt' then
            -- You *could* make `listensock` blocking and just force-cancel the lane,
            -- but that doesn't seem like good practice.
            -- There may be a method to interrupt `accept`/`acceptfd` using a linda,
            -- but I'm not aware of any easy way to do it.
            break
        end
    end
    return true
end
myservice.listenhandler_lanegen = lanes.gen(jit and '*' or 'package,string,math,io', myservice.listenhandler)
-- Testing!
print('yay')
local socket = require'socket'
print('yay')
-- Every lane launched below is recorded here so its status can be inspected.
local lanedebuglist = {}
-- Start server listening
printf_sync("Main: Launching ListenHandler lane")
myservice.listenhandler_lane = myservice.listenhandler_lanegen(myservice.listenlinda)
print('yay')
table.insert(lanedebuglist, myservice.listenhandler_lane)
printf_sync("Main: ListenHandler lane launched: %q", tostring(myservice.listenhandler_lane.status))
-- Wait for the lane to report that its socket is bound. A short timeout is
-- used instead of 0 so this loop does not spin, and the lane's status is
-- checked so a lane that stopped early (e.g. the bind failed) is reported
-- instead of being waited on forever.
while true do
    local lindakey, listenhandler_ready = myservice.listenlinda:receive(0.1, 'listenhandler_ready')
    if listenhandler_ready then
        break
    end
    local status = myservice.listenhandler_lane.status
    if status == 'error' or status == 'cancelled' or status == 'killed' or status == 'done' then
        local _, lane_err = myservice.listenhandler_lane:join()
        error("ListenHandler lane stopped before it was ready: " .. tostring(lane_err))
    end
end
-- Start polling timer (check for new connection every half a second)
--lanes.timer(myservice.listenlinda, 'listenhandler_poll', 0, 0.5)
printf_sync("Main: ListenHandler lane ready")
-- Spawn some clients to send data.
-- `testcount` is also the number of connections the polling loop expects.
local testcount = 5
-- Set when a client could not connect or send; `debug_err` keeps the first error.
local debug_hasproblem, debug_err = false, nil
for i = 1, testcount do
    local sendersock, err = socket.connect('127.0.0.1', 1234)
    if sendersock ~= nil then
        -- The message is NUL-terminated; the client handler stops reading at '\0'.
        local sent, senderr = sendersock:send(("Hello from client #%s!\0"):format(i))
        sendersock:close()
        if sent ~= nil then
            printf_sync("Client #%s: data sent", i)
        else
            -- Only claim success when the send actually succeeded.
            debug_hasproblem = true
            debug_err = debug_err or senderr
        end
    else
        debug_hasproblem = true
        debug_err = debug_err or err
    end
end
-- Hand each accepted connection to its own client handler lane, then shut
-- everything down.
if not debug_hasproblem then
    while testcount > 0 do
        -- Ask the listen handler to try an accept.
        myservice.listenlinda:send('listenhandler_poll', socket.gettime())
        -- Wait briefly for a result. A zero timeout would make this loop spin
        -- and queue poll requests faster than the listen handler can consume them.
        local lindakey, newclient_sock_fd = myservice.listenlinda:receive(0.1, 'newclient_sock_fd')
        -- (Note: If you don't use Lua Lanes v3.4.0 or later, swap `lindakey` and `newclient_sock_fd` in the above line!)
        if lindakey ~= nil and newclient_sock_fd ~= nil then
            print('yep!')
            printf_sync("Main: New client available")
            local newclient = {
                sock_fd = newclient_sock_fd,
            }
            newclient.lane = myservice.clienthandler_lanegen(newclient_sock_fd)
            table.insert(lanedebuglist, newclient.lane)
            printf_sync("Main: Client handler launched")
            table.insert(myservice.clientlist, newclient)
            testcount = testcount-1
        else
            print('nope')
        end
        for lane_i = 1, #lanedebuglist do
            local lane = lanedebuglist[lane_i]
            if lane.status == 'error' then
                debug_hasproblem = true
            end
        end
        -- If the listen handler is no longer running, no further descriptors
        -- can arrive; stop instead of polling forever.
        local listen_status = myservice.listenhandler_lane.status
        if listen_status == 'error' or listen_status == 'cancelled'
            or listen_status == 'killed' or listen_status == 'done' then
            debug_hasproblem = true
            break
        end
    end
    printf_sync("Main: Canceling lanes")
    repeat
        myservice.listenlinda:send(nil, 'listenhandler_halt', true)
        myservice.listenhandler_lane:join(5) -- wait a few seconds for it to close properly
    until myservice.listenhandler_lane:cancel()
    for client_i = 1, #myservice.clientlist do
        local client = myservice.clientlist[client_i]
        -- Wait for the client handler to finish its work; only cancel it if it
        -- is still going after the grace period.
        client.lane:join(5)
        client.lane:cancel()
    end
    printf_sync("Main: Cancelled lanes")
end
-- testing!
if debug_hasproblem then -- error, propagate it to main
    printf_sync("problem! %q", tostring(debug_err))
    for _, lane in ipairs(lanedebuglist) do
        if lane.status == 'error' then
            -- NOTE(review): presumably reading a result slot of an errored lane
            -- re-raises that lane's error here - confirm against the Lanes docs.
            local _ = lane[1]
        end
    end
end
printf_sync("Main: Done!")
return 0
end
return main(...)
--[[
--- luasocket-2.0.2/src/tcp.c 2007-10-15 06:21:05.000000000 +0200
+++ luasocket-2.0.2.new/src/tcp.c 2009-08-24 23:58:47.000000000 +0200
@@ -30,6 +30,7 @@
static int meth_shutdown(lua_State *L);
static int meth_receive(lua_State *L);
static int meth_accept(lua_State *L);
+static int meth_acceptfd(lua_State *L);
static int meth_close(lua_State *L);
static int meth_setoption(lua_State *L);
static int meth_settimeout(lua_State *L);
@@ -42,6 +43,7 @@
{"__gc", meth_close},
{"__tostring", auxiliar_tostring},
{"accept", meth_accept},
+ {"acceptfd", meth_acceptfd},
{"bind", meth_bind},
{"close", meth_close},
{"connect", meth_connect},
@@ -186,6 +188,27 @@
}
/*-------------------------------------------------------------------------*\
+* Waits for and returns a client object attempting connection to the
+* server object
+\*-------------------------------------------------------------------------*/
+static int meth_acceptfd(lua_State *L)
+{
+ p_tcp server = (p_tcp) auxiliar_checkclass(L, "tcp{server}", 1);
+ p_timeout tm = timeout_markstart(&server->tm);
+ t_socket sock;
+ int err = socket_accept(&server->sock, &sock, NULL, NULL, tm);
+ /* if successful, push client socket */
+ if (err == IO_DONE) {
+ lua_pushnumber(L, sock);
+ return 1;
+ } else {
+ lua_pushnil(L);
+ lua_pushstring(L, socket_strerror(err));
+ return 2;
+ }
+}
+
+/*-------------------------------------------------------------------------*\
* Binds an object to an address
\*-------------------------------------------------------------------------*/
static int meth_bind(lua_State *L)
@@ -316,12 +339,19 @@
static int global_create(lua_State *L)
{
t_socket sock;
- const char *err = inet_trycreate(&sock, SOCK_STREAM);
+ const char *err = NULL;
+ int fd = luaL_optnumber(L, 1, -1);
+ if (fd < 1)
+ err = inet_trycreate(&sock, SOCK_STREAM);
+ else
+ sock = fd;
/* try to allocate a system socket */
if (!err) {
/* allocate tcp object */
p_tcp tcp = (p_tcp) lua_newuserdata(L, sizeof(t_tcp));
- /* set its type as master object */
+ if (fd >= 1)
+ auxiliar_setclass(L, "tcp{client}", -1);
+ else
auxiliar_setclass(L, "tcp{master}", -1);
/* initialize remaining structure fields */
socket_setnonblocking(&sock);
]]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment