Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save forestjohnsonpeoplenet/f0aa7a807f0f1735ea02279338cdeaf1 to your computer and use it in GitHub Desktop.
Save forestjohnsonpeoplenet/f0aa7a807f0f1735ea02279338cdeaf1 to your computer and use it in GitHub Desktop.
Kapacitor UDF to make measurement name available to lambdas
package main
import (
"bytes"
"encoding/gob"
"errors"
"log"
"os"
"github.com/influxdata/kapacitor/udf"
udfAgent "github.com/influxdata/kapacitor/udf/agent"
)
type CreateTagFromMeasurementNameHandler struct {
Agent *udfAgent.Agent
}
func newCreateTagFromMeasurementNameHandler(agent *udfAgent.Agent) *CreateTagFromMeasurementNameHandler {
return &CreateTagFromMeasurementNameHandler{
Agent: agent,
}
}
// Return the InfoResponse. Describing the properties of this UDF agent.
func (createTagFromMeasurementNameHandler *CreateTagFromMeasurementNameHandler) Info() (*udf.InfoResponse, error) {
info := &udf.InfoResponse{
Wants: udf.EdgeType_STREAM,
Provides: udf.EdgeType_STREAM,
Options: map[string]*udf.OptionInfo{},
}
return info, nil
}
// Initialze the handler based of the provided options.
func (createTagFromMeasurementNameHandler *CreateTagFromMeasurementNameHandler) Init(initRequest *udf.InitRequest) (*udf.InitResponse, error) {
return &udf.InitResponse{
Success: true,
Error: "",
}, nil
}
// Create a snapshot of the running state of the process. NoOp since its stateless.
func (createTagFromMeasurementNameHandler *CreateTagFromMeasurementNameHandler) Snaphost() (*udf.SnapshotResponse, error) {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
enc.Encode(0)
return &udf.SnapshotResponse{
Snapshot: buf.Bytes(),
}, nil
}
// Restore a previous snapshot. NoOp since its stateless.
func (createTagFromMeasurementNameHandler *CreateTagFromMeasurementNameHandler) Restore(req *udf.RestoreRequest) (*udf.RestoreResponse, error) {
return &udf.RestoreResponse{
Success: true,
Error: "",
}, nil
}
// This handler does not do batching
func (createTagFromMeasurementNameHandler *CreateTagFromMeasurementNameHandler) BeginBatch(*udf.BeginBatch) error {
return errors.New("batching not supported")
}
func (createTagFromMeasurementNameHandler *CreateTagFromMeasurementNameHandler) Point(point *udf.Point) error {
point.Tags["measurementName"] = point.Name
createTagFromMeasurementNameHandler.Agent.Responses <- &udf.Response{
Message: &udf.Response_Point{
Point: point,
},
}
return nil
}
// This handler does not do batching
func (createTagFromMeasurementNameHandler *CreateTagFromMeasurementNameHandler) EndBatch(*udf.EndBatch) error {
return errors.New("batching not supported")
}
// Stop the handler gracefully.
func (createTagFromMeasurementNameHandler *CreateTagFromMeasurementNameHandler) Stop() {
close(createTagFromMeasurementNameHandler.Agent.Responses)
}
func main() {
agent := udfAgent.New(os.Stdin, os.Stdout)
handler := newCreateTagFromMeasurementNameHandler(agent)
agent.Handler = handler
log.Println("Starting agent")
agent.Start()
err := agent.Wait()
if err != nil {
log.Fatal(err)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment