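// Example consumer using the cooperative incremental ("cooperative-sticky")
// rebalance protocol with confluent-kafka-go.
//
// Assumed setup (not part of the original snippet): the v1 module used by the
// import path below can typically be fetched with
// "go get github.com/confluentinc/confluent-kafka-go/kafka"; the client wraps
// librdkafka and needs cgo enabled to build.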
package main

import (
	"fmt"
	"os"
	"os/signal"
	"strconv"
	"syscall"
	"time"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	if len(os.Args) < 5 {
		fmt.Fprintf(os.Stderr, "Usage: %s <id> <bootstrap-servers> <group> <topics..>\n",
			os.Args[0])
		os.Exit(1)
	}
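
	// Example invocation (the binary name, broker address, group and topic
	// below are placeholders):
	//
	//	go run consumer.go 1 localhost:9092 my-group my-topic
	//
	// Starting a second instance with a different <id> but the same group
	// lets you watch the cooperative-sticky protocol move only part of the
	// assignment between the two consumers.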
	cId, _ := strconv.Atoi(os.Args[1])
	bootstrapServers := os.Args[2]
	group := os.Args[3]
	topics := os.Args[4:]

	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": bootstrapServers,
		// Avoid connecting to IPv6 brokers:
		// This is needed for the ErrAllBrokersDown show-case below
		// when using localhost brokers on OSX, since the OSX resolver
		// will return the IPv6 addresses first.
		// You typically don't need to specify this configuration
		// property.
		"broker.address.family": "v4",
		"max.poll.interval.ms":  30000,
		"session.timeout.ms":    6000,
		"heartbeat.interval.ms": 1000,
		// Consumer group ID
		"group.id": group,
		// Use the cooperative incremental rebalance protocol.
		"partition.assignment.strategy": "cooperative-sticky",
		// Start reading from the first message of each assigned
		// partition if there are no previously committed offsets
		// for this group.
		"auto.offset.reset": "earliest"})
	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
		os.Exit(1)
	}
fmt.Printf("Created Consumer %v\n", c)
// Subscribe to topics, call the rebalanceCallback on assignment/revoke.
// The rebalanceCallback is triggered from c.Poll() and c.Close() below.
err = c.SubscribeTopics(topics, rebalanceCallback)
	run := true

	go func() {
		for run {
			time.Sleep(2 * time.Second)
			tps, err := c.Assignment()
			if err != nil {
				fmt.Printf("failed to get assignment: %v\n", err)
			}
			if len(tps) != 0 {
				fmt.Printf("[c%d] my current assignment is: %v \n", cId, tps)
			}
		}
	}()

	for run {
		select {
		case sig := <-sigchan:
			fmt.Printf("Caught signal %v: terminating\n", sig)
			run = false
		default:
			ev := c.Poll(100)
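			// The commented-out block below appears intended to make
			// consumer 2 block for longer than max.poll.interval.ms (30s),
			// so that it is kicked out of the group and loses its
			// assignment (see AssignmentLost in the rebalance callback).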
			//if cId == 2 {
			//	time.Sleep(time.Second * 31)
			//	fmt.Printf("sleep is done!\n")
			//}
			if ev == nil {
				continue
			}

			switch e := ev.(type) {
			case *kafka.Message:
				fmt.Printf("%% Message on %s:\n%s\n",
					e.TopicPartition, string(e.Value))
				if e.Headers != nil {
					fmt.Printf("%% Headers: %v\n", e.Headers)
				}
			case kafka.Error:
				// Errors should generally be considered
				// informational; the client will try to
				// automatically recover.
				// But in this example we choose to terminate
				// the application if all brokers are down.
				fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n",
					e.Code(), e)
				if e.Code() == kafka.ErrAllBrokersDown {
					run = false
				}
			default:
				fmt.Printf("Ignored %v\n", e)
			}
		}
	}

	fmt.Printf("Closing consumer\n")
	c.Close()
}

// rebalanceCallback is called on each group rebalance to assign additional
// partitions to, or remove existing partitions from, the consumer's current
// assignment.
//
// The application may use this optional callback to inspect the assignment,
// alter the initial start offset (the .Offset field of each assigned partition),
// and read/write offsets to commit to an alternative store outside of Kafka.
func rebalanceCallback(c *kafka.Consumer, event kafka.Event) error {
	switch ev := event.(type) {
	case kafka.AssignedPartitions:
		fmt.Fprintf(os.Stderr,
			"%% %s rebalance: %d new partition(s) assigned: %v\n",
			c.GetRebalanceProtocol(), len(ev.Partitions),
			ev.Partitions)

		// The application may update the start .Offset of each
		// assigned partition and then call IncrementalAssign().
		// Even though this example does not alter the offsets, we
		// provide the call to IncrementalAssign() as an example.
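		// A sketch of overriding the start offsets before assigning
		// (commented out; kafka.OffsetTail(5), i.e. five messages from the
		// end of each partition, is an illustrative value and not part of
		// the original example):
		//
		//	for i := range ev.Partitions {
		//		ev.Partitions[i].Offset = kafka.OffsetTail(5)
		//	}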
		err := c.IncrementalAssign(ev.Partitions)
		if err != nil {
			panic(err)
		}

	case kafka.RevokedPartitions:
		fmt.Fprintf(os.Stderr,
			"%% %s rebalance: %d partition(s) revoked: %v\n",
			c.GetRebalanceProtocol(), len(ev.Partitions),
			ev.Partitions)

		if c.AssignmentLost() {
			// Our consumer has been kicked out of the group and the
			// entire assignment is thus lost.
			fmt.Fprintf(os.Stderr, "%% Current assignment lost!\n")
		}
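
		// A sketch of committing the current offsets before the partitions
		// are handed off (commented out; only relevant if "enable.auto.commit"
		// were disabled, which this example does not do):
		//
		//	if _, err := c.Commit(); err != nil {
		//		fmt.Fprintf(os.Stderr, "%% Commit failed: %v\n", err)
		//	}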

		// The client automatically calls IncrementalUnassign() unless
		// the callback has already called that method.
	}

	return nil
}