Last active
October 8, 2015 10:25
-
-
Save mattupstate/9f9c9d81830c2cc1eb55 to your computer and use it in GitHub Desktop.
heka disk usage decoder and filter
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
--[[ | |
Graphs disk usage data containing fields entitled `DiskSize`, `DiskUsed`, | |
`DiskAvailable`, and `DiskPercentUsed` | |
Config: | |
- sec_per_row (uint, optional, default 60) | |
Sets the size of each bucket (resolution in seconds) in the sliding window. | |
- rows (uint, optional, default 1440) | |
Sets the size of the sliding window, i.e., 1440 rows representing 60 seconds | |
per row is a 24-hour sliding window with 1-minute resolution. | |
- anomaly_config (string, optional) | |
See :ref:`sandbox_anomaly_module`. | |
- preservation_version (uint, optional, default 0) | |
If `preserve_data = true` is set in the SandboxFilter configuration, then | |
this value should be incremented every time the `sec_per_row` or `rows` | |
configuration is changed to prevent the plugin from failing to start | |
during data restoration. | |
*Example Heka Configuration* | |
.. code-block:: ini | |
[DiskUsageFilter] | |
type = "SandboxFilter" | |
filename = "lua_filters/diskusage.lua" | |
ticker_interval = 60 | |
preserve_data = true | |
message_matcher = "Type == 'stats.diskusage'" | |
--]] | |
-- Configured preservation version; falls back to 0 when not set.
-- Left global, as in the original.
_PRESERVATION_VERSION = read_config("preservation_version") or 0

require "circular_buffer"
require "string"
require "table"
local alert = require "alert"
local annotation = require "annotation"
local anomaly = require "anomaly"

-- Title shared by the annotation pruning set up below.
local title = "Disk Usage Stats"

-- Sliding-window geometry: number of rows and seconds covered by each row.
local rows = read_config("rows") or 1440
local sec_per_row = read_config("sec_per_row") or 60

-- Result of parsing the optional `anomaly_config` setting.
local anomaly_config = anomaly.parse_config(read_config("anomaly_config"))

-- Prune interval handed to the annotation module: the whole window length,
-- scaled by 1e9 (seconds to nanoseconds).
local window_ns = rows * sec_per_row * 1e9
annotation.set_prune(title, window_ns)

-- One circular-buffer column per message field, in this order.
local field_names = {"DiskSize", "DiskUsed", "DiskAvailable", "DiskPercentUsed"}
local column_count = #field_names

-- Left global, as in the original.
-- NOTE(review): presumably global so the sandbox can persist it when
-- `preserve_data = true` -- confirm before changing its scope.
cbuf = circular_buffer.new(rows, column_count, sec_per_row)

-- labels[col] is the read_message() selector for the field stored in column col.
local labels = {}
for col = 1, column_count do
    local field = field_names[col]
    labels[col] = string.format("Fields[%s]", field)
    cbuf:set_header(col, field, "Count", "max")
end
--- Records the disk usage fields of the current message in the circular buffer.
-- Every field named in `labels` is read and validated first; the buffer is
-- only written once all of them are known to be numbers, so a rejected
-- message never leaves a partially updated row behind.
-- @return 0 when all fields were stored, -1 when any field is missing or not
-- a number (nothing is written in that case).
function process_message()
    local ts = read_message("Timestamp")

    -- Pass 1: collect and validate every column value before touching cbuf.
    local values = {}
    for col, label in ipairs(labels) do
        local val = read_message(label)
        if type(val) ~= "number" then
            return -1
        end
        values[col] = val
    end

    -- Pass 2: all values are valid, commit them to the row for this timestamp.
    for col, val in ipairs(values) do
        cbuf:set(ts, col, val)
    end
    return 0
end
--- Periodic callback: publishes the circular buffer and, when anomaly
-- detection is configured, runs it and raises an alert first.
-- @param ns value supplied by the caller; passed through to the alert,
-- anomaly and annotation modules.
function timer_event(ns)
    -- Without an anomaly configuration there is nothing to detect or
    -- annotate; just publish the buffer.
    if not anomaly_config then
        inject_payload("cbuf", title, cbuf)
        return
    end

    -- Detection is skipped entirely while alert.throttled(ns) reports true.
    if not alert.throttled(ns) then
        local alert_text, new_annotations =
            anomaly.detect(ns, title, cbuf, anomaly_config)
        if alert_text then
            annotation.concat(title, new_annotations)
            alert.send(ns, alert_text)
        end
    end

    -- Publish the pruned annotations together with the buffer.
    inject_payload("cbuf", title, annotation.prune(title, ns), cbuf)
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
--[[ | |
Parses a payload containing the contents of a `df -kT | tail -n +2` call into a Heka | |
message. | |
Config: | |
- payload_keep (bool, optional, default false) | |
Always preserve the original log line in the message payload. | |
*Example Heka Configuration* | |
.. code-block:: ini | |
[ProcessInput] | |
ticker_interval = 5 | |
decoder = "DiskUsageDecoder" | |
[ProcessInput.command.0] | |
bin = "/bin/df" | |
args = ["-Tk"] | |
[ProcessInput.command.1] | |
bin = "/usr/bin/tail" | |
args = ["-n", "+2"] | |
[DiskUsageDecoder] | |
type = "SandboxDecoder" | |
filename = "lua_decoders/linux_diskusage.lua" | |
*Example Heka Message* | |
:Timestamp: 2014-01-10 07:04:56 -0800 PST | |
:Type: stats.diskusage | |
:Hostname: test.example.com | |
:Pid: 0 | |
:UUID: 8e414f01-9d7f-4a48-a5e1-ae92e5954df5 | |
:Payload: | |
:EnvVersion: | |
:Severity: 7 | |
:Fields: | |
| name:"DiskMount" type:string value:"/" | |
| name:"DiskSize" type:double value:8.115168e+06 | |
| name:"DiskUsed" type:double value:4.390164e+06 | |
| name:"DiskPercentUsed" type:double value:58 | |
| name:"DiskType" type:string value:"ext4" | |
| name:"DiskDevice" type:string value:"/dev/xvda1" | |
| name:"DiskAvailable" type:double value:3.289728e+06 | |
--]] | |
local l = require 'lpeg' | |
l.locale(l) | |
-- Building blocks for one line of `df` output.
local integer = l.digit^1 / tonumber       -- run of digits, converted by tonumber
local token = l.C((1 - l.space)^1)         -- run of non-whitespace characters
local gap = l.space^1                      -- column separator

-- Wraps a pattern in a named group capture so it becomes a key in the
-- table produced by the grammar.
local function column(pattern, name)
    return l.Cg(pattern, name)
end

-- Columns in order: device, type, size, used, available, percent used
-- (followed by a literal "%"), mount point.
local df_line = column(token, "DiskDevice")
    * gap * column(token, "DiskType")
    * gap * column(integer, "DiskSize")
    * gap * column(integer, "DiskUsed")
    * gap * column(integer, "DiskAvailable")
    * gap * column(integer, "DiskPercentUsed") * l.P("%")
    * gap * column(token, "DiskMount")

-- Matching yields a table keyed by the column names above.
local grammar = l.Ct(df_line)

local payload_keep = read_config("payload_keep")

-- Message template reused across calls; Payload and Fields start out unset
-- and are assigned by process_message().
local msg = {
    Type = "stats.diskusage"
}
--- Decodes the message payload with `grammar` and injects the resulting
-- `stats.diskusage` message.
-- @return 0 after the message has been injected, -1 when the payload does
-- not match the grammar.
function process_message()
    local payload = read_message("Payload")

    -- msg.Fields is assigned even on failure, mirroring the match result.
    local fields = grammar:match(payload)
    msg.Fields = fields
    if not fields then
        return -1
    end

    -- The original payload is only carried over when payload_keep is set.
    if payload_keep then
        msg.Payload = payload
    end

    fields.FilePath = read_message("Fields[FilePath]")
    inject_message(msg)
    return 0
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment