Skip to content

Instantly share code, notes, and snippets.

@forestjohnsonpeoplenet
Created July 19, 2017 15:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save forestjohnsonpeoplenet/a5fb6fd2916b696e167e753c37fd9f10 to your computer and use it in GitHub Desktop.
Save forestjohnsonpeoplenet/a5fb6fd2916b696e167e753c37fd9f10 to your computer and use it in GitHub Desktop.
createTagFromMeasurementName UDF
package main
import (
"errors"
"log"
"os"
"github.com/influxdata/kapacitor/udf/agent"
)
type createTagFromMeasurementNameHandler struct {
agent *agent.Agent
}
func newcreateTagFromMeasurementNameHandler(agent *agent.Agent) *createTagFromMeasurementNameHandler {
return &createTagFromMeasurementNameHandler{agent: agent}
}
// Return the InfoResponse. Describing the properties of this UDF agent.
func (*createTagFromMeasurementNameHandler) Info() (*agent.InfoResponse, error) {
return &agent.InfoResponse{
Wants: agent.EdgeType_STREAM,
Provides: agent.EdgeType_STREAM,
Options: map[string]*agent.OptionInfo{},
}, nil
}
// Initialze the handler based of the provided options.
func (*createTagFromMeasurementNameHandler) Init(r *agent.InitRequest) (*agent.InitResponse, error) {
init := &agent.InitResponse{
Success: true,
Error: "",
}
return init, nil
}
// Create a snapshot of the running state of the process.
func (*createTagFromMeasurementNameHandler) Snapshot() (*agent.SnapshotResponse, error) {
return &agent.SnapshotResponse{}, nil
}
// Restore a previous snapshot.
func (*createTagFromMeasurementNameHandler) Restore(req *agent.RestoreRequest) (*agent.RestoreResponse, error) {
return &agent.RestoreResponse{
Success: true,
}, nil
}
func (*createTagFromMeasurementNameHandler) BeginBatch(begin *agent.BeginBatch) error {
return errors.New("batching not supported")
}
func (h *createTagFromMeasurementNameHandler) Point(point *agent.Point) error {
point.Tags["measurementName"] = point.Name
h.agent.Responses <- &agent.Response{
Message: &agent.Response_Point{
Point: point,
},
}
return nil
}
func (*createTagFromMeasurementNameHandler) EndBatch(end *agent.EndBatch) error {
return nil
}
// Stop the handler gracefully.
func (h *createTagFromMeasurementNameHandler) Stop() {
close(h.agent.Responses)
}
func main() {
a := agent.New(os.Stdin, os.Stdout)
h := newcreateTagFromMeasurementNameHandler(a)
a.Handler = h
log.Println("Starting agent")
a.Start()
err := a.Wait()
if err != nil {
log.Fatal(err)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment