Skip to content

Instantly share code, notes, and snippets.

@asenchi
Forked from chancez/cpustats.lua
Created August 4, 2014 15:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save asenchi/e6268a83bf8007c9ec94 to your computer and use it in GitHub Desktop.
Save asenchi/e6268a83bf8007c9ec94 to your computer and use it in GitHub Desktop.
-- This Source Code Form is subject to the terms of the Mozilla Public
-- License, v. 2.0. If a copy of the MPL was not distributed with this
-- file, You can obtain one at http://mozilla.org/MPL/2.0/.
--[[
Graphs the Cpu Load and number of processes of the system running heka.
Config:
- sec_per_row (uint, optional, default 60)
Sets the size of each bucket (resolution in seconds) in the sliding window.
- rows (uint, optional, default 1440)
Sets the size of the sliding window i.e., 1440 rows representing 60 seconds
per row is a 24 sliding hour window with 1 minute resolution.
- anomaly_config(string) - (see :ref:`sandbox_anomaly_module`)
*Example Heka Configuration*
.. code-block:: ini
[CpuStatsFilter]
type = "SandboxFilter"
filename = "lua_filters/cpustats.lua"
ticker_interval = 60
preserve_data = true
message_matcher = "Type == 'stats.cpustats'"
--]]
require "circular_buffer"
require "string"
local alert = require "alert"
local annotation = require "annotation"
local anomaly = require "anomaly"
local title = "Cpu Stats"
local rows = read_config("rows") or 1440
local sec_per_row = read_config("sec_per_row") or 60
local anomaly_config = anomaly.parse_config(read_config("anomaly_config"))
annotation.set_prune(title, rows * sec_per_row * 1e9)
local field_names = {"1MinAvg", "5MinAvg", "15MinAvg", "NumProcesses"}
cbuf = circular_buffer.new(rows, #field_names, sec_per_row)
local labels = {}
for i, name in ipairs(field_names) do
labels[#labels+1] = string.format("Fields[%s]", name)
cbuf:set_header(i, name, "Count", "max")
end
function process_message ()
local ts = read_message("Timestamp")
for i, name in ipairs(field_names) do
cbuf:set(ts, i, read_message(labels[i]))
end
return 0
end
function timer_event(ns)
if anomaly_config then
if not alert.throttled(ns) then
local msg, annos = anomaly.detect(ns, title, cbuf, anomaly_config)
if msg then
annotation.concat(title, annos)
alert.send(ns, msg)
end
end
inject_payload("cbuf", title, annotation.prune(title, ns), cbuf)
else
inject_payload("cbuf", title, cbuf)
end
end
package plugins
import (
"fmt"
"github.com/mozilla-services/heka/message"
"github.com/mozilla-services/heka/pipeline"
ts "github.com/mozilla-services/heka/pipeline/testsupport"
pm "github.com/mozilla-services/heka/pipelinemock"
"github.com/mozilla-services/heka/sandbox"
"github.com/rafrombrc/gomock/gomock"
gs "github.com/rafrombrc/gospec/src/gospec"
"os"
"path/filepath"
"time"
)
type FilterTestHelper struct {
MockHelper *pm.MockPluginHelper
MockFilterRunner *pm.MockFilterRunner
}
func NewFilterTestHelper(ctrl *gomock.Controller) (fth *FilterTestHelper) {
fth = new(FilterTestHelper)
fth.MockHelper = pm.NewMockPluginHelper(ctrl)
fth.MockFilterRunner = pm.NewMockFilterRunner(ctrl)
return
}
func FilterSpec(c gs.Context) {
t := new(ts.SimpleT)
ctrl := gomock.NewController(t)
defer ctrl.Finish()
fth := NewFilterTestHelper(ctrl)
inChan := make(chan *pipeline.PipelinePack, 1)
pConfig := pipeline.NewPipelineConfig(nil)
c.Specify("A SandboxFilter", func() {
sbFilter := new(SandboxFilter)
sbFilter.SetPipelineConfig(pConfig)
config := sbFilter.ConfigStruct().(*sandbox.SandboxConfig)
config.MemoryLimit = 32000
config.InstructionLimit = 1000
config.OutputLimit = 1024
msg := getTestMessage()
pack := pipeline.NewPipelinePack(pConfig.InjectRecycleChan())
pack.Message = msg
pack.Decoded = true
c.Specify("Uninitialized", func() {
err := sbFilter.ReportMsg(msg)
c.Expect(err, gs.IsNil)
})
c.Specify("Over inject messages from ProcessMessage", func() {
var timer <-chan time.Time
fth.MockFilterRunner.EXPECT().Ticker().Return(timer)
fth.MockFilterRunner.EXPECT().InChan().Return(inChan)
fth.MockFilterRunner.EXPECT().Name().Return("processinject").Times(3)
fth.MockFilterRunner.EXPECT().Inject(pack).Return(true).Times(2)
fth.MockHelper.EXPECT().PipelineConfig().Return(pConfig)
fth.MockHelper.EXPECT().PipelinePack(uint(0)).Return(pack).Times(2)
fth.MockFilterRunner.EXPECT().LogError(fmt.Errorf("exceeded InjectMessage count"))
config.ScriptFilename = "../lua/testsupport/processinject.lua"
err := sbFilter.Init(config)
c.Assume(err, gs.IsNil)
inChan <- pack
close(inChan)
sbFilter.Run(fth.MockFilterRunner, fth.MockHelper)
})
c.Specify("Over inject messages from TimerEvent", func() {
var timer <-chan time.Time
timer = time.Tick(time.Duration(1) * time.Millisecond)
fth.MockFilterRunner.EXPECT().Ticker().Return(timer)
fth.MockFilterRunner.EXPECT().InChan().Return(inChan)
fth.MockFilterRunner.EXPECT().Name().Return("timerinject").Times(12)
fth.MockFilterRunner.EXPECT().Inject(pack).Return(true).Times(11)
fth.MockHelper.EXPECT().PipelineConfig().Return(pConfig)
fth.MockHelper.EXPECT().PipelinePack(uint(0)).Return(pack).Times(11)
fth.MockFilterRunner.EXPECT().LogError(fmt.Errorf("exceeded InjectMessage count"))
config.ScriptFilename = "../lua/testsupport/timerinject.lua"
err := sbFilter.Init(config)
c.Assume(err, gs.IsNil)
go func() {
time.Sleep(time.Duration(250) * time.Millisecond)
close(inChan)
}()
sbFilter.Run(fth.MockFilterRunner, fth.MockHelper)
})
c.Specify("Preserves data", func() {
var timer <-chan time.Time
fth.MockFilterRunner.EXPECT().Ticker().Return(timer)
fth.MockFilterRunner.EXPECT().InChan().Return(inChan)
config.ScriptFilename = "../lua/testsupport/serialize.lua"
config.PreserveData = true
sbFilter.SetName("serialize")
err := sbFilter.Init(config)
c.Assume(err, gs.IsNil)
close(inChan)
sbFilter.Run(fth.MockFilterRunner, fth.MockHelper)
_, err = os.Stat("sandbox_preservation/serialize.data")
c.Expect(err, gs.IsNil)
err = os.Remove("sandbox_preservation/serialize.data")
c.Expect(err, gs.IsNil)
})
})
c.Specify("A SandboxManagerFilter", func() {
pConfig.Globals.BaseDir = os.TempDir()
sbxMgrsDir := filepath.Join(pConfig.Globals.BaseDir, "sbxmgrs")
defer func() {
tmpErr := os.RemoveAll(sbxMgrsDir)
c.Expect(tmpErr, gs.IsNil)
}()
sbmFilter := new(SandboxManagerFilter)
sbmFilter.SetPipelineConfig(pConfig)
config := sbmFilter.ConfigStruct().(*SandboxManagerFilterConfig)
config.MaxFilters = 1
msg := getTestMessage()
pack := pipeline.NewPipelinePack(pConfig.InputRecycleChan())
pack.Message = msg
pack.Decoded = true
c.Specify("Control message in the past", func() {
sbmFilter.Init(config)
pack.Message.SetTimestamp(time.Now().UnixNano() - 5e9)
fth.MockFilterRunner.EXPECT().InChan().Return(inChan)
fth.MockFilterRunner.EXPECT().Name().Return("SandboxManagerFilter")
fth.MockFilterRunner.EXPECT().LogError(fmt.Errorf("Discarded control message: 5 seconds skew"))
inChan <- pack
close(inChan)
sbmFilter.Run(fth.MockFilterRunner, fth.MockHelper)
})
c.Specify("Control message in the future", func() {
sbmFilter.Init(config)
pack.Message.SetTimestamp(time.Now().UnixNano() + 5.9e9)
fth.MockFilterRunner.EXPECT().InChan().Return(inChan)
fth.MockFilterRunner.EXPECT().Name().Return("SandboxManagerFilter")
fth.MockFilterRunner.EXPECT().LogError(fmt.Errorf("Discarded control message: -5 seconds skew"))
inChan <- pack
close(inChan)
sbmFilter.Run(fth.MockFilterRunner, fth.MockHelper)
})
c.Specify("Generates the right default working directory", func() {
sbmFilter.Init(config)
fth.MockFilterRunner.EXPECT().InChan().Return(inChan)
name := "SandboxManagerFilter"
fth.MockFilterRunner.EXPECT().Name().Return(name)
close(inChan)
sbmFilter.Run(fth.MockFilterRunner, fth.MockHelper)
c.Expect(sbmFilter.workingDirectory, gs.Equals, sbxMgrsDir)
_, err := os.Stat(sbxMgrsDir)
c.Expect(err, gs.IsNil)
})
c.Specify("Sanity check the default sandbox configuration limits", func() {
sbmFilter.Init(config)
c.Expect(sbmFilter.memoryLimit, gs.Equals, uint(8*1024*1024))
c.Expect(sbmFilter.instructionLimit, gs.Equals, uint(1e6))
c.Expect(sbmFilter.outputLimit, gs.Equals, uint(63*1024))
})
c.Specify("Sanity check the user specified sandbox configuration limits", func() {
config.MemoryLimit = 123456
config.InstructionLimit = 4321
config.OutputLimit = 8765
sbmFilter.Init(config)
c.Expect(sbmFilter.memoryLimit, gs.Equals, config.MemoryLimit)
c.Expect(sbmFilter.instructionLimit, gs.Equals, config.InstructionLimit)
c.Expect(sbmFilter.outputLimit, gs.Equals, config.OutputLimit)
})
c.Specify("Creates a SandboxFilter runner", func() {
sbxName := "SandboxFilter"
sbxMgrName := "SandboxManagerFilter"
code := `
require("cjson")
function process_message()
inject_payload(cjson.encode({a = "b"}))
return 0
end
`
cfg := `
[%s]
type = "SandboxFilter"
message_matcher = "TRUE"
script_type = "lua"
`
cfg = fmt.Sprintf(cfg, sbxName)
msg.SetPayload(code)
f, err := message.NewField("config", cfg, "toml")
c.Assume(err, gs.IsNil)
msg.AddField(f)
fMatchChan := pConfig.Router().AddFilterMatcher()
errChan := make(chan error)
fth.MockFilterRunner.EXPECT().Name().Return(sbxMgrName)
fullSbxName := fmt.Sprintf("%s-%s", sbxMgrName, sbxName)
fth.MockHelper.EXPECT().Filter(fullSbxName).Return(nil, false)
fth.MockFilterRunner.EXPECT().LogMessage(fmt.Sprintf("Loading: %s", fullSbxName))
sbmFilter.Init(config)
go func() {
err := sbmFilter.loadSandbox(fth.MockFilterRunner, fth.MockHelper, sbxMgrsDir,
msg)
errChan <- err
}()
fMatch := <-fMatchChan
c.Expect(fMatch.MatcherSpecification().String(), gs.Equals, "TRUE")
c.Expect(<-errChan, gs.IsNil)
go func() {
<-pConfig.Router().RemoveFilterMatcher()
}()
ok := pConfig.RemoveFilterRunner(fullSbxName)
c.Expect(ok, gs.IsTrue)
})
})
c.Specify("A Cpu Stats filter", func() {
filter := new(SandboxFilter)
filter.SetPipelineConfig(pConfig)
filter.name = "cpustats"
conf := filter.ConfigStruct().(*sandbox.SandboxConfig)
conf.ScriptFilename = "../lua/filters/cpustats.lua"
conf.ModuleDirectory = "../lua/modules"
conf.MemoryLimit = 1000000
conf.Config = make(map[string]interface{})
conf.Config["rows"] = int64(3)
conf.Config["seconds_per_row"] = int64(1)
timer := make(chan time.Time, 1)
errChan := make(chan error, 1)
retPackChan := make(chan *pipeline.PipelinePack, 5)
msg := getTestMessage()
fields := make([]*message.Field, 4)
fields[0], _ = message.NewField("1MinAvg", "0.08", "")
fields[1], _ = message.NewField("5MinAvg", "0.04", "")
fields[2], _ = message.NewField("15MinAvg", "0.02", "")
fields[3], _ = message.NewField("NumProcesses", "5", "")
msg.Fields = fields
pack := pipeline.NewPipelinePack(pConfig.InjectRecycleChan())
pack.Message = msg
pack.Decoded = true
fth.MockHelper.EXPECT().PipelinePack(uint(0)).Return(pack).Times(3)
fth.MockFilterRunner.EXPECT().Ticker().Return(timer)
fth.MockFilterRunner.EXPECT().InChan().Return(inChan)
fth.MockFilterRunner.EXPECT().Name().Return("cpustats").Times(3)
fth.MockFilterRunner.EXPECT().Inject(pack).Do(func(pack *pipeline.PipelinePack) {
retPackChan <- pack
}).Return(true).Times(3)
err := filter.Init(conf)
c.Assume(err, gs.IsNil)
go func() {
errChan <- filter.Run(fth.MockFilterRunner, fth.MockHelper)
}()
var p *pipeline.PipelinePack
for i := 0; i < 3; i++ {
pack := pipeline.NewPipelinePack(pConfig.InjectRecycleChan())
pack.Message = msg
future := time.Second * time.Duration(i)
pack.Message.SetTimestamp(time.Now().Add(future).Unix())
pack.Decoded = true
inChan <- pack
// Feed in a pack
// Begin processing
timer <- time.Now()
p = <-retPackChan
}
// Check the result of the filter's inject
fmt.Println("pl:", p.Message.GetPayload())
close(inChan)
c.Expect(<-errChan, gs.IsNil)
close(errChan)
})
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment