Created
June 18, 2015 14:56
-
-
Save Mons/0210b35f9ea4c242b60d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Drop the sandbox table when the runtime installed one.
-- NOTE(review): presumably this is what makes the unrestricted `require`
-- below possible - confirm against the target Tarantool build.
if box.sandbox then box.sandbox = nil end

-- LuaJIT FFI handle; the only C symbol declared by this script is exit(),
-- which the benchmark uses to terminate the process.
local ffi = require('ffi')
ffi.cdef([[
void exit(int)
]])
box.fiber.wrap(function() | |
-- Connection object. `req` maps a request sequence number to whatever is
-- waiting for the matching reply.
local con = { req = {} }

-- Hook invoked once the socket is connected; a no-op until it is replaced.
con.on_connect = function() end

-- Hook invoked when a connection attempt fails; receives an errno value.
local function on_connect_failed( errno ) end

-- Hook invoked when an established connection is lost; receives an errno value.
local function on_connect_reset( errno ) end
box.fiber.wrap(function() | |
-- State shared between con:ping() and the connection loop of this fiber.
local fid = box.fiber.id()            -- id of the fiber that owns the connection
local internal = box.socket.internal  -- only used by the debug print below

-- timeout: seconds a request waits for its reply
-- state:   current state of the connection state machine (starts at 1)
-- maxbuf:  upper bound, in bytes, for the receive buffer
-- _seq:    last request sequence number handed out
local timeout, state, maxbuf, _seq = 1, 1, 262144, 0

-- ainfo: resolved address record, s: socket object,
-- rbuf: bytes received but not yet parsed, fd: raw descriptor of `s`
local ainfo, s, rbuf, fd

print("internal: ",internal)
--- Send one ping request and wait for the matching reply.
-- Must be called from a fiber other than the connection fiber (enforced by
-- the assert), because the call blocks until the reader delivers the reply.
-- @return true  when a reply arrived within `timeout` seconds;
--         false when the connection is not in the connected state, the
--               write failed, or the waiter was handed `false`;
--         nil   when no reply arrived before the timeout.
function con:ping()
    assert(box.fiber.id() ~= fid, "Requests prohibited from callbacks of connection. Use fiber.wrap")
    _seq = _seq + 1
    local seq = _seq

    if state ~= 3 then
        print("bad state ",state)
        return false
    end

    -- Register the waiter before writing so the reader can always find it.
    local ch = box.ipc.channel(1)
    self.req[seq] = ch

    -- 12-byte header of three 32-bit fields; the reader unpacks the same
    -- layout as (packet type, body length, sequence number).
    local buf = box.pack('iii', 65280, 0, seq)

    -- NOTE(review): write() is not declared by this script's ffi.cdef; this
    -- relies on another module having declared it - confirm.
    -- tonumber() normalises the result, which LuaJIT may return as a boxed
    -- 64-bit integer.
    local wr = tonumber(ffi.C.write(fd, buf, #buf))
    if wr <= 0 then
        print("error: ",box.errno.strerror(box.errno()))
        -- Nothing was sent, so no reply can arrive: drop the waiter and fail
        -- right away instead of blocking for the whole timeout.
        self.req[seq] = nil
        return false
    end
    if wr ~= #buf then
        print("written partially")
    end

    local body = ch:get( timeout )
    if body then
        return true
    end

    -- Timed out (nil) or explicitly failed (false): make sure the waiter does
    -- not stay registered, otherwise con.req grows with every lost reply.
    self.req[seq] = nil
    if body == false then
        return false
    end
    return nil
end
-- Connection states driven by the loop below:
-- 0 - reconnecting (close the socket, pause, then resolve again)
-- 1 - resolving
-- 2 - connecting
-- 3 - connected

-- Tear down the current connection and schedule a new resolve/connect cycle.
-- `errno` is passed to on_connect_reset before anything is closed.
local function drop_connection( errno )
    on_connect_reset( errno )
    if s then pcall(s.close,s) s = nil end
    fd = nil
    rbuf = ''
    -- Hand `false` to every fiber still waiting for a reply so it fails
    -- immediately instead of running into its timeout. Numeric entries are
    -- timestamps of requests that already timed out and are just dropped.
    for seq, waiter in pairs(con.req) do
        if type(waiter) ~= 'number' then
            waiter:put( false, 0 )
        end
        con.req[ seq ] = nil
    end
    state = 1
end

while true do
    --print("state = ",state)
    if state == 0 then -- reconnecting
        if s then pcall(s.close,s) s = nil end
        box.fiber.sleep(1)
        -- Move on to resolving; without this the loop would stay in state 0
        -- and sleep forever after the first failed attempt.
        state = 1
    end

    if state == 1 then -- resolving
        box.fiber.name("net.cw")
        -- A previous connection may have left a socket behind.
        if s then pcall(s.close,s) s = nil end
        local ai = box.socket.getaddrinfo( '127.0.0.1', 33013, 1, { ['type'] = 'SOCK_STREAM' } )
        ainfo = ai and ai[1]
        if ainfo then
            s = box.socket( ainfo.family, ainfo.type, ainfo.protocol )
        end
        if s then
            s:nonblock(true)
            --s:linger(1,0)
            state = 2
        else
            -- Resolution or socket creation failed: retry after the pause.
            on_connect_failed( box.errno() )
            state = 0
        end
    end

    if state == 2 then -- connecting
        while true do
            local connected = s:sysconnect( ainfo.host, ainfo.port )
            if not connected then
                local err = s:errno()
                if err == box.errno.EINPROGRESS
                    or err == box.errno.EALREADY
                    or err == box.errno.EWOULDBLOCK
                then
                    -- NOTE(review): writability alone does not prove that the
                    -- connect succeeded; consider checking SO_ERROR here.
                    if s:writable(1) then
                        connected = true
                    else
                        on_connect_failed( box.errno.ETIMEDOUT )
                        state = 0
                        break
                    end
                elseif err == box.errno.EINTR then
                    -- interrupted: retry sysconnect
                else
                    state = 0
                    -- Report the real reason rather than a blanket ETIMEDOUT.
                    on_connect_failed( err )
                    break
                end
            end
            if connected then
                -- Covers both the immediate success of sysconnect and the
                -- asynchronous completion signalled by writable().
                state = 3
                fd = s.socket.fd
                -- con.wr = box.ipc.channel(10)
                con.on_connect()
                break
            end
        end
        rbuf = ''
    end

    while state == 3 do
        if s:readable() then
            local buf = s:sysread( maxbuf - #rbuf )
            --print("read ",#buf)
            if not buf then
                -- Read error: reconnect instead of ending the fiber.
                drop_connection( s:errno() )
            elseif #buf == 0 then
                -- Empty read means the peer closed the connection. Testing the
                -- chunk (not the accumulated buffer) also catches a close that
                -- happens while a partial packet is buffered.
                drop_connection( box.errno.ECONNABORTED )
            else
                rbuf = #rbuf > 0 and rbuf .. buf or buf
                local before = #rbuf
                --self:on_read(#rbuf == 0)
                local oft = 0
                -- Each packet is a 12-byte header (type, body length, sequence
                -- number) followed by `len` bytes of body.
                while #rbuf >= oft + 12 do
                    local pktt,len,seq = box.unpack('iii',string.sub(rbuf, oft+1,oft+12 ))
                    --print("packet ",pktt," ",len," ",seq)
                    if #rbuf < oft + 12 + len then
                        break -- body not complete yet
                    end
                    local body = string.sub( rbuf,oft + 12 + 1,oft + 12 + len )
                    --print("body = '",body:xd(),"'")
                    local waiter = con.req[ seq ]
                    if not waiter then
                        print("Got no requestor for ",seq)
                    elseif type(waiter) ~= 'number' then
                        --print("Have requestor for ",seq)
                        waiter:put( body )
                        con.req[ seq ] = nil
                    else
                        -- The packet type is printed as a number; no
                        -- code-to-name table is defined in this script.
                        print("Received timed out ",pktt, "#",seq," after ",string.format( "%0.4fs", box.time() - waiter ))
                        con.req[ seq ] = nil
                    end
                    oft = oft + 12 + len
                end
                if oft > 0 then
                    -- Drop the consumed prefix; yields '' when all was consumed.
                    rbuf = string.sub( rbuf,oft+1 )
                end
                -- Buffer is full and nothing could be parsed: the pending
                -- packet can never fit, so give up on this connection.
                if #rbuf == before and before == maxbuf then
                    drop_connection( box.errno.ENOMEM )
                end
            end
        else
            drop_connection( s:errno() )
        end
    end
end
end) | |
--- Connection-established hook: run a ping benchmark, print the result and
-- terminate the process.
-- The work runs in its own fiber because con:ping() asserts that it is not
-- called from the connection fiber.
con.on_connect = function ()
    box.fiber.wrap(function ()
        print("connected")
        print(con:ping())

        local N = 5000
        local failed = 0
        -- Elapsed time is taken from box.time(): os.clock() counts CPU time
        -- only, which leaves out the time spent waiting for replies and
        -- therefore inflates the requests-per-second figure.
        local started = box.time()
        for i = 1,N do
            if not con:ping() then
                failed = failed + 1
            end
        end
        local dlt = box.time() - started

        if dlt > 0 then
            print(string.format("Run for %d takes %0.2fs. %0.2fµs/call, %d rps", N, dlt, dlt*(1000000/N), N/dlt))
        else
            -- Interval too short to measure; skip the rate to avoid a
            -- division by zero.
            print(string.format("Run for %d takes %0.2fs.", N, dlt))
        end
        if failed > 0 then
            print(string.format("%d of %d pings failed or timed out", failed, N))
        end

        print(con:ping())
        ffi.C.exit(0)
    end)
end
print("pass") | |
end) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment