Skip to content

Instantly share code, notes, and snippets.

@mattupstate
Last active August 29, 2015 14:09
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mattupstate/05971460dd937c338da0 to your computer and use it in GitHub Desktop.
Save mattupstate/05971460dd937c338da0 to your computer and use it in GitHub Desktop.
heka Nginx status decoder and filter
--[[
Parses a payload containing the contents of the output from the Nginx `stub_status`
module. See: http://nginx.org/en/docs/http/ngx_http_stub_status_module.html
Config:
- payload_keep (bool, optional, default false)
Always preserve the original `stub_status` text in the message payload.
*Example Heka Configuration*
.. code-block:: ini
[HttpInput]
url = "http://127.0.0.1:8000/_status"
ticker_interval = 5
decoder = "NginxStatusDecoder"
[NginxStatusDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/nginx_status.lua"
*Example Heka Message*
:Timestamp: 2014-11-11 22:13:52 +0000 UTC
:Type: stats.nginx
:Hostname: ip-10-80-155-196
:Pid: 0
:Uuid: 9a129dd8-98dc-4e98-a60f-b27e01c4d653
:Logger: http://127.0.0.1:8000/_status
:Payload:
:EnvVersion:
:Severity: 6
:Fields:
| name:"Requests" type:double value:237
| name:"Waiting" type:double value:0
| name:"Active" type:double value:1
| name:"Reading" type:double value:0
| name:"Writing" type:double value:1
| name:"Handled" type:double value:203
| name:"Accepted" type:double value:203
--]]
-- LPeg grammar for the plain-text report emitted by Nginx's `stub_status`
-- module. The literals below fix the expected layout:
--
--   Active connections: <n>
--   server accepts handled requests
--    <accepted> <handled> <requests>
--   Reading: <n> Writing: <n> Waiting: <n>
--
-- All patterns are file-local so they do not leak into the sandbox's global
-- table; `process_message` picks up `grammar` as an upvalue.
local l = require 'lpeg'
l.locale(l)

-- One or more digits, converted to a Lua number at capture time.
local num = l.digit^1 / tonumber

-- Each counter is captured as a named group so that the enclosing table
-- capture yields { Active = ..., Accepted = ..., ... }.
local nginxstatus = l.P('Active connections: ') *
    l.Cg(num, 'Active') * l.space^1 *
    l.P("server accepts handled requests") * l.space^1 *
    l.Cg(num, 'Accepted') * l.space^1 *
    l.Cg(num, 'Handled') * l.space^1 *
    l.Cg(num, 'Requests') * l.space^1 *
    l.P('Reading:') * l.space^1 *
    l.Cg(num, 'Reading') * l.space^1 *
    l.P('Writing:') * l.space^1 *
    l.Cg(num, 'Writing') * l.space^1 *
    l.P('Waiting:') * l.space^1 *
    l.Cg(num, 'Waiting')

-- `grammar:match(text)` returns the table of named counters, or nil when the
-- text does not follow the layout above.
local grammar = l.Ct(nginxstatus)
-- Decoder option: when truthy, process_message copies the raw input text
-- into the outgoing message's Payload.
local payload_keep = read_config("payload_keep")

-- Single message table reused for every injection. Payload and Fields start
-- out unset and are assigned by process_message.
local msg = { Type = "stats.nginx" }
-- Decoder entry point: parses the incoming message payload with `grammar`
-- and injects a `stats.nginx` message carrying the parsed counters.
--
-- Returns 0 after a successful injection, or -1 when the payload does not
-- match the grammar (in which case nothing is injected).
function process_message()
    local raw = read_message("Payload")
    local parsed = grammar:match(raw)

    -- msg is reused between calls, so Fields is overwritten even on failure.
    msg.Fields = parsed
    if not parsed then
        return -1
    end

    if payload_keep then
        msg.Payload = raw
    end

    -- Carry over the source message's FilePath field (nil when absent).
    parsed.FilePath = read_message("Fields[FilePath]")

    inject_message(msg)
    return 0
end
--[[
Graphs Nginx status data containing fields entitled `Active`, `Accepted`,
`Handled`, `Requests`, `Reading`, `Writing`, and `Waiting`.
Config:
- sec_per_row (uint, optional, default 60)
Sets the size of each bucket (resolution in seconds) in the sliding window.
- rows (uint, optional, default 1440)
Sets the size of the sliding window, i.e., 1440 rows representing 60 seconds
per row is a 24-hour sliding window with 1 minute resolution.
- anomaly_config (string, optional)
See :ref:`sandbox_anomaly_module`.
- preservation_version (uint, optional, default 0)
If `preserve_data = true` is set in the SandboxFilter configuration, then
this value should be incremented every time the `sec_per_row` or `rows`
configuration is changed to prevent the plugin from failing to start
during data restoration.
*Example Heka Configuration*
.. code-block:: ini
[NginxStatusFilter]
type = "SandboxFilter"
filename = "lua_filters/nginx_status.lua"
ticker_interval = 10
preserve_data = true
message_matcher = "Type == 'stats.nginx'"
--]]
-- Version stamp for preserved data; per the header comment it should be
-- bumped (via the `preservation_version` option) whenever `rows` or
-- `sec_per_row` changes. Deliberately global.
_PRESERVATION_VERSION = read_config("preservation_version") or 0

require "circular_buffer"
require "string"
require "table"
local alert = require "alert"
local annotation = require "annotation"
local anomaly = require "anomaly"

-- Filter configuration, with the documented defaults.
local title = "Nginx Stats"
local rows = read_config("rows") or 1440
local sec_per_row = read_config("sec_per_row") or 60
local anomaly_config = anomaly.parse_config(read_config("anomaly_config"))

-- Hand the annotation module the full window length (rows * seconds, scaled
-- by 1e9) for this title.
annotation.set_prune(title, rows * sec_per_row * 1e9)

-- One circular-buffer column per Nginx counter, in this order.
local column_names = {"Active", "Accepted", "Handled", "Requests", "Reading", "Writing", "Waiting"}

-- Global on purpose (no `local`) -- presumably so the sandbox can preserve
-- it when `preserve_data = true`; confirm against the Heka sandbox docs.
cbuf = circular_buffer.new(rows, #column_names, sec_per_row)

-- labels[i] is the message-field selector that feeds column i.
local labels = {}
for i = 1, #column_names do
    local name = column_names[i]
    labels[i] = string.format("Fields[%s]", name)
    cbuf:set_header(i, name, "Count", "max")
end
-- Filter entry point: writes each Nginx counter of the current message into
-- its circular-buffer column at the message's timestamp.
--
-- Returns 0 when every counter was stored, or -1 as soon as a counter is
-- missing or not a number (columns processed before that point have already
-- been written).
function process_message ()
    local timestamp = read_message("Timestamp")
    for column = 1, #labels do
        local value = read_message(labels[column])
        if type(value) ~= "number" then
            return -1
        end
        cbuf:set(timestamp, column, value)
    end
    return 0
end
-- Periodic callback: publishes the circular buffer as a "cbuf" payload and,
-- when anomaly detection is configured, runs detection first.
--
-- ns: value supplied by the host for this tick; it is forwarded unchanged to
--     the alert, anomaly and annotation modules.
function timer_event(ns)
    -- Without an anomaly configuration there is nothing to detect or
    -- annotate: emit the bare buffer.
    if not anomaly_config then
        inject_payload("cbuf", title, cbuf)
        return
    end

    -- Detection is skipped entirely while alerts are throttled.
    if not alert.throttled(ns) then
        local message, annotations = anomaly.detect(ns, title, cbuf, anomaly_config)
        if message then
            annotation.concat(title, annotations)
            alert.send(ns, message)
        end
    end

    -- The pruned annotations are emitted ahead of the buffer itself.
    inject_payload("cbuf", title, annotation.prune(title, ns), cbuf)
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment