Skip to content

Instantly share code, notes, and snippets.

@kn666
Forked from Mons/graphite.lua
Created May 4, 2018 07:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kn666/68df7c79d8f2b325c92da54e54041ea0 to your computer and use it in GitHub Desktop.
Save kn666/68df7c79d8f2b325c92da54e54041ea0 to your computer and use it in GitHub Desktop.
-- Module table and defaults for the graphite-like metric store.
-- An already existing `gr` table is reused, so fields set on it earlier
-- survive when this file is loaded again.
if gr == nil then
    gr = {}
end
gr.version = 1.0

-- Retention schema given to newly created metrics, written as
-- <step><retention>; '1s30s' means one-second slots covering thirty seconds.
-- gr.defschema = '1s30s 1s7d 1m3M'
gr.defschema = '1s30s'

-- Aggregation given to newly created metrics.
gr.defagg = 'avg'

-- Port number the metric listeners bind to.
gr.port = 22003
do
    -- Length of each supported unit in seconds.
    -- A month counts as 30 days and a year as 365 days.
    local seconds_per_unit = {
        s = 1,
        m = 60,
        h = 3600,
        d = 3600*24,
        w = 3600*24*7,
        M = 3600*24*30,
        y = 3600*24*365,
    }

    --- Translate a time-unit letter into a number of seconds.
    -- @param x unit letter: 's', 'm', 'h', 'd', 'w', 'M' or 'y'
    -- @return seconds in one such unit
    -- Raises an error for any other value.
    function gr.mult (x)
        local factor = seconds_per_unit[x]
        if factor == nil then
            error("Unknown type '" .. x .. "'")
        end
        return factor
    end
end
--- Parse one interval such as '30s' or '7d' into seconds.
-- The first <digits><letters> group found in the input is used.
-- @param x interval text: a decimal count followed by a unit accepted by gr.mult
-- @return interval length in seconds
-- Raises an error when no such group is present or the unit is unknown.
function gr.st1t (x)
    local count, unit = string.match(tostring(x), '(%d+)(%a+)')
    if count == nil then
        -- Fail with a message that names the offending input instead of
        -- letting a nil reach gr.mult / the multiplication below.
        error("Bad interval '" .. tostring(x) .. "'")
    end
    return tonumber(count) * gr.mult(unit)
end
--- Parse a retention schema string into a list of schemas.
-- Each "<n><unit>[:]<n><unit>" group in the input (e.g. '1s30s' or '1m:3M')
-- produces one entry.
-- @param x schema string; groups may be separated by whitespace
-- @return array of { step_seconds, slot_count, schema_key } where
--         slot_count = floor(retention_seconds / step_seconds) and
--         schema_key is the group text with any ':' removed
function gr.st2t (x)
    local schemas = {}
    for step_n, step_unit, keep_n, keep_unit in string.gmatch(x,'(%d+)(%a+):*(%d+)(%a+)%s*') do
        -- The captures are strings; arithmetic coerces them to numbers.
        local step = step_n * gr.mult(step_unit)
        local keep = keep_n * gr.mult(keep_unit)
        table.insert(schemas, {
            step,
            math.floor(keep / step),
            step_n .. step_unit .. keep_n .. keep_unit,
        })
    end
    return schemas
end
--- Store one data point for a metric, creating the metric on first use.
--
-- Space 0 holds metric definitions as { id, name, schema, aggregation } and
-- is looked up by name through index 1. Space 1 holds the points as
-- { id, schema_key, slot, value, count, time } keyed by the first three
-- fields. For every retention schema of the metric the point lands in slot
-- floor(time / step) % slot_count.
--
-- @param metric metric name
-- @param value  sample value; anything tonumber() accepts
-- @param time   unix timestamp of the sample, defaults to os.time()
-- @return nothing
function gr.save(metric,value,time)
    if (time == nil) then time = os.time() end
    value = tonumber(value)
    if value == nil then
        -- box.pack below cannot take a nil; refuse before touching storage.
        print("not a number for ",metric)
        return
    end

    local m = box.select(0,1,metric)
    if (m == nil) then
        -- Unknown metric: allocate the next id and register it with defaults.
        local v = box.space[0].index[0]:max()
        local n
        if v == nil then n = 1 else n = box.unpack('i',v[0]) + 1 end
        print("create ",metric," with id ",n)
        local r,e = pcall(box.insert, 0, { n, metric, gr['defschema'], gr['defagg'] })
        if r then
            m = e
        else
            print("insert ",n,':',metric," failed: ",e)
            -- The insert presumably fails when the metric was registered
            -- concurrently; look it up once more before giving up.
            m = box.select(0,1,metric)
        end
    end
    if m == nil then return end

    local id = m[0]
    local agg = m[3]
    for _,schema in ipairs(gr.st2t(m[2])) do
        local s1,s2,sk = unpack(schema)
        local t = math.floor(time / s1)       -- bucket number of this sample
        local slot = t % s2                   -- ring-buffer position
        local mintime = os.time() - s2*s1     -- oldest timestamp still retained
        -- print("save ",sk,' ',metric,' ',time, ' -> ',slot)

        -- id,schema,slot,value,count,time
        -- 0  1      2    3     4     5
        local fresh = { id,sk,slot,box.pack('i',value),box.pack('i',1),box.pack('i',time) }

        if (agg == 'last') then
            -- Keep only the latest sample, stored in the same layout the
            -- 'avg' branch and gr.points use.
            box.replace(1, fresh)
        elseif (agg == 'avg') then
            local x = box.select(1,0,{id,sk,slot})
            if (x == nil) then
                print("insert")
                box.insert(1, fresh)
            else
                local xtime = box.unpack('i',x[5])
                if xtime < mintime or math.floor(xtime / s1) ~= t then
                    -- The slot still holds an expired sample or a sample of
                    -- a different bucket: start over.
                    box.replace(1, fresh)
                else
                    -- Same bucket: fold the sample into the running average,
                    -- weighting the stored average by its sample count.
                    local avg = box.unpack('i',x[3])
                    local cnt = box.unpack('i',x[4])
                    box.update(1, { id,sk,slot }, '=p+p', 3, box.pack('i',(avg*cnt+value)/(cnt+1)), 4, 1 )
                end
            end
        else
            print("unsupported aggregation: ",agg)
        end
    end
    return
end
--- Render the stored points of a metric as JSON-like text.
--
-- Uses the first retention schema of the metric that reaches back to `from`
-- (the last schema when none does), walks the slots between `from` and
-- `till` in space 1 and, while doing so, deletes tuples whose timestamp is
-- older than the retention window.
--
-- @param metric metric name
-- @param from   unix timestamp, start of the range
-- @param till   unix timestamp, end of the range; defaults to os.time()
-- @return text of the form '{now: <ts>, points:[\n[value,time],\n...]', or
--         nil when the metric is unknown or its schema yields no entries
function gr.points(metric,from,till)
    if till == nil then till = os.time() end
    local range = till - from
    local m = box.select(0,1,metric);
    if m == nil then return nil end
    print("got range ",range," for ",m, ' from ',from, ' till ', till);

    local schema
    for _,sch in ipairs(gr.st2t(m[2])) do
        local s1,s2,sk = unpack(sch)
        local prange = math.floor(range/s1)
        print(sk, ' ',s1, ' ', s2, '; points by range ',prange,'; time back = ',s2*s1)
        schema = sch
        if ( os.time() - from < s2*s1 ) then
            break;
        else
            print(sk, " is too small for ")
        end
    end
    -- No schema could be parsed: nothing to read.
    if schema == nil then return nil end

    local s1,s2,sk = unpack(schema)
    local minslot = math.floor(from / s1) % s2;
    local maxslot = math.floor(till / s1) % s2;
    local mintime = os.time() - s2*s1
    print("use ",sk,' ',s1, ' ',s2, '; slots: ',minslot,',', maxslot );

    local i = box.space[1].index[0]
    -- Collect the pieces and join once instead of re-concatenating per point.
    local out = { '{now: ' .. os.time() .. ', points:[\n' }

    -- Append the points of slots first..last (no upper bound when last is
    -- nil), stopping as soon as the iterator leaves this metric and schema.
    local function collect(first, last)
        local x,v = i:next( m[0],sk,first )
        while x ~= nil do
            if v[0] ~= m[0] or v[1] ~= sk then break end
            local slot = box.unpack('i',v[2])
            if last ~= nil and slot > last then break end
            if (box.unpack('i',v[5]) > mintime) then
                out[#out+1] = '['.. box.unpack('i',v[3]) .. ','.. box.unpack('i',v[5]) .. '],\n'
            else
                -- Timestamp is older than the retention window: drop it.
                box.delete( 1,{v[0],v[1],v[2]} )
                print("deprecated time ",box.unpack('i',v[5]), " for slot ",slot," must be ", math.floor(os.time() / s1 ) + slot )
            end
            x,v = i:next(x)
        end
    end

    if (minslot > maxslot) then
        -- The range wraps around the end of the slot ring.
        print("overlap ",minslot,' -> end, 0 -> ',maxslot)
        collect(minslot, nil)
        collect(0, maxslot)
    else
        print("linear ",minslot,' -> ',maxslot)
        collect(minslot, maxslot)
    end

    out[#out+1] = ']'
    return table.concat(out)
end
--- Parse one "<metric> <value> [<timestamp>]" line and store the point.
-- Lines that do not match, or whose value is not a valid number (for
-- example '1.2.3'), are reported as malformed and skipped.
-- @param line text line; the timestamp is optional and defaults to os.time()
-- @return nothing
function gr.saveline(line)
    local metric,value,time
    if type(line) == 'string' then
        metric,value,time = string.match(line, '([^%s]+) ([%d.]+)%s*(%d*)')
    end
    if metric ~= nil then
        -- [%d.]+ also accepts text such as '...' that is not a number.
        value = tonumber(value)
    end
    if metric == nil or value == nil then
        print("got malformed line ",line)
        return
    end
    if time ~= '' then
        time = tonumber(time)
    else
        time = os.time()
    end
    -- print(metric,'; ',value, '; ',time)
    gr.save(metric,value,time)
end
--- Fiber body serving one accepted metric connection.
-- Reads lines until readline returns nil or an empty string, passing each
-- one to gr.saveline, then closes the socket.
-- @param sock accepted client socket
-- @param host peer host, used for the fiber name
-- @param port peer port, used for the fiber name
function gr.worker(sock,host,port)
    box.fiber.detach()
    box.fiber.name("gr.worker."..tostring(host)..":"..tostring(port))
    print("worker");
    while true do
        local line = sock:readline()
        if (line == nil or line == '') then
            print("closing connection");
            break;
        end
        -- A failing save must neither end the fiber nor leak the socket.
        local ok,err = pcall(gr.saveline, line)
        if not ok then
            print("save failed: ",err)
        end
    end
    sock:close()
end
--- Fiber body of the TCP accept loop.
-- Accepts connections on gr.tcpsrv forever and hands every accepted socket
-- to a new fiber running gr.worker; failed accepts are only logged.
function gr.tcpserver()
    box.fiber.detach()
    box.fiber.name("gr.tcpserver")
    while true do
        local client, err, peer_host, peer_port = gr.tcpsrv:accept()
        if client == nil then
            print ("accept failed ", err, peer_host,peer_port);
        else
            print ("accepted ",peer_host,":",peer_port);
            box.fiber.resume(box.fiber.create(gr.worker), client,peer_host,peer_port)
        end
    end
end
--- Fiber body of the UDP receive loop.
-- Receives datagrams of up to 1024 bytes on gr.udpsrv forever and stores
-- every line of each datagram through gr.saveline.
function gr.udpserver()
    box.fiber.detach()
    box.fiber.name("gr.udpserver")
    while true do
        local msg, status, host, port = gr.udpsrv:recvfrom(1024)
        if (msg ~= nil and msg ~= '') then
            -- A datagram may carry several newline-separated metric lines.
            for line in string.gmatch(msg, '[^\n]+') do
                -- Keep the server fiber alive when a single save fails.
                local ok,err = pcall(gr.saveline, line)
                if not ok then
                    print("save failed: ",err)
                end
            end
        else
            print("not a message ", status, host, port);
        end
    end
end
-- Start the TCP metric listener unless one is already recorded in gr.tcpsrv.
-- A failed bind/listen is logged and gr.tcpsrv stays nil, so loading the
-- file again retries. The "second result is non-nil on failure" convention
-- is the one the HTTP listener setup in this file relies on.
if (gr.tcpsrv == nil) then
    local sock = box.socket.tcp()
    local _,status = sock:bind('0.0.0.0', gr.port, 1);
    if status == nil then
        _,status = sock:listen()
    end
    if status ~= nil then
        print("tcp port ",gr.port," setup failed: ",tostring(status))
        sock:close()
    else
        print("bound to tcp port ",gr.port)
        -- Publish the socket before the accept loop starts using it.
        gr.tcpsrv = sock;
        local f = box.fiber.create(gr.tcpserver)
        box.fiber.resume(f)
    end
end

-- Start the UDP metric listener unless one is already recorded in gr.udpsrv.
if (gr.udpsrv == nil) then
    local sock = box.socket.udp()
    local _,status = sock:bind('0.0.0.0', gr.port, 1);
    if status ~= nil then
        print("udp port ",gr.port," setup failed: ",tostring(status))
        sock:close()
    else
        -- NOTE(review): listen() is kept from the original with its result
        -- ignored; it is presumably not needed for a datagram socket - confirm.
        sock:listen()
        print("bound to udp port ",gr.port)
        -- Publish the socket before the receive loop starts using it.
        gr.udpsrv = sock;
        local f = box.fiber.create(gr.udpserver)
        box.fiber.resume(f)
    end
end
-- HTTP front-end table; an existing table and an existing port are kept.
if httpd == nil then httpd = {} end

-- Port the HTTP listener binds to unless it was set beforehand.
if httpd.port == nil then httpd.port = 10888 end

-- Line terminator used when building HTTP replies.
CRLF = "\x0d\x0a"
--- Send a complete HTTP/1.0 response and close the socket.
-- @param sock    client socket
-- @param status  status code; known numeric codes get their reason phrase
-- @param body    response body, defaults to ''
-- @param headers optional table of extra headers; content-type,
--                content-length and connection get defaults when absent.
--                The table itself is not modified.
function httpd.reply(sock,status,body,headers)
    if body == nil then body = '' end

    -- Start from the defaults and let the caller's headers override them,
    -- working on a private table so the argument stays untouched.
    local out = {
        ['content-type'] = 'text/html',
        ['content-length'] = #body,
        ['connection'] = 'close',
    }
    if headers ~= nil then
        for k,v in pairs(headers) do
            out[k] = v
        end
    end

    local reasons = {
        [200] = 'OK',
        [400] = 'Bad Request',
        [404] = 'Not Found',
        [500] = 'Internal Server Error',
    }
    local status_line = "HTTP/1.0 " .. tostring(status)
    local code = tonumber(status)
    if code ~= nil then
        -- The status line is "version SP code SP reason"; the reason text
        -- may be empty but the separating space belongs to the format.
        status_line = status_line .. ' ' .. (reasons[code] or '')
    end

    local lines = { status_line }
    for k,v in pairs(out) do
        lines[#lines+1] = k .. ': ' .. tostring(v)
    end
    local reply = table.concat(lines, CRLF) .. CRLF .. CRLF .. body
    print("send reply\n",reply)
    sock:send(reply);
    sock:close();
end
-- Namespace table for the URL helpers; an existing table is kept.
if url == nil then
    url = {}
end

--- Decode a URL-encoded string.
-- Every '+' becomes a space first, then every %XX escape becomes the byte
-- with that hexadecimal code.
-- @param x encoded text, or nil
-- @return decoded text; '' when x is nil
function url_decode(x)
    if x == nil then
        return ''
    end
    local function hex_to_char(hex)
        return string.char(tonumber(hex, 16))
    end
    local spaced = string.gsub(x, '%+', ' ')
    -- Parentheses drop gsub's second result (the substitution count).
    return (string.gsub(spaced, '%%(%x%x)', hex_to_char))
end
--- Split a request target into its path and decoded query parameters.
-- Only "key=value" pairs with a non-empty key are picked up from the query
-- string; both parts are passed through url_decode. A key that occurs more
-- than once keeps its last value.
-- @param url request target, e.g. '/?target=a.b&from=1h'
-- @return path (text before the first '?', or the whole input when there is
--         none) and a table mapping parameter names to values
function url.parse(url)
    local params = {}
    local mark = string.find(url, "?", 1, true)
    if mark == nil then
        return url, params
    end
    local path = string.sub(url, 1, mark - 1)
    local qs = string.sub(url, mark + 1)
    print(path,';',qs)
    for raw_k, raw_v in string.gmatch(qs,'([^&=]+)=([^&]*)') do
        print(raw_k,'=',raw_v)
        params[ url_decode(raw_k) ] = url_decode(raw_v)
    end
    return path, params
end
--- Fiber body serving one HTTP connection.
-- Reads the request line and the headers, then answers:
--   '/' with `from` and `target` query parameters -> 200 with the text
--       produced by gr.points for the last `from` interval (text/plain);
--   '/' with a missing or unparsable parameter     -> 400;
--   a failure while reading the points             -> 500;
--   any other path                                 -> 404.
-- The socket is closed on every path, either here or by httpd.reply.
-- @param sock accepted client socket
-- @param host peer host, used for the fiber name
-- @param port peer port, used for the fiber name
function httpd.worker(sock,host,port)
    box.fiber.detach()
    box.fiber.name("httpd.worker."..tostring(host)..":"..tostring(port))
    print("worker");

    local line = sock:readline()
    if (line == nil or line == '') then return sock:close() end
    local meth, path = string.match(line, '^(%u+)%s([^%s]+)%sHTTP/([%d.]+)')
    if (meth == nil) then return sock:close() end

    -- Collect the headers under lower-cased names until the blank line;
    -- repeated headers are joined with '; '.
    local headers = {}
    while true do
        local line = sock:readline()
        if (line == nil or line == '') then return sock:close() end
        if (line == '\x0a' or line == '\x0d\x0a') then break end
        local k,v = string.match(line,'^([^%s]+)%s*:%s*([^\x0d\x0a]*)\x0d?\x0a')
        if (k == nil) then
            print("not parsed ", line)
        else
            k = string.lower(k)
            if headers[k] ~= nil then
                headers[k] = headers[k] .. '; ' .. v
            else
                headers[k] = v
            end
        end
    end

    -- The Host header may be absent, so it must not be concatenated raw.
    print(meth,' ',path,' ('..tostring(headers['host'])..')');
    local upath, query = url.parse(path)
    print("resulting path ",upath)

    if (upath ~= '/') then
        return httpd.reply(sock, 404, "Not found");
    end
    if query.from == nil then
        return httpd.reply(sock, 400, "from required");
    end
    if query.target == nil then
        return httpd.reply(sock, 400, "target required");
    end

    -- gr.st1t raises on an unparsable interval; answer instead of dying
    -- with the socket still open.
    local ok, back = pcall(gr.st1t, query.from)
    if not ok then
        print("bad from: ",back)
        return httpd.reply(sock, 400, "bad from");
    end

    local points
    ok, points = pcall(gr.points, query.target, os.time() - back)
    if not ok then
        print("points failed: ",points)
        return httpd.reply(sock, 500, "Internal error");
    end

    local hdr = {}
    -- hdr['content-type'] = 'application/x-json'
    hdr['content-type'] = 'text/plain'
    return httpd.reply(sock, 200, points, hdr);
end
--- Fiber body of the HTTP accept loop.
-- Accepts connections on httpd.sock forever; an accept whose second result
-- is nil counts as successful and is handed to a new fiber running
-- httpd.worker, anything else is only logged.
function httpd.server()
    box.fiber.detach()
    box.fiber.name("httpd.server")
    while true do
        local client, err, peer_host, peer_port = httpd.sock:accept()
        if err ~= nil then
            print ("http accept failed ", client, err, peer_host, peer_port);
        else
            print ("http accepted ",client,'; ',peer_host,":",peer_port);
            box.fiber.resume(box.fiber.create(httpd.worker), client,peer_host,peer_port)
        end
    end
end
-- Start the HTTP listener unless one is already recorded in httpd.sock.
-- A failed bind or listen raises an error naming the step that failed.
if httpd.sock == nil then
    local sock = box.socket.tcp()
    local s,status = sock:bind('0.0.0.0', httpd.port, 1);
    -- tostring() on every part: the first result of a failed call is not
    -- guaranteed to be something '..' can concatenate.
    if status ~= nil then error(tostring(sock) .. ' ' .. tostring(s) .. ' bind failed: ' .. tostring(status)) end
    s,status = sock:listen()
    if status ~= nil then error(tostring(sock) .. ' ' .. tostring(s) .. ' listen failed: ' .. tostring(status)) end
    print("bound to http port ",httpd.port)
    -- Publish the socket before the accept loop starts using it.
    httpd.sock = sock;
    local f = box.fiber.create(httpd.server)
    box.fiber.resume(f)
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment