Skip to content

Instantly share code, notes, and snippets.

@RoeiDimi
Created January 7, 2024 08:59
Show Gist options
  • Save RoeiDimi/5856f98f8189c0879c627c7b7b523179 to your computer and use it in GitHub Desktop.
Save RoeiDimi/5856f98f8189c0879c627c7b7b523179 to your computer and use it in GitHub Desktop.
csv_parser with jarray support
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package csv // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/csv"
import (
"context"
csvparser "encoding/csv"
"errors"
"fmt"
"io"
"strings"
"go.uber.org/zap"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
"github.com/valyala/fastjson"
)
// operatorType is the type name this operator is registered under; it also serves as the default operator ID.
const operatorType = "csv_parser"
// init registers a builder factory for the csv parser under operatorType.
func init() {
	newBuilder := func() operator.Builder {
		return NewConfig()
	}
	operator.Register(operatorType, newBuilder)
}
// NewConfig returns a csv parser config populated with default values,
// using the operator type as the operator ID.
func NewConfig() *Config {
	cfg := NewConfigWithID(operatorType)
	return cfg
}
// NewConfigWithID returns a csv parser config populated with default values
// and identified by operatorID.
func NewConfigWithID(operatorID string) *Config {
	cfg := new(Config)
	cfg.ParserConfig = helper.NewParserConfig(operatorID, operatorType)
	return cfg
}
// Config is the configuration of a csv parser operator.
//
// Build requires exactly one of Header or HeaderAttribute to be set. It also
// rejects IgnoreQuotes together with LazyQuotes, and either of them together
// with IsJarray.
type Config struct {
helper.ParserConfig `mapstructure:",squash"`
// Header is a static header line; Build splits it on HeaderDelimiter to obtain the field names.
Header string `mapstructure:"header"`
// HeaderDelimiter separates field names in the header; Build defaults it to FieldDelimiter when empty.
HeaderDelimiter string `mapstructure:"header_delimiter"`
// HeaderAttribute names an entry attribute from which the header line is read for each entry.
HeaderAttribute string `mapstructure:"header_attribute"`
// FieldDelimiter separates fields in the parsed value; Build defaults it to "," and requires exactly one rune.
FieldDelimiter string `mapstructure:"delimiter"`
// LazyQuotes is forwarded to the encoding/csv reader's LazyQuotes option.
LazyQuotes bool `mapstructure:"lazy_quotes"`
// IgnoreQuotes makes the parser split on the delimiter without any quote handling.
IgnoreQuotes bool `mapstructure:"ignore_quotes"`
// IsJarray makes the parser treat the value as a JSON array instead of a csv line.
IsJarray bool `mapstructure:"is_jarray"`
}
// Build validates the configuration and constructs the csv parser operator.
//
// An empty delimiter defaults to "," and an empty header delimiter defaults to
// the field delimiter. An error is returned when the embedded parser config
// fails to build, when incompatible quote/jarray options are combined, when a
// delimiter is not exactly one rune, or when the header settings are missing,
// duplicated, or lack the header delimiter.
func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
	base, err := c.ParserConfig.Build(logger)
	if err != nil {
		return nil, err
	}

	// c is a copy (value receiver), so applying defaults here does not
	// modify the caller's Config.
	if c.FieldDelimiter == "" {
		c.FieldDelimiter = ","
	}
	if c.HeaderDelimiter == "" {
		c.HeaderDelimiter = c.FieldDelimiter
	}

	switch {
	case c.IgnoreQuotes && c.LazyQuotes:
		return nil, errors.New("only one of 'ignore_quotes' or 'lazy_quotes' can be true")
	case c.IsJarray && (c.IgnoreQuotes || c.LazyQuotes):
		return nil, errors.New("ignore_quotes or lazy_quotes isn't allowed when 'is_jarray' is true")
	}

	// Both delimiters must be exactly one rune.
	fieldRunes := []rune(c.FieldDelimiter)
	if len(fieldRunes) != 1 {
		return nil, fmt.Errorf("invalid 'delimiter': '%s'", c.FieldDelimiter)
	}
	headerRunes := []rune(c.HeaderDelimiter)
	if len(headerRunes) != 1 {
		return nil, fmt.Errorf("invalid 'header_delimiter': '%s'", c.HeaderDelimiter)
	}

	hasHeader := c.Header != ""
	hasHeaderAttribute := c.HeaderAttribute != ""
	if !hasHeader && !hasHeaderAttribute {
		return nil, errors.New("missing required field 'header' or 'header_attribute'")
	}
	if hasHeader && hasHeaderAttribute {
		return nil, errors.New("only one header parameter can be set: 'header' or 'header_attribute'")
	}

	// headers stays nil when the header is supplied per entry via an attribute.
	var headers []string
	if hasHeader {
		if !strings.Contains(c.Header, c.HeaderDelimiter) {
			return nil, errors.New("missing field delimiter in header")
		}
		headers = strings.Split(c.Header, c.HeaderDelimiter)
	}

	pool := &fastjson.ParserPool{}
	parser := &Parser{
		ParserOperator:  base,
		header:          headers,
		headerAttribute: c.HeaderAttribute,
		fieldDelimiter:  fieldRunes[0],
		headerDelimiter: headerRunes[0],
		lazyQuotes:      c.LazyQuotes,
		ignoreQuotes:    c.IgnoreQuotes,
		isJarray:        c.IsJarray,
		parse:           generateParseFunc(headers, fieldRunes[0], c.LazyQuotes, c.IgnoreQuotes, c.IsJarray, pool),
		pp:              pool,
	}
	return parser, nil
}
// Parser is an operator that parses csv in an entry.
type Parser struct {
helper.ParserOperator
// fieldDelimiter separates fields within the value being parsed.
fieldDelimiter rune
// headerDelimiter separates field names in a header line read from headerAttribute.
headerDelimiter rune
// header holds the static field names; Build leaves it nil when no static header is configured.
header []string
// headerAttribute, when non-empty, names the entry attribute holding a per-entry header line.
headerAttribute string
// lazyQuotes is forwarded to the encoding/csv reader's LazyQuotes option.
lazyQuotes bool
// ignoreQuotes selects plain splitting on fieldDelimiter with no quote handling.
ignoreQuotes bool
// isJarray selects parsing the value as a JSON array instead of csv.
isJarray bool
// parse is the parse function built from the static header; Process uses a freshly generated one instead when headerAttribute is set.
parse parseFunc
// pp is the fastjson parser pool handed to generateParseFunc for JSON array parsing.
pp *fastjson.ParserPool
}
// parseFunc takes the value to parse and returns the parsed result or an error.
type parseFunc func(any) (any, error)
// Process parses the entry using the operator's parse function.
//
// When a header attribute is configured, the header line is read from that
// attribute on the entry and a parse function is generated for it on the fly.
// A missing or non-string attribute is logged and returned as an error.
func (r *Parser) Process(ctx context.Context, e *entry.Entry) error {
	// Static header: the parse function prepared at build time is sufficient.
	if r.headerAttribute == "" {
		return r.ParserOperator.ProcessWith(ctx, e, r.parse)
	}

	rawHeader, found := e.Attributes[r.headerAttribute]
	if !found {
		err := fmt.Errorf("failed to read dynamic header attribute %s", r.headerAttribute)
		r.Error(err)
		return err
	}

	headerLine, isString := rawHeader.(string)
	if !isString {
		err := fmt.Errorf("header is expected to be a string but is %T", rawHeader)
		r.Error(err)
		return err
	}

	columns := strings.Split(headerLine, string(r.headerDelimiter))
	dynamicParse := generateParseFunc(columns, r.fieldDelimiter, r.lazyQuotes, r.ignoreQuotes, r.isJarray, r.pp)
	return r.ParserOperator.ProcessWith(ctx, e, dynamicParse)
}
// generateParseFunc picks the parse function matching the given options and
// binds it to headers. It is called once at build time for a static header and
// once per entry when field names come from an entry attribute.
//
// isJarray takes precedence over ignoreQuotes; when neither is set the
// encoding/csv based parser is used.
func generateParseFunc(headers []string, fieldDelimiter rune, lazyQuotes bool, ignoreQuotes bool, isJarray bool, pp *fastjson.ParserPool) parseFunc {
	switch {
	case isJarray:
		return generateJarrayParseFunc(headers, pp)
	case ignoreQuotes:
		return generateSplitParseFunc(headers, fieldDelimiter)
	default:
		return generateCSVParseFunc(headers, fieldDelimiter, lazyQuotes)
	}
}
// generateJarrayParseFunc returns a parse function that interprets the value
// as a single JSON array and maps its elements, by position, onto headers.
//
// The array must contain exactly len(headers) elements. Supported element
// types are numbers, strings, booleans and null; nested arrays and objects are
// rejected. Integral numbers are returned as int64, all other numbers as
// float64.
func generateJarrayParseFunc(headers []string, pp *fastjson.ParserPool) parseFunc {
	return func(value any) (any, error) {
		jArrayLine, err := valueAsString(value)
		if err != nil {
			return nil, err
		}

		p := pp.Get()
		// Values produced by the parser are only valid until the parser is
		// reused, so it must not go back to the pool before every element has
		// been copied out.
		defer pp.Put(p)

		v, err := p.Parse(jArrayLine)
		if err != nil {
			return nil, errors.New("failed to parse entry")
		}

		jArray, err := v.Array()
		if err != nil {
			return nil, errors.New("failed to parse entry: not a json array")
		}
		// Guard against both out-of-range header lookups and silently
		// incomplete results.
		if len(jArray) != len(headers) {
			return nil, fmt.Errorf("wrong number of fields: expected %d, found %d", len(headers), len(jArray))
		}

		parsedValues := make(map[string]any, len(jArray))
		for i, elem := range jArray {
			switch elem.Type() {
			case fastjson.TypeNumber:
				// Prefer int64; fall back to float64 so that fractional
				// numbers are not silently turned into 0.
				if n, intErr := elem.Int64(); intErr == nil {
					parsedValues[headers[i]] = n
					continue
				}
				f, floatErr := elem.Float64()
				if floatErr != nil {
					return nil, errors.New("failed to parse entry: " + string(elem.MarshalTo(nil)))
				}
				parsedValues[headers[i]] = f
			case fastjson.TypeString:
				// string(...) copies the bytes out of the parser's buffer.
				parsedValues[headers[i]] = string(elem.GetStringBytes())
			case fastjson.TypeTrue:
				parsedValues[headers[i]] = true
			case fastjson.TypeFalse:
				parsedValues[headers[i]] = false
			case fastjson.TypeNull:
				parsedValues[headers[i]] = nil
			default:
				return nil, errors.New("failed to parse entry: " + string(elem.MarshalTo(nil)))
			}
		}
		return parsedValues, nil
	}
}
// generateCSVParseFunc returns a parse function for a given header and field
// delimiter, which parses a line of CSV text into a map of header -> field.
//
// The returned function fails when the value is not a string or []byte, when
// the value contains no csv record at all (e.g. an empty string), when the csv
// reader cannot produce any field for a record, or when the number of parsed
// fields does not match the number of headers.
func generateCSVParseFunc(headers []string, fieldDelimiter rune, lazyQuotes bool) parseFunc {
	return func(value any) (any, error) {
		csvLine, err := valueAsString(value)
		if err != nil {
			return nil, err
		}

		reader := csvparser.NewReader(strings.NewReader(csvLine))
		reader.Comma = fieldDelimiter
		reader.FieldsPerRecord = len(headers)
		reader.LazyQuotes = lazyQuotes

		// Typically only need one
		lines := make([][]string, 0, 1)
		for {
			line, err := reader.Read()
			if errors.Is(err, io.EOF) {
				break
			}
			// A record that comes back with fields is kept even if the reader
			// reported an error (e.g. a field-count mismatch on a partial
			// line); the total field count is validated after joining.
			if err != nil && len(line) == 0 {
				return nil, errors.New("failed to parse entry")
			}
			lines = append(lines, line)
		}

		// Empty input yields no records at all; without this guard the
		// lines[0] access below would panic.
		if len(lines) == 0 {
			return nil, errors.New("no csv lines found")
		}

		/*
			This parser is parsing a single value, which came from a single log entry.
			Therefore, if there are multiple lines here, it should be assumed that each
			subsequent line contains a continuation of the last field in the previous line.

			Given a file w/ headers "A,B,C,D,E" and contents "aa,b\nb,cc,d\nd,ee",
			expect reader.Read() to return bodies:
			- ["aa","b"]
			- ["b","cc","d"]
			- ["d","ee"]
		*/
		joinedLine := lines[0]
		for _, nextLine := range lines[1:] {
			// The first element of the next line is a continuation of the previous line's last element
			joinedLine[len(joinedLine)-1] += "\n" + nextLine[0]
			// The remainder are separate elements
			joinedLine = append(joinedLine, nextLine[1:]...)
		}

		return headersMap(headers, joinedLine)
	}
}
// generateSplitParseFunc returns a quote-agnostic parse function: the value is
// cut at every occurrence of fieldDelimiter and the resulting pieces are
// mapped onto headers.
func generateSplitParseFunc(headers []string, fieldDelimiter rune) parseFunc {
	separator := string(fieldDelimiter)
	return func(value any) (any, error) {
		line, err := valueAsString(value)
		if err != nil {
			return nil, err
		}
		// No quote handling is needed here, so a plain split is enough.
		return headersMap(headers, strings.Split(line, separator))
	}
}
// valueAsString converts value to a string. Strings are returned unchanged and
// byte slices are converted; any other type yields an empty string and an
// error naming the offending type.
func valueAsString(value any) (string, error) {
	switch typed := value.(type) {
	case string:
		return typed, nil
	case []byte:
		return string(typed), nil
	}
	return "", fmt.Errorf("type '%T' cannot be parsed as csv", value)
}
// headersMap pairs each header with the field at the same position and returns
// the result as a map. It returns an error when the two slices differ in
// length.
func headersMap(headers []string, fields []string) (map[string]any, error) {
	if want, got := len(headers), len(fields); want != got {
		return nil, fmt.Errorf("wrong number of fields: expected %d, found %d", want, got)
	}

	result := make(map[string]any, len(headers))
	for i := range headers {
		result[headers[i]] = fields[i]
	}
	return result, nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment