Skip to content

Instantly share code, notes, and snippets.

@cloudwu
Created July 5, 2019 09:41
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save cloudwu/9d296c929e792dd68f4d18071e7d6574 to your computer and use it in GitHub Desktop.
Save cloudwu/9d296c929e792dd68f4d18071e7d6574 to your computer and use it in GitHub Desktop.
Unity cache server
local skynet = require "skynet"
local socket = require "skynet.socket"
local db_path = assert(skynet.getenv "cache_db") .. "/"
local mode = (...) or "main"
local function tohex(c)
return string.format("%02x", c:byte())
end
local function id_filename(id)
local filename = id:gsub(".", tohex)
local path1 = filename:sub(1,3)
local path2 = filename:sub(4,6)
local path = path1 .. "/" .. path2 .. "/"
return db_path .. path .. filename, path1, path2
end
local server = {}
do -- server main
local worker = tonumber(skynet.getenv "worker" or skynet.getenv "thread")
local balance = 1
local worker_agent = {}
local function launch_worker()
for i=1,worker do
worker_agent[i] = skynet.newservice(SERVICE_NAME, "agent")
end
end
local function accept(fd, addr)
local agent = worker_agent[balance]
balance = balance + 1
if balance > worker then
balance = 1
end
skynet.send(agent, "lua", fd, addr, writer)
end
function server.main()
skynet.error("Cache server start")
if not skynet.getenv "daemon" then
local console = skynet.newservice("console")
end
skynet.newservice("debug_console", skynet.getenv "debug_port")
launch_worker()
local writer = skynet.newservice(SERVICE_NAME, "writer")
local address = skynet.getenv "server_addr" or "127.0.0.1"
local port = assert(skynet.getenv "server_port")
local id = assert(socket.listen(address, port))
skynet.error(string.format("Listen on %s:%s", address, port))
socket.start(id , accept)
end
end
do -- agent
local socket_error = {}
local function run (fd, writer)
local function readsocket(sz)
local v , msg = socket.read(fd, sz)
if not v then
skynet.error("REQUEST", sz, "READ", #msg)
error(socket_error)
else
return v
end
-- return assert(socket.read(fd, sz) , socket_error)
end
local function writesocket(data)
socket.write(fd, data)
end
-- hand shake
local version = readsocket(2)
if version == "fe" then
version = "000000fe"
else
version = version .. readsocket(6)
end
if version ~= "000000fe" then
skynet.error("Invalid version " .. version)
writesocket "00000000"
return
end
writesocket(version)
local action = {}
local function get(id, t)
skynet.error("GET " .. id .. t)
local fn = id_filename(id) .. "." .. t
local f = io.open(fn, "rb")
if not f then
writesocket("-"..t..id)
end
local size = f:seek "end"
if not size then
skynet.error("Can't seek " .. fn)
writesocket("-"..t..id)
f:close()
end
f:seek("set", 0)
local data = f:read "a"
f:close()
if not data then
skynet.error("Can't read " .. fn)
writesocket("-"..t..id)
end
writesocket(string.format("+%s%016x",t , size) .. id)
writesocket(data)
end
function action.ga(id)
return get(id, "a")
end
function action.gi(id)
return get(id, "i")
end
function action.gr(id)
return get(id, "r")
end
-- put id
function action.ts(id)
skynet.error("PUT " .. id)
local pa, pi, pr
while true do
local subaction = readsocket(2)
if subaction == "te" then
break
end
local t
local name = os.tmpname()
if subaction == "pa" then
t = "a"
pa = name
elseif subaction == "pi" then
t = "i"
pi = name
elseif subaction == "pr" then
t = "r"
pr = name
else
skynet.error("Invalid sub action " .. subaction)
error(socket_error)
end
local size = readsocket(16)
local sizen = tonumber(size, 16)
if sizen == nil then
skynet.error("Invalid size " .. size)
error(socket_error)
end
local data = readsocket(sizen)
local f = assert(io.open(name, "wb"))
f:write(data)
f:close()
end
skynet.call(writer, "lua", id, pa, pi, pr)
end
while true do
-- command : ga/gi/gr/ts + 32 bytes GUID/HASH
local command = readsocket(34)
local what = command:sub(1,2)
local f = action[what]
if f == nil then
skynet.error("Invalid command " .. what)
return
end
f(command:sub(3))
end
end
function server.agent()
skynet.dispatch("lua", function(session, address, fd, client_addr, writer)
skynet.error( client_addr .. " connected")
socket.start(fd)
local ok, msg = xpcall(run , debug.traceback, fd, writer)
socket.close(fd)
if not ok then
if msg == socket_error then
skynet.error( client_addr .. " socket closed" )
else
skynet.error( client_addr .. " : " .. msg)
end
end
end)
end
end
do -- writer
local path_cache = {}
local function move(filename, tmp, t)
local fn = filename .. t
assert(os.rename( tmp, fn))
skynet.error("Put ", fn)
end
local function write(id, pa, pi, pr)
local fn, path1, path2 = id_filename(id)
if not path_cache[path] then
-- todo : use lfs instead
if not path_cache[path1] then
os.execute( "mkdir " .. db_path .. path1 )
path_cache[path1] = true
end
os.execute( "mkdir " .. db_path .. path1 .. "/" .. path2 )
path_cache[path] = true
end
move(fn, pa, ".a")
move(fn, pi, ".i")
move(fn, pr, ".r")
end
function server.writer()
skynet.dispatch("lua", function(session, address, id, pa, pi, pr)
write(id, pa, pi, pr)
end)
end
end
skynet.start(server[mode])
-- daemon = "run/cacheserver.pid"
server_addr = "0.0.0.0"
server_port = 8126
cache_db = "db"
harbor = 0
thread = 8
logger = daemon and "run/cacheserver.log"
logpath = "run"
start = "cacheserver"
luaservice = "./?.lua;skynet/service/?.lua"
lualoader = "skynet/lualib/loader.lua"
lua_path = "skynet/lualib/?.lua"
lua_cpath = "skynet/luaclib/?.so"
cpath = "skynet/cservice/?.so"
debug_port = 8000
worker = thread
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment