Skip to content

Instantly share code, notes, and snippets.

@pugnascotia
Last active July 12, 2023 16:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pugnascotia/59574d78dc84983e498d84a795baaff8 to your computer and use it in GitHub Desktop.
Save pugnascotia/59574d78dc84983e498d84a795baaff8 to your computer and use it in GitHub Desktop.
franz-go polling example
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/twmb/franz-go/pkg/kgo"
"os"
"time"
)
func main() {
opts := []kgo.Opt{
kgo.SeedBrokers("localhost:29092"),
kgo.ConsumerGroup("usage-data-group"),
kgo.ConsumeTopics("usage-data-pipeline"),
kgo.DisableAutoCommit(),
kgo.BlockRebalanceOnPoll(),
kgo.WithLogger(kgo.BasicLogger(os.Stderr, kgo.LogLevelDebug, nil)),
}
client, err := kgo.NewClient(opts...)
if err != nil {
panic(err)
}
ctx := context.Background()
for {
fetches := client.PollFetches(ctx)
if errs := fetches.Errors(); len(errs) > 0 {
panic(fmt.Sprint(errs))
}
// or a callback function.
fetches.EachPartition(func(p kgo.FetchTopicPartition) {
fmt.Println("fetching")
fmt.Println("p.Err", p.Err)
messageIDs := make([]string, len(p.Records))
for i, record := range p.Records {
messageIDs[i] = getID(record)
}
fmt.Println("processing messages", "messageIDs", messageIDs)
// Simulate doing something
time.Sleep(time.Second)
err := client.CommitRecords(ctx, p.Records...)
if err != nil {
panic(fmt.Sprint(err))
}
})
client.AllowRebalance()
}
}
// getID parses the JSON in a record and returns its `id` field.
func getID(rec *kgo.Record) string {
payload := make(map[string]string)
_ = json.Unmarshal(rec.Value, &payload)
return payload["id"]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment