Skip to content

Instantly share code, notes, and snippets.

@grazor
Last active June 4, 2021 12:11
Show Gist options
  • Save grazor/be2b783e7108fbfcfb39ca7763c3d727 to your computer and use it in GitHub Desktop.
Save grazor/be2b783e7108fbfcfb39ca7763c3d727 to your computer and use it in GitHub Desktop.
Golang kafkacat -J decoder (aka yajl binary representation unmarshaller)
// Package kafkacat extracts messages from kafkacat -J output
package kafkacat
import (
"context"
"encoding/json"
"errors"
"io"
"time"
)
// DeserializerFunc turns a raw message payload into a decoded value.
// Consume calls it with the message's unescaped payload bytes and the
// value returned by the factory's Enrichment method, and stores the two
// results in Message.Data and Message.Error.
type DeserializerFunc func([]byte, bool) (interface{}, error)
// Deserializer pairs a topic name with the function used to decode payloads.
// NOTE(review): presumably Topic is the Kafka topic the function applies to;
// the code that builds and looks up these values is not part of this file.
type Deserializer struct {
Topic string
deserializer DeserializerFunc
}
// DeserializerFactory holds the registered deserializers and an enrichment
// flag. Consume uses its Get and Enrichment methods, which are defined
// outside this file.
type DeserializerFactory struct {
// decoders is presumably keyed by topic name; verify against Get.
decoders map[string]*Deserializer
// enrich is presumably the value reported by Enrichment; TODO confirm.
enrich bool
}
// EncodedPayload is a message payload as raw bytes. Its UnmarshalJSON method
// fills it by unescaping a JSON string token with unquote instead of relying
// on the default encoding/json handling for []byte.
type EncodedPayload []byte
// UnmarshalJSON implements json.Unmarshaler. It receives the raw JSON token
// for the payload, strips the surrounding quotes and resolves backslash
// escapes via unquote, and stores the resulting bytes in *p.
//
// A JSON null leaves *p unchanged and returns nil. Any error reported by
// unquote is returned as-is and *p is not modified.
func (p *EncodedPayload) UnmarshalJSON(b []byte) error {
	// encoding/json calls UnmarshalJSON for a JSON null too. Passing "null"
	// to unquote would strip its first and last byte and yield the bogus
	// payload "ul", so follow the usual convention and treat null as a no-op.
	if string(b) == "null" {
		return nil
	}
	es, err := unquote(b)
	if err != nil {
		return err
	}
	*p = EncodedPayload(es)
	return nil
}
// Message is one JSON record read from the input stream by Consume, plus the
// fields Consume derives from it.
type Message struct {
Topic string `json:"topic"`
Partition int `json:"partition"`
// NOTE(review): int is 32 bits wide on some platforms; confirm that the
// offsets in the input always fit.
Offset int `json:"offset"`
// Timestamp is read from the "ts" key. Consume divides it by 1000 before
// passing it to time.Unix, i.e. it is treated as milliseconds.
Timestamp int64 `json:"ts"`
// Datetime is overwritten by Consume with Timestamp formatted as
// "2006-01-02 15:04".
Datetime string `json:"datetime"`
// Payload holds the unescaped payload bytes (see EncodedPayload).
Payload EncodedPayload `json:"payload,omitempty"`
// Data is the value returned by the deserializer for Payload.
Data interface{} `json:"data"`
// Error is the error returned by the deserializer, or ErrNoDeserializer
// when Consume had no deserializer to call.
Error error `json:"error"`
}
// ErrNoDeserializer is stored in Message.Error by Consume when no
// deserializer is available for a message.
var ErrNoDeserializer = errors.New("missing deserializer for provided topic")
// Consume reads a stream of JSON records from stream, decodes each one into a
// Message and delivers it on the returned channel. Decoding runs in its own
// goroutine; the channel is closed when the stream ends, when a record fails
// to decode, or when ctx is cancelled while a message is waiting to be sent.
//
// For every record Consume fills Datetime from Timestamp and, when factory is
// non-nil, asks it for the deserializer of the record's topic. The
// deserializer's results are stored in Data and Error; without a deserializer
// Error is set to ErrNoDeserializer.
//
// If a record cannot be decoded, one final Message carrying the decode error
// in Error (its other fields may be only partially populated) is sent before
// the channel is closed.
//
// Cancellation is only observed while sending: a goroutine blocked reading
// from stream does not notice ctx being cancelled until the read returns.
func Consume(ctx context.Context, stream io.Reader, factory *DeserializerFactory) <-chan *Message {
	messagesChan := make(chan *Message)

	go func() {
		defer close(messagesChan)

		var (
			deserializer DeserializerFunc
			topic        string // topic the cached deserializer was resolved for
		)
		decoder := json.NewDecoder(stream)

		for {
			rec := &Message{}
			err := decoder.Decode(rec)
			if err == io.EOF {
				return
			}
			if err != nil {
				// Surface the failure to the consumer; without this the
				// receiver gets a half-filled message and a silent close.
				rec.Error = err
				select {
				case messagesChan <- rec:
				case <-ctx.Done():
				}
				return
			}

			// Timestamp is in milliseconds; the format has minute precision,
			// so dropping the sub-second part loses nothing.
			rec.Datetime = time.Unix(rec.Timestamp/1000, 0).Format("2006-01-02 15:04")

			// Resolve the deserializer again whenever none is cached yet or
			// the topic changes, so a record is never decoded with the
			// deserializer of a different topic.
			if factory != nil && (deserializer == nil || rec.Topic != topic) {
				deserializer = factory.Get(rec.Topic)
				topic = rec.Topic
			}

			if deserializer != nil {
				rec.Data, rec.Error = deserializer(rec.Payload, factory.Enrichment())
			} else {
				rec.Error = ErrNoDeserializer
			}

			select {
			case messagesChan <- rec:
			case <-ctx.Done():
				return
			}
		}
	}()

	return messagesChan
}
package kafkacat
import (
"bytes"
"errors"
"unicode/utf8"
)
// ErrSyntax is returned by unquoteChar (and therefore by unquote) when the
// input contains a malformed or unsupported backslash escape.
var ErrSyntax = errors.New("invalid string")
// unhex returns the numeric value of the ASCII hexadecimal digit b.
// Both lower- and upper-case letters are accepted. For any other byte it
// returns 0 and ok == false.
func unhex(b byte) (v rune, ok bool) {
	digit := rune(b)
	if digit >= '0' && digit <= '9' {
		return digit - '0', true
	}
	if digit >= 'a' && digit <= 'f' {
		return digit - 'a' + 10, true
	}
	if digit >= 'A' && digit <= 'F' {
		return digit - 'A' + 10, true
	}
	return 0, false
}
// unquoteChar decodes the first byte or backslash escape sequence of s.
//
// It returns the decoded bytes in val and the unconsumed remainder of s in
// tail. A byte that is not a backslash is passed through unchanged, so raw
// (possibly non-UTF-8) input bytes survive as-is.
//
// Supported escapes: \a \b \f \n \r \t \v \\ \/ \' \", \xHH and three-digit
// octal \NNN (both yield exactly one byte), and \uHHHH / \UHHHHHHHH (yield the
// UTF-8 encoding of the code point).
//
// ErrSyntax is returned, with val and tail nil, when s is empty, when an
// escape is truncated or unknown, when a digit is invalid, or when the value
// is out of range.
func unquoteChar(s []byte) (val []byte, tail []byte, err error) {
	if len(s) == 0 {
		return nil, nil, ErrSyntax
	}
	if s[0] != '\\' {
		return []byte{s[0]}, s[1:], nil
	}
	if len(s) < 2 {
		// A lone trailing backslash has nothing to escape.
		return nil, nil, ErrSyntax
	}

	var value rune
	// singleByte marks escapes that denote one raw byte rather than a code
	// point; those must not be UTF-8 encoded or values >= 0x80 would turn
	// into two bytes.
	singleByte := false

	c := s[1]
	s = s[2:]

	switch c {
	case 'a':
		value = '\a'
	case 'b':
		value = '\b'
	case 'f':
		value = '\f'
	case 'n':
		value = '\n'
	case 'r':
		value = '\r'
	case 't':
		value = '\t'
	case 'v':
		value = '\v'
	case 'x', 'u', 'U':
		n := 0
		switch c {
		case 'x':
			n = 2
		case 'u':
			n = 4
		case 'U':
			n = 8
		}
		if len(s) < n {
			return nil, nil, ErrSyntax
		}
		var v rune
		for j := 0; j < n; j++ {
			x, ok := unhex(s[j])
			if !ok {
				return nil, nil, ErrSyntax
			}
			v = v<<4 | x
		}
		s = s[n:]
		if c == 'x' {
			// single-byte string, possibly not UTF-8
			singleByte = true
		} else if v < 0 || v > utf8.MaxRune {
			// v < 0 catches 8-digit values that overflowed the rune.
			return nil, nil, ErrSyntax
		}
		value = v
	case '0', '1', '2', '3', '4', '5', '6', '7':
		v := rune(c) - '0'
		if len(s) < 2 {
			return nil, nil, ErrSyntax
		}
		for j := 0; j < 2; j++ { // one digit already; two more
			x := rune(s[j]) - '0'
			if x < 0 || x > 7 {
				return nil, nil, ErrSyntax
			}
			v = (v << 3) | x
		}
		s = s[2:]
		if v > 255 {
			return nil, nil, ErrSyntax
		}
		singleByte = true
		value = v
	case '\\', '/', '\'', '"':
		// '/' is included because JSON permits the solidus to be escaped.
		value = rune(c)
	default:
		return nil, nil, ErrSyntax
	}

	if singleByte {
		return []byte{byte(value)}, s, nil
	}
	val = make([]byte, utf8.UTFMax)
	val = val[:utf8.EncodeRune(val, value)]
	return val, s, nil
}
func unquote(data []byte) ([]byte, error) {
var buf bytes.Buffer
data = data[1 : len(data)-1]
for len(data) > 0 {
val, tail, err := unquoteChar(data)
if err != nil {
return nil, err
}
data = tail
for _, v := range val {
buf.WriteByte(v)
}
}
return buf.Bytes(), nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment