Created
January 14, 2024 10:11
-
-
Save RoeiDimi/6bc8c670a95005420ffc95c6729c74fa to your computer and use it in GitHub Desktop.
Otel collector transformer jarray -> csv
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package jarray_to_csv | |
import ( | |
"bytes" | |
"context" | |
"encoding/csv" | |
"fmt" | |
"strconv" | |
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" | |
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/errors" | |
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" | |
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" | |
"github.com/valyala/fastjson" | |
"go.uber.org/zap" | |
) | |
// operatorType is the type name this operator is registered under and the
// type passed to the transformer config helper.
const (
	operatorType = "jarray_to_csv"
)
func init() { | |
operator.Register(operatorType, func() operator.Builder { return NewConfig() }) | |
} | |
// NewConfig creates a new jarray_to_csv operator config with default values | |
func NewConfig() *Config { | |
return NewConfigWithID(operatorType) | |
} | |
// NewConfigWithID creates a new jarray_to_csv operator config with default values | |
func NewConfigWithID(operatorID string) *Config { | |
return &Config{ | |
TransformerConfig: helper.NewTransformerConfig(operatorID, operatorType), | |
} | |
} | |
// Config is the configuration of a jarray_to_csv operator | |
type Config struct { | |
helper.TransformerConfig `mapstructure:",squash"` | |
Field entry.Field `mapstructure:"field"` | |
} | |
// Build will build a jarray_to_csv operator from the supplied configuration | |
func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) { | |
transformerOperator, err := c.TransformerConfig.Build(logger) | |
if err != nil { | |
return nil, err | |
} | |
if e, ok := c.Field.FieldInterface.(entry.BodyField); ok { | |
return &Transformer[entry.BodyField]{ | |
TransformerOperator: transformerOperator, | |
Field: e, | |
JsonParserPool: &fastjson.ParserPool{}, | |
}, nil | |
} | |
if e, ok := c.Field.FieldInterface.(entry.ResourceField); ok { | |
return &Transformer[entry.ResourceField]{ | |
TransformerOperator: transformerOperator, | |
Field: e, | |
JsonParserPool: &fastjson.ParserPool{}, | |
}, nil | |
} | |
if e, ok := c.Field.FieldInterface.(entry.AttributeField); ok { | |
return &Transformer[entry.AttributeField]{ | |
TransformerOperator: transformerOperator, | |
Field: e, | |
JsonParserPool: &fastjson.ParserPool{}, | |
}, nil | |
} | |
return nil, fmt.Errorf("invalid field type: %T", c.Field.FieldInterface) | |
} | |
// Transformer transforms a jarray object in the entry field into a csv line | |
type Transformer[T interface { | |
entry.BodyField | entry.ResourceField | entry.AttributeField | |
entry.FieldInterface | |
}] struct { | |
helper.TransformerOperator | |
Field T | |
JsonParserPool *fastjson.ParserPool | |
} | |
// Process will process an entry with a jarray to csv transformation. | |
func (p *Transformer[T]) Process(ctx context.Context, entry *entry.Entry) error { | |
return p.ProcessWith(ctx, entry, p.Transform) | |
} | |
// Transform will apply the jarray to csv operation to an entry | |
func (p *Transformer[T]) Transform(entry *entry.Entry) error { | |
jarrayInterface, ok := entry.Delete(p.Field) | |
if !ok { | |
// The field doesn't exist, so ignore it | |
return fmt.Errorf("apply jarray to csv: field %s does not exist on entry", p.Field) | |
} | |
jarrayString, err := valueAsString(jarrayInterface) | |
if err != nil { | |
fmt.Errorf("apply jarray to csv: couldn't convert field %s to a string", p.Field) | |
} | |
csvString, err := p.JarrayToCsv(jarrayString) | |
if err != nil { | |
// The field we were asked to convert to a csv was not a jarray, so put it back | |
err := entry.Set(p.Field, jarrayString) | |
if err != nil { | |
return errors.Wrap(err, "reset non-jarray field") | |
} | |
return fmt.Errorf("apply jarray to csv: field %s is not a jarray", p.Field) | |
} | |
err = entry.Set(p.Field, csvString) | |
if err != nil { | |
return err | |
} | |
return nil | |
} | |
func (p *Transformer[T]) JarrayToCsv(jArrayLine string) (string, error) { | |
parser := p.JsonParserPool.Get() | |
v, err := parser.Parse(jArrayLine) | |
p.JsonParserPool.Put(parser) | |
if err != nil { | |
return "", err | |
} | |
jArray := v.GetArray() // a is a []*Value slice | |
jarrayValues := make([]any, len(jArray)) | |
for i := range jArray { | |
val := jArray[i] | |
switch val.Type() { | |
case fastjson.TypeNumber: | |
jarrayValues[i] = strconv.FormatInt(val.GetInt64(), 10) | |
case fastjson.TypeString: | |
jarrayValues[i] = string(val.GetStringBytes()) | |
case fastjson.TypeTrue: | |
jarrayValues[i] = strconv.FormatBool(true) | |
case fastjson.TypeFalse: | |
jarrayValues[i] = strconv.FormatBool(false) | |
case fastjson.TypeNull: | |
jarrayValues[i] = "" | |
default: | |
return "", fmt.Errorf("failed to parse entry: %s", string(val.MarshalTo(nil))) | |
} | |
} | |
var buffer bytes.Buffer | |
writer := csv.NewWriter(&buffer) | |
if err := writer.Write(jarrayValues); err != nil { | |
return "", err | |
} | |
// Flush the writer to ensure the record is written | |
writer.Flush() | |
// Check for any errors during writing | |
err = writer.Error() | |
if err != nil { | |
return "", err | |
} | |
return buffer.String(), nil | |
} | |
// valueAsString returns value as a string when it is a string or a []byte.
// Any other dynamic type yields an empty string and an error naming that type.
func valueAsString(value any) (string, error) {
	switch typed := value.(type) {
	case string:
		return typed, nil
	case []byte:
		return string(typed), nil
	default:
		return "", fmt.Errorf("type '%T' cannot be parsed as csv", value)
	}
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment