Skip to content

Instantly share code, notes, and snippets.

@RoeiDimi
Created January 14, 2024 10:11
Show Gist options
  • Save RoeiDimi/6bc8c670a95005420ffc95c6729c74fa to your computer and use it in GitHub Desktop.
Save RoeiDimi/6bc8c670a95005420ffc95c6729c74fa to your computer and use it in GitHub Desktop.
OpenTelemetry Collector transformer: JSON array (jarray) → CSV line
package jarray_to_csv
import (
"bytes"
"context"
"encoding/csv"
"fmt"
"strconv"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/errors"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
"github.com/valyala/fastjson"
"go.uber.org/zap"
)
// operatorType is the name under which this operator is registered, and the
// default operator ID used by NewConfig.
const operatorType = "jarray_to_csv"

// init registers a builder factory for the jarray_to_csv operator with the
// stanza operator package.
func init() {
	newBuilder := func() operator.Builder {
		return NewConfig()
	}
	operator.Register(operatorType, newBuilder)
}
// NewConfig returns a jarray_to_csv config populated with defaults whose
// operator ID is the operator type itself.
func NewConfig() *Config {
	cfg := NewConfigWithID(operatorType)
	return cfg
}
// NewConfigWithID returns a jarray_to_csv config with default transformer
// settings and the given operator ID. Field is left at its zero value; Build
// rejects that, so callers must set Field before building.
func NewConfigWithID(operatorID string) *Config {
	cfg := &Config{}
	cfg.TransformerConfig = helper.NewTransformerConfig(operatorID, operatorType)
	return cfg
}
// Config is the configuration of a jarray_to_csv operator.
type Config struct {
	// TransformerConfig holds the common transformer settings. It is squashed,
	// so its keys sit at the same level as "field" in the configuration.
	helper.TransformerConfig `mapstructure:",squash"`
	// Field is the entry field whose JSON array value is converted to CSV.
	// Build accepts only body, resource, or attribute fields and returns an
	// error for anything else, including an unset field.
	Field entry.Field `mapstructure:"field"`
}
// Build constructs a jarray_to_csv transformer from the configuration.
//
// The transformer's type parameter is chosen from the concrete kind of the
// configured field (body, resource or attribute). Any other field kind,
// including an unset field, results in an "invalid field type" error. Errors
// from building the embedded TransformerConfig are returned unchanged.
func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
	base, err := c.TransformerConfig.Build(logger)
	if err != nil {
		return nil, err
	}

	switch field := c.Field.FieldInterface.(type) {
	case entry.BodyField:
		return &Transformer[entry.BodyField]{
			TransformerOperator: base,
			Field:               field,
			JsonParserPool:      &fastjson.ParserPool{},
		}, nil
	case entry.ResourceField:
		return &Transformer[entry.ResourceField]{
			TransformerOperator: base,
			Field:               field,
			JsonParserPool:      &fastjson.ParserPool{},
		}, nil
	case entry.AttributeField:
		return &Transformer[entry.AttributeField]{
			TransformerOperator: base,
			Field:               field,
			JsonParserPool:      &fastjson.ParserPool{},
		}, nil
	default:
		return nil, fmt.Errorf("invalid field type: %T", c.Field.FieldInterface)
	}
}
// Transformer transforms a jarray object in the entry field into a csv line.
//
// T is the concrete field type selected by Config.Build. The union element
// restricts it to body, resource or attribute fields. The
// entry.FieldInterface element lets a T value be passed wherever the entry
// API expects a field.
type Transformer[T interface {
	entry.BodyField | entry.ResourceField | entry.AttributeField
	entry.FieldInterface
}] struct {
	// TransformerOperator supplies the shared transformer behaviour, including
	// ProcessWith used by Process.
	helper.TransformerOperator
	// Field is the entry field whose JSON array value is converted.
	Field T
	// JsonParserPool supplies the fastjson parsers used by JarrayToCsv.
	JsonParserPool *fastjson.ParserPool
}
// Process applies the jarray-to-csv conversion to ent by delegating to the
// embedded TransformerOperator's ProcessWith with Transform as the callback.
func (p *Transformer[T]) Process(ctx context.Context, ent *entry.Entry) error {
	transform := p.Transform
	return p.ProcessWith(ctx, ent, transform)
}
// Transform replaces the JSON array stored in p.Field with a single CSV record.
//
// The field is removed from the entry first. If the conversion fails for any
// reason, the original, untouched value is written back before the error is
// returned, so a failed transformation never drops or alters the field.
//
// It returns an error when:
//   - the field is missing;
//   - its value is neither a string nor a []byte;
//   - the value is not a JSON array of scalars;
//   - writing the result back to the entry fails.
func (p *Transformer[T]) Transform(e *entry.Entry) error {
	raw, ok := e.Delete(p.Field)
	if !ok {
		// A missing field is reported to the caller as an error, not ignored.
		return fmt.Errorf("apply jarray to csv: field %s does not exist on entry", p.Field)
	}

	jarrayString, err := valueAsString(raw)
	if err != nil {
		// Previously this error was constructed and discarded. Execution then
		// continued with "" and overwrote the field with an empty string.
		if setErr := e.Set(p.Field, raw); setErr != nil {
			return errors.Wrap(setErr, "reset non-jarray field")
		}
		return fmt.Errorf("apply jarray to csv: couldn't convert field %s to a string: %w", p.Field, err)
	}

	csvString, err := p.JarrayToCsv(jarrayString)
	if err != nil {
		// Restore the original value (not its string form), so []byte values
		// keep their type.
		if setErr := e.Set(p.Field, raw); setErr != nil {
			return errors.Wrap(setErr, "reset non-jarray field")
		}
		return fmt.Errorf("apply jarray to csv: field %s is not a jarray: %w", p.Field, err)
	}

	return e.Set(p.Field, csvString)
}
// JarrayToCsv converts a JSON array string into a single CSV record.
//
// Every array element must be a scalar, converted as follows:
//   - numbers keep their original JSON text;
//   - strings are emitted unescaped;
//   - booleans become "true" or "false";
//   - null becomes an empty field.
//
// Nested arrays or objects produce an error, as do invalid JSON and JSON that
// is not an array. The record is encoded with encoding/csv, so the result ends
// with the writer's line terminator ("\n").
func (p *Transformer[T]) JarrayToCsv(jArrayLine string) (string, error) {
	parser := p.JsonParserPool.Get()
	// Values returned by Parse belong to the parser and must not be used after
	// it goes back to the pool. Release it only once conversion is finished.
	defer p.JsonParserPool.Put(parser)

	v, err := parser.Parse(jArrayLine)
	if err != nil {
		return "", fmt.Errorf("parse jarray: %w", err)
	}
	// Array (unlike GetArray) reports an error for valid non-array JSON instead
	// of returning nil and producing an empty record.
	jArray, err := v.Array()
	if err != nil {
		return "", fmt.Errorf("parse jarray: %w", err)
	}

	// csv.Writer.Write takes []string.
	record := make([]string, len(jArray))
	for i, val := range jArray {
		switch val.Type() {
		case fastjson.TypeNumber:
			// Use the raw JSON number text. GetInt64 truncated floats and lost
			// integers outside the int64 range.
			record[i] = string(val.MarshalTo(nil))
		case fastjson.TypeString:
			record[i] = string(val.GetStringBytes())
		case fastjson.TypeTrue:
			record[i] = strconv.FormatBool(true)
		case fastjson.TypeFalse:
			record[i] = strconv.FormatBool(false)
		case fastjson.TypeNull:
			record[i] = ""
		default:
			return "", fmt.Errorf("failed to parse entry: %s", string(val.MarshalTo(nil)))
		}
	}

	var buffer bytes.Buffer
	writer := csv.NewWriter(&buffer)
	if err := writer.Write(record); err != nil {
		return "", err
	}
	// Flush pushes the buffered record into buffer; Error reports write failures.
	writer.Flush()
	if err := writer.Error(); err != nil {
		return "", err
	}
	return buffer.String(), nil
}
// valueAsString returns value as a string when it is a string or a []byte.
// Any other type yields an empty string and an error naming the type.
func valueAsString(value any) (string, error) {
	if str, isString := value.(string); isString {
		return str, nil
	}
	if raw, isBytes := value.([]byte); isBytes {
		return string(raw), nil
	}
	return "", fmt.Errorf("type '%T' cannot be parsed as csv", value)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment