Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
heka disk usage decoder and filter
--[[
Graphs disk usage data containing fields entield `DiskSize`, `DiskUsed`,
`DiskAvailable`, and `DiskPercentUsed`
Config:
- sec_per_row (uint, optional, default 60)
Sets the size of each bucket (resolution in seconds) in the sliding window.
- rows (uint, optional, default 1440)
Sets the size of the sliding window i.e., 1440 rows representing 60 seconds
per row is a 24 sliding hour window with 1 minute resolution.
- anomaly_config (string, optional)
See :ref:`sandbox_anomaly_module`.
- preservation_version (uint, optional, default 0)
If `preserve_data = true` is set in the SandboxFilter configuration, then
this value should be incremented every time the `sec_per_row` or `rows`
configuration is changed to prevent the plugin from failing to start
during data restoration.
*Example Heka Configuration*
.. code-block:: ini
[DiskUsageFilter]
type = "SandboxFilter"
filename = "lua_filters/diskusage.lua"
ticker_interval = 60
preserve_data = true
message_matcher = "Type == 'stats.diskusage'"
--]]
_PRESERVATION_VERSION = read_config("preservation_version") or 0
require "circular_buffer"
require "string"
require "table"
local alert = require "alert"
local annotation = require "annotation"
local anomaly = require "anomaly"
local title = "Disk Usage Stats"
local rows = read_config("rows") or 1440
local sec_per_row = read_config("sec_per_row") or 60
local anomaly_config = anomaly.parse_config(read_config("anomaly_config"))
annotation.set_prune(title, rows * sec_per_row * 1e9)
local field_names = {"DiskSize", "DiskUsed", "DiskAvailable", "DiskPercentUsed"}
cbuf = circular_buffer.new(rows, #field_names, sec_per_row)
local labels = {}
for i, name in pairs(field_names) do
labels[i] = string.format("Fields[%s]", name)
cbuf:set_header(i, name, "Count", "max")
end
function process_message ()
local ts = read_message("Timestamp")
for i, label in pairs(labels) do
local val = read_message(label)
if type(val) ~= "number" then return -1 end
cbuf:set(ts, i, val)
end
return 0
end
function timer_event(ns)
if anomaly_config then
if not alert.throttled(ns) then
local msg, annos = anomaly.detect(ns, title, cbuf, anomaly_config)
if msg then
annotation.concat(title, annos)
alert.send(ns, msg)
end
end
inject_payload("cbuf", title, annotation.prune(title, ns), cbuf)
else
inject_payload("cbuf", title, cbuf)
end
end
--[[
Parses a payload containing the contents of a `df -kT | tail -n +2` call into a Heka
message.
Config:
- payload_keep (bool, optional, default false)
Always preserve the original log line in the message payload.
*Example Heka Configuration*
.. code-block:: ini
[ProcessInput]
ticker_interval = 5
decoder = "DiskUsageDecoder"
[ProcessInput.command.0]
bin = "/bin/df"
args = ["-Tk"]
[ProcessInput.command.1]
bin = "/usr/bin/tail"
args = ["-n", "+2"]
[DiskUsageDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/linux_diskusage.lua"
*Example Heka Message*
:Timestamp: 2014-01-10 07:04:56 -0800 PST
:Type: stats.diskusage
:Hostname: test.example.com
:Pid: 0
:UUID: 8e414f01-9d7f-4a48-a5e1-ae92e5954df5
:Payload:
:EnvVersion:
:Severity: 7
:Fields:
| name:"DiskMount" type:string value:"/"
| name:"DiskSize" type:double value:8.115168e+06
| name:"DiskUsed" type:double value:4.390164e+06
| name:"DiskPercentUsed" type:double value:58
| name:"DiskType" type:string value:"ext4"
| name:"DiskDevice" type:string value:"/dev/xvda1"
| name:"DiskAvailable" type:double value:3.289728e+06
--]]
local l = require 'lpeg'
l.locale(l)
local num = l.digit^1 / tonumber
local nonspace = l.C((l.P(1)-l.space)^1)
local diskusage = l.Cg(nonspace, "DiskDevice") *
l.space^1 * l.Cg(nonspace, "DiskType") *
l.space^1 * l.Cg(num, "DiskSize") *
l.space^1 * l.Cg(num, "DiskUsed") *
l.space^1 * l.Cg(num, "DiskAvailable") *
l.space^1 * l.Cg(num, "DiskPercentUsed") * "%" *
l.space^1 * l.Cg(nonspace, "DiskMount")
local grammar = l.Ct(diskusage)
local payload_keep = read_config("payload_keep")
local msg = {
Type = "stats.diskusage",
Payload = nil,
Fields = nil
}
function process_message()
local data = read_message("Payload")
msg.Fields = grammar:match(data)
if not msg.Fields then
return -1
end
if payload_keep then
msg.Payload = data
end
msg.Fields.FilePath = read_message("Fields[FilePath]")
inject_message(msg)
return 0
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.