Created
October 3, 2016 11:28
-
-
Save corny/5e3200f130365c97df930fc99cba76d4 to your computer and use it in GitHub Desktop.
Customized Telegraf
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"log" | |
"math" | |
"sync/atomic" | |
"time" | |
"github.com/influxdata/telegraf" | |
) | |
func NewAccumulator( | |
metrics chan telegraf.Metric, | |
) *accumulator { | |
acc := accumulator{} | |
acc.metrics = metrics | |
acc.precision = time.Second | |
return &acc | |
} | |
type accumulator struct { | |
metrics chan telegraf.Metric | |
defaultTags map[string]string | |
debug bool | |
// print every point added to the accumulator | |
trace bool | |
precision time.Duration | |
errCount uint64 | |
} | |
func (ac *accumulator) AddFields( | |
measurement string, | |
fields map[string]interface{}, | |
tags map[string]string, | |
t ...time.Time, | |
) { | |
if m := ac.makeMetric(measurement, fields, tags, telegraf.Untyped, t...); m != nil { | |
ac.metrics <- m | |
} | |
} | |
func (ac *accumulator) AddGauge( | |
measurement string, | |
fields map[string]interface{}, | |
tags map[string]string, | |
t ...time.Time, | |
) { | |
if m := ac.makeMetric(measurement, fields, tags, telegraf.Gauge, t...); m != nil { | |
ac.metrics <- m | |
} | |
} | |
func (ac *accumulator) AddCounter( | |
measurement string, | |
fields map[string]interface{}, | |
tags map[string]string, | |
t ...time.Time, | |
) { | |
if m := ac.makeMetric(measurement, fields, tags, telegraf.Counter, t...); m != nil { | |
ac.metrics <- m | |
} | |
} | |
// makeMetric either returns a metric, or returns nil if the metric doesn't | |
// need to be created (because of filtering, an error, etc.) | |
func (ac *accumulator) makeMetric( | |
measurement string, | |
fields map[string]interface{}, | |
tags map[string]string, | |
mType telegraf.ValueType, | |
t ...time.Time, | |
) telegraf.Metric { | |
if len(fields) == 0 || len(measurement) == 0 { | |
return nil | |
} | |
if tags == nil { | |
tags = make(map[string]string) | |
} | |
// Apply daemon-wide tags if set | |
for k, v := range ac.defaultTags { | |
if _, ok := tags[k]; !ok { | |
tags[k] = v | |
} | |
} | |
for k, v := range fields { | |
// Validate uint64 and float64 fields | |
switch val := v.(type) { | |
case uint64: | |
// InfluxDB does not support writing uint64 | |
if val < uint64(9223372036854775808) { | |
fields[k] = int64(val) | |
} else { | |
fields[k] = int64(9223372036854775807) | |
} | |
continue | |
case float64: | |
// NaNs are invalid values in influxdb, skip measurement | |
if math.IsNaN(val) || math.IsInf(val, 0) { | |
if ac.debug { | |
log.Printf("Measurement [%s] field [%s] has a NaN or Inf "+ | |
"field, skipping", | |
measurement, k) | |
} | |
delete(fields, k) | |
continue | |
} | |
} | |
fields[k] = v | |
} | |
var timestamp time.Time | |
if len(t) > 0 { | |
timestamp = t[0] | |
} else { | |
timestamp = time.Now() | |
} | |
timestamp = timestamp.Round(ac.precision) | |
var m telegraf.Metric | |
var err error | |
switch mType { | |
case telegraf.Counter: | |
m, err = telegraf.NewCounterMetric(measurement, tags, fields, timestamp) | |
case telegraf.Gauge: | |
m, err = telegraf.NewGaugeMetric(measurement, tags, fields, timestamp) | |
default: | |
m, err = telegraf.NewMetric(measurement, tags, fields, timestamp) | |
} | |
if err != nil { | |
log.Printf("Error adding point [%s]: %s\n", measurement, err.Error()) | |
return nil | |
} | |
if ac.trace { | |
fmt.Println("> " + m.String()) | |
} | |
return m | |
} | |
// AddError passes a runtime error to the accumulator. | |
// The error will be tagged with the plugin name and written to the log. | |
func (ac *accumulator) AddError(err error) { | |
if err == nil { | |
return | |
} | |
atomic.AddUint64(&ac.errCount, 1) | |
//TODO suppress/throttle consecutive duplicate errors? | |
log.Printf("ERROR in input: %s", err) | |
} | |
func (ac *accumulator) Debug() bool { | |
return ac.debug | |
} | |
func (ac *accumulator) SetDebug(debug bool) { | |
ac.debug = debug | |
} | |
func (ac *accumulator) Trace() bool { | |
return ac.trace | |
} | |
func (ac *accumulator) SetTrace(trace bool) { | |
ac.trace = trace | |
} | |
func (ac *accumulator) SetPrecision(precision, interval time.Duration) { | |
panic("not implemented") | |
} | |
func (ac *accumulator) DisablePrecision() { | |
panic("not implemented") | |
} | |
func (ac *accumulator) setDefaultTags(tags map[string]string) { | |
ac.defaultTags = tags | |
} | |
func (ac *accumulator) addDefaultTag(key, value string) { | |
if ac.defaultTags == nil { | |
ac.defaultTags = make(map[string]string) | |
} | |
ac.defaultTags[key] = value | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import (
	"log"
	"os"
	"time"

	"github.com/influxdata/telegraf"
	// import all required input plugins
	"github.com/influxdata/telegraf/plugins/inputs"
	_ "github.com/influxdata/telegraf/plugins/inputs/system"
	"github.com/influxdata/telegraf/plugins/outputs"
	"github.com/influxdata/telegraf/plugins/outputs/influxdb"
)
func main() { | |
// channel shared between all input threads for accumulating metrics | |
metricC := make(chan telegraf.Metric, 10000) | |
acc := NewAccumulator(metricC) | |
// Set default tags | |
if hostname, err := os.Hostname(); err != nil { | |
acc.addDefaultTag("host", hostname) | |
} | |
// Configure output plugin | |
out := outputs.Outputs["influxdb"] | |
influx := out().(*influxdb.InfluxDB) | |
influx.URLs = []string{"http://localhost:8086"} | |
influx.Connect() | |
// Send gathered metrics continously to the output plugin | |
go func() { | |
for metric := range metricC { | |
influx.Write([]telegraf.Metric{metric}) | |
} | |
}() | |
// Run the inputs continously | |
for range time.NewTicker(time.Minute).C { | |
in := inputs.Inputs["system"] | |
in().Gather(acc) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment