Skip to content

Instantly share code, notes, and snippets.

@corny
Created October 3, 2016 11:28
Show Gist options
  • Save corny/5e3200f130365c97df930fc99cba76d4 to your computer and use it in GitHub Desktop.
Save corny/5e3200f130365c97df930fc99cba76d4 to your computer and use it in GitHub Desktop.
Customized Telegraf
package main
import (
"fmt"
"log"
"math"
"sync/atomic"
"time"
"github.com/influxdata/telegraf"
)
// NewAccumulator returns an accumulator that sends every metric it builds to
// the given channel. The timestamp precision starts out at one second.
func NewAccumulator(
	metrics chan telegraf.Metric,
) *accumulator {
	return &accumulator{
		metrics:   metrics,
		precision: time.Second,
	}
}
// accumulator turns the measurements reported by input plugins into
// telegraf metrics and forwards them on the metrics channel.
type accumulator struct {
	// errCount is the number of errors reported through AddError.
	// It is modified with sync/atomic and therefore must remain the first
	// field: only the first word of an allocated struct is guaranteed to be
	// 64-bit aligned on 32-bit platforms, and a misaligned 64-bit atomic
	// operation panics there.
	errCount uint64

	// metrics receives every metric that was successfully built.
	metrics chan telegraf.Metric
	// defaultTags are added to each metric unless the metric already
	// carries a tag with the same key.
	defaultTags map[string]string
	// debug enables logging of fields dropped for holding NaN or Inf.
	debug bool
	// print every point added to the accumulator
	trace bool
	// precision is the unit metric timestamps are rounded to.
	precision time.Duration
}
// AddFields builds an untyped metric from the given measurement, fields and
// tags and sends it to the metrics channel. Nothing is sent when makeMetric
// returns nil. An optional timestamp may be supplied as the first element
// of t.
func (ac *accumulator) AddFields(
	measurement string,
	fields map[string]interface{},
	tags map[string]string,
	t ...time.Time,
) {
	m := ac.makeMetric(measurement, fields, tags, telegraf.Untyped, t...)
	if m == nil {
		return
	}
	ac.metrics <- m
}
// AddGauge builds a gauge metric from the given measurement, fields and tags
// and sends it to the metrics channel. Nothing is sent when makeMetric
// returns nil. An optional timestamp may be supplied as the first element
// of t.
func (ac *accumulator) AddGauge(
	measurement string,
	fields map[string]interface{},
	tags map[string]string,
	t ...time.Time,
) {
	m := ac.makeMetric(measurement, fields, tags, telegraf.Gauge, t...)
	if m == nil {
		return
	}
	ac.metrics <- m
}
// AddCounter builds a counter metric from the given measurement, fields and
// tags and sends it to the metrics channel. Nothing is sent when makeMetric
// returns nil. An optional timestamp may be supplied as the first element
// of t.
func (ac *accumulator) AddCounter(
	measurement string,
	fields map[string]interface{},
	tags map[string]string,
	t ...time.Time,
) {
	m := ac.makeMetric(measurement, fields, tags, telegraf.Counter, t...)
	if m == nil {
		return
	}
	ac.metrics <- m
}
// makeMetric builds a metric of the requested type from the measurement
// name, fields and tags. It returns nil when there is nothing to emit: the
// measurement name or the field set is empty, or the telegraf constructor
// reports an error (which is logged).
//
// The maps passed in are modified in place: default tags that are missing
// are added to a non-nil tags map, uint64 fields are converted to int64
// (clamped at the largest int64), and float64 fields holding NaN or Inf are
// removed.
//
// The timestamp is t[0] when given, otherwise the current time, rounded to
// the accumulator's precision.
func (ac *accumulator) makeMetric(
	measurement string,
	fields map[string]interface{},
	tags map[string]string,
	mType telegraf.ValueType,
	t ...time.Time,
) telegraf.Metric {
	if measurement == "" || len(fields) == 0 {
		return nil
	}
	if tags == nil {
		tags = map[string]string{}
	}

	// Daemon-wide tags never override a tag supplied with the measurement.
	for name, value := range ac.defaultTags {
		if _, present := tags[name]; present {
			continue
		}
		tags[name] = value
	}

	// Sanitize uint64 and float64 fields; all other types pass through as-is.
	for name, raw := range fields {
		switch value := raw.(type) {
		case uint64:
			// InfluxDB does not support writing uint64, so store an int64
			// and saturate values that do not fit.
			if value > math.MaxInt64 {
				fields[name] = int64(math.MaxInt64)
			} else {
				fields[name] = int64(value)
			}
		case float64:
			// NaN and Inf are invalid values in InfluxDB; drop the field.
			if !math.IsNaN(value) && !math.IsInf(value, 0) {
				continue
			}
			if ac.debug {
				log.Printf("Measurement [%s] field [%s] has a NaN or Inf field, skipping",
					measurement, name)
			}
			delete(fields, name)
		}
	}

	var timestamp time.Time
	if len(t) == 0 {
		timestamp = time.Now()
	} else {
		timestamp = t[0]
	}
	timestamp = timestamp.Round(ac.precision)

	var (
		m   telegraf.Metric
		err error
	)
	switch mType {
	case telegraf.Gauge:
		m, err = telegraf.NewGaugeMetric(measurement, tags, fields, timestamp)
	case telegraf.Counter:
		m, err = telegraf.NewCounterMetric(measurement, tags, fields, timestamp)
	default:
		m, err = telegraf.NewMetric(measurement, tags, fields, timestamp)
	}
	if err != nil {
		log.Printf("Error adding point [%s]: %s\n", measurement, err.Error())
		return nil
	}

	if ac.trace {
		fmt.Println("> " + m.String())
	}
	return m
}
// AddError records a runtime error reported to the accumulator.
// A nil error is ignored; any other error atomically increments the error
// counter and is written to the log.
func (ac *accumulator) AddError(err error) {
	if err != nil {
		atomic.AddUint64(&ac.errCount, 1)
		//TODO suppress/throttle consecutive duplicate errors?
		log.Printf("ERROR in input: %s", err)
	}
}
// Debug reports whether debug mode is enabled. In debug mode makeMetric logs
// every float64 field it drops for holding NaN or Inf.
func (ac *accumulator) Debug() bool {
return ac.debug
}
// SetDebug enables or disables debug mode.
func (ac *accumulator) SetDebug(debug bool) {
ac.debug = debug
}
// Trace reports whether trace mode is enabled. In trace mode makeMetric
// prints every metric it builds to standard output.
func (ac *accumulator) Trace() bool {
return ac.trace
}
// SetTrace enables or disables trace mode.
func (ac *accumulator) SetTrace(trace bool) {
ac.trace = trace
}
// SetPrecision is not implemented in this customized accumulator and always
// panics. Both arguments are ignored; the precision field is left untouched.
func (ac *accumulator) SetPrecision(precision, interval time.Duration) {
panic("not implemented")
}
// DisablePrecision is not implemented in this customized accumulator and
// always panics.
func (ac *accumulator) DisablePrecision() {
panic("not implemented")
}
// setDefaultTags replaces the set of default tags. The map is stored without
// copying, so a later addDefaultTag call writes into the caller's map.
func (ac *accumulator) setDefaultTags(tags map[string]string) {
ac.defaultTags = tags
}
// addDefaultTag sets a single default tag, creating the default tag map on
// first use. An existing default tag with the same key is overwritten.
func (ac *accumulator) addDefaultTag(key, value string) {
	if ac.defaultTags != nil {
		ac.defaultTags[key] = value
		return
	}
	ac.defaultTags = map[string]string{key: value}
}
package main
import (
	"log"
	"os"
	"time"

	"github.com/influxdata/telegraf"
	// import all required input plugins
	"github.com/influxdata/telegraf/plugins/inputs"
	_ "github.com/influxdata/telegraf/plugins/inputs/system"
	"github.com/influxdata/telegraf/plugins/outputs"
	"github.com/influxdata/telegraf/plugins/outputs/influxdb"
)
// main runs a minimal, hard-wired telegraf: once per minute it gathers the
// "system" input plugin and forwards every resulting metric to an InfluxDB
// output at http://localhost:8086.
func main() {
	// channel shared between all input threads for accumulating metrics
	metricC := make(chan telegraf.Metric, 10000)
	acc := NewAccumulator(metricC)

	// Set default tags. The host tag is only added when the hostname lookup
	// succeeded; on failure the returned name carries no useful value.
	if hostname, err := os.Hostname(); err == nil {
		acc.addDefaultTag("host", hostname)
	} else {
		log.Printf("Unable to determine hostname, host tag not set: %s", err)
	}

	// Configure output plugin
	newOutput, ok := outputs.Outputs["influxdb"]
	if !ok {
		log.Fatal("Output plugin influxdb is not registered")
	}
	influx, ok := newOutput().(*influxdb.InfluxDB)
	if !ok {
		log.Fatal("Output plugin influxdb has an unexpected type")
	}
	influx.URLs = []string{"http://localhost:8086"}
	if err := influx.Connect(); err != nil {
		log.Fatalf("Unable to connect to output influxdb: %s", err)
	}

	// Send gathered metrics continuously to the output plugin
	go func() {
		for metric := range metricC {
			if err := influx.Write([]telegraf.Metric{metric}); err != nil {
				log.Printf("Error writing to output influxdb: %s", err)
			}
		}
	}()

	// Look the input plugin up once instead of on every tick.
	newInput, ok := inputs.Inputs["system"]
	if !ok {
		log.Fatal("Input plugin system is not registered")
	}
	input := newInput()

	// Run the inputs continuously. The ticker is never stopped because the
	// loop only ends when the process does.
	for range time.NewTicker(time.Minute).C {
		// AddError ignores nil, counts and logs everything else.
		acc.AddError(input.Gather(acc))
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment