Skip to content

Instantly share code, notes, and snippets.

@dannycoates
Last active January 14, 2016 11:29
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save dannycoates/4a48955ad889b4c6b898 to your computer and use it in GitHub Desktop.
Save dannycoates/4a48955ad889b4c6b898 to your computer and use it in GitHub Desktop.
heka anomalies and alerts

These are a couple of real examples of heka lua filters that do anomaly detection and alerting, which are both very new, experimental features. Eventually we should have some nicer abstractions for detecting and alerting on common scenarios that can be easily configured instead of having to write all the logic yourself. There's currently a lot of flexibility, but at the price of more code.

Since heka filters can process and generate messages, you can make pretty complex meta-dataflows to control alerts, and can wire them up to any output (I think), irc, email, sms, whatevs.

The module sources are here:

https://github.com/mozilla-services/heka/tree/dev/sandbox/lua/modules

Our current dashboard displays annotations, but the UX still needs a lot of work; there's too much clicking and navigating required. I'd ❤︎ to have some time to contribute.

-- This Source Code Form is subject to the terms of the Mozilla Public
-- License, v. 2.0. If a copy of the MPL was not distributed with this
-- file, You can obtain one at http://mozilla.org/MPL/2.0/.
require "math"
require "string"
local alert = require "alert"
-- Filter configuration. Every value except message_variable has a default.

-- Name of the message variable handed to read_message() to obtain the item
-- being tracked; loading fails with an error when it is not configured.
local message_variable = read_config("message_variable") or error("must specify a 'message_variable'")
-- Number of entries in `items` at which process_message stops inserting new
-- items and decays the existing weights instead.
local max_items = read_config("max_items") or 25000
-- Minimum number of recorded intervals an item needs before timer_event
-- reports it (and considers it for an alert).
local alert_min_count = read_config("alert_min_count") or 50
-- An item is alerted on when its mean interval is at or below this value.
-- Intervals are computed as (timestamp difference / 1e9) -- presumably seconds,
-- assuming Timestamp is in nanoseconds; TODO confirm.
local alert_max_mean = read_config("alert_max_mean") or 1
-- Value passed to alert.set_throttle(); the default 5 * 60 * 1e9 is presumably
-- five minutes expressed in nanoseconds -- confirm against the alert module.
local alert_throttle = read_config("alert_throttle") or 5 * 60 * 1e9
alert.set_throttle(alert_throttle)
-- Slot indices of a per-item record (a plain array table):
--   WEIGHT  occurrence weight, decremented when the item table is full
--   TS      timestamp of the most recent message seen for the item
--   N       number of intervals folded into the running statistics
--   OM/NM   previous / current running mean of the intervals
--   OS/NS   previous / current running sum of squared deviations
--   ALERT   result of alert.queue() once an alert was queued for the item
local WEIGHT, TS, N, OM, NM, OS, NS, ALERT = 1, 2, 3, 4, 5, 6, 7, 8

-- Folds one more sample into the running mean and sum of squared deviations
-- stored in a record (incremental, single-pass update).
-- @param x  number, the new sample
-- @param y  record table; y[N] must already be a number (0 before the first
--           sample). Updated in place: N, OM, NM, OS and NS.
-- The sample variance is y[NS] / (y[N] - 1) once y[N] > 1.
local function running_stats(x, y)
    y[N] = y[N] + 1
    if y[N] == 1 then
        y[OM], y[NM] = x, x
        -- Initialise NS together with OS so a record holding a single sample
        -- never exposes a nil slot to code that does arithmetic on it.
        y[OS], y[NS] = 0, 0
    else
        y[NM] = y[OM] + (x - y[OM]) / y[N]
        y[NS] = y[OS] + (x - y[OM]) * (x - y[NM])
        -- The current values become the "old" ones for the next sample.
        y[OM] = y[NM]
        y[OS] = y[NS]
    end
end
-- Filter state, deliberately left global (presumably so the sandbox host can
-- preserve it between runs -- confirm against the plugin configuration):
--   items       item value -> per-item record
--   items_size  number of entries currently held in `items`
--   active_day  index of the day the current statistics belong to
items, items_size, active_day = {}, 0, 0
-- Tracks how often, and at what interval, each distinct value of the
-- configured message variable is seen during the current day.
-- Returns -1 when the message lacks the variable, 0 otherwise.
function process_message()
    local ts = read_message("Timestamp")
    local item = read_message(message_variable)
    if not item then return -1 end

    -- Statistics are kept per day; the divisor is 24h expressed in units of
    -- 1e9 per second (presumably nanoseconds -- confirm Timestamp units).
    local day = math.floor(ts / (60 * 60 * 24 * 1e9))
    if day < active_day then
        return 0 -- too old
    elseif day > active_day then
        -- First message of a newer day: start over with an empty table.
        active_day = day
        items = {}
        items_size = 0
    end

    local i = items[item]
    if i then
        if i[TS] ~= 0 then
            if not i[N] then i[N] = 0 end
            -- Interval since the previous occurrence, scaled down by 1e9.
            -- NOTE(review): a message carrying an earlier timestamp than the
            -- previous one yields a negative interval -- confirm intended.
            running_stats((ts - i[TS]) / 1e9, i)
        end
        i[TS] = ts
        i[WEIGHT] = i[WEIGHT] + 1
        return 0
    end

    -- `>=` rather than `==`: if max_items is fractional, negative, or lower
    -- than the current size, an equality test never fires and the table
    -- would grow without bound.
    if items_size >= max_items then
        -- Table is full: the new item is not stored; every tracked item
        -- loses one unit of weight and those that reach zero are evicted.
        -- Clearing existing keys during a pairs() traversal is permitted.
        for k, v in pairs(items) do
            local weight = v[WEIGHT]
            if weight == 1 then
                items[k] = nil
                items_size = items_size - 1
            else
                v[WEIGHT] = weight - 1
            end
        end
    else
        items[item] = {1, ts} -- {WEIGHT, TS}
        items_size = items_size + 1
    end
    return 0
end
-- Emits a tab-separated statistics table for every item with at least
-- alert_min_count recorded intervals, queues an alert for each such item
-- whose mean interval is at or below alert_max_mean (once per item record),
-- then injects the table and sends the queued alerts.
-- @param ns  value forwarded to alert.queue() and alert.send_queue()
function timer_event(ns)
    output(string.format("%s\tWeight\tCount\tMean\tSD\n", message_variable))
    for k, v in pairs(items) do
        local n = v[N]
        if n and n >= alert_min_count then
            -- The sample variance needs at least two samples. With
            -- alert_min_count <= 1 a single-sample record would otherwise
            -- do arithmetic on an unset NS slot and divide by zero.
            local sd = 0
            if n > 1 then
                sd = math.sqrt(v[NS] / (n - 1))
            end
            output(string.format("%s\t%d\t%d\t%G\t%G\n", k, v[WEIGHT], n, v[NM], sd))
            if v[NM] <= alert_max_mean and not v[ALERT] then
                -- Remember the result so the same record is not queued again
                -- once alert.queue() reports a truthy value.
                v[ALERT] = alert.queue(ns, string.format("Abuse detected %s: %s", message_variable, k))
            end
        end
    end
    inject_message("tsv", "Statistics")
    alert.send_queue(ns)
end
-- This Source Code Form is subject to the terms of the Mozilla Public
-- License, v. 2.0. If a copy of the MPL was not distributed with this
-- file, You can obtain one at http://mozilla.org/MPL/2.0/.
require "circular_buffer"
require "string"
local alert = require "alert"
local annotation = require "annotation"
local anomaly = require "anomaly"
-- Name used as the annotation key and as the name of the injected payload.
local title = "Summary"
-- Circular buffer geometry: number of rows and seconds covered by each row.
local rows = read_config("rows") or 1440
local sec_per_row = read_config("sec_per_row") or 60
-- Parsed anomaly detection settings; when this is falsy timer_event skips
-- detection and only publishes the buffer.
local anomaly_config = anomaly.parse_config(read_config("anomaly_config"))
-- rows * sec_per_row * 1e9 is the buffer's full span, presumably expressed in
-- nanoseconds so annotations are pruned once they fall off the buffer -- confirm.
annotation.set_prune(title, rows * sec_per_row * 1e9)
-- Three-column buffer; left global (presumably so the sandbox host can
-- preserve it between runs -- confirm).
data = circular_buffer.new(rows, 3, sec_per_row)
-- Column handles as returned by set_header for columns 1..3.
local SUCCESS = data:set_header(1, "Success")
local FAILURE = data:set_header(2, "Failure")
local PFAIL = data:set_header(3, "%Failure", "percent", "none")
-- Counts the message as a success (errno == 0) or a failure (anything else)
-- in the row for its timestamp and refreshes that row's failure percentage.
-- Always returns 0.
function process_message()
    local when = read_message("Timestamp")
    local succeeded = read_message("Fields[errno]") == 0

    -- Choose the column to increment, the opposite column to read back, and
    -- the percentage to store when the opposite column holds no usable value.
    local bump_col, other_col, fallback_pct
    if succeeded then
        bump_col, other_col, fallback_pct = SUCCESS, FAILURE, 0
    else
        bump_col, other_col, fallback_pct = FAILURE, SUCCESS, 100
    end

    local bumped = data:add(when, bump_col, 1)
    if not bumped then return 0 end -- nothing was added for this timestamp

    local other = data:get(when, other_col)
    local pct = fallback_pct
    -- `other == other` is false only for NaN.
    if other and other == other then
        local failures = succeeded and other or bumped
        pct = failures / (other + bumped) * 100
    end
    data:set(when, PFAIL, pct)
    return 0
end
-- Publishes the circular buffer. When anomaly detection is configured it
-- first runs detection (unless alerts are throttled), records annotations
-- and sends an alert for any finding, then publishes the buffer together
-- with the pruned annotations.
-- @param ns  value forwarded to the alert, anomaly and annotation modules
function timer_event(ns)
    -- No anomaly configuration: publish the plain buffer and stop.
    if not anomaly_config then
        inject_message(data:format("cbuf"), title)
        return
    end

    if not alert.throttled(ns) then
        local msg, annos = anomaly.detect(ns, title, data, anomaly_config)
        if msg then
            annotation.concat(title, annos)
            alert.send(ns, msg)
        end
    end

    output({annotations = annotation.prune(title, ns)}, data)
    inject_message("cbuf", title)
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment