Last active
March 2, 2023 14:27
-
-
Save scanterog/b66a3d15a8209b5dfda0c21f4dc4c100 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"os" | |
"os/signal" | |
"strconv" | |
"syscall" | |
"time" | |
"github.com/confluentinc/confluent-kafka-go/kafka" | |
) | |
func main() { | |
if len(os.Args) < 4 { | |
fmt.Fprintf(os.Stderr, "Usage: %s <id> <bootstrap-servers> <group> <topics..>\n", | |
os.Args[0]) | |
os.Exit(1) | |
} | |
cId, _ := strconv.Atoi(os.Args[1]) | |
bootstrapServers := os.Args[2] | |
group := os.Args[3] | |
topics := os.Args[4:] | |
sigchan := make(chan os.Signal, 1) | |
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) | |
c, err := kafka.NewConsumer(&kafka.ConfigMap{ | |
"bootstrap.servers": bootstrapServers, | |
// Avoid connecting to IPv6 brokers: | |
// This is needed for the ErrAllBrokersDown show-case below | |
// when using localhost brokers on OSX, since the OSX resolver | |
// will return the IPv6 addresses first. | |
// You typically don't need to specify this configuration | |
// property. | |
"broker.address.family": "v4", | |
"max.poll.interval.ms": 30000, | |
"session.timeout.ms": 6000, | |
"heartbeat.interval.ms": 1000, | |
// Consumer group ID | |
"group.id": group, | |
// Use the cooperative incremental rebalance protocol. | |
"partition.assignment.strategy": "cooperative-sticky", | |
// Start reading from the first message of each assigned | |
// partition if there are no previously committed offsets | |
// for this group. | |
"auto.offset.reset": "earliest"}) | |
if err != nil { | |
fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err) | |
os.Exit(1) | |
} | |
fmt.Printf("Created Consumer %v\n", c) | |
// Subscribe to topics, call the rebalanceCallback on assignment/revoke. | |
// The rebalanceCallback is triggered from c.Poll() and c.Close() below. | |
err = c.SubscribeTopics(topics, rebalanceCallback) | |
run := true | |
go func() { | |
for run { | |
time.Sleep(2 * time.Second) | |
tps, err := c.Assignment() | |
if err != nil { | |
fmt.Printf("failed to get assignment: %v\n", err) | |
} | |
if len(tps) != 0 { | |
fmt.Printf("[c%d] my current assignment is: %v \n", cId, tps) | |
} | |
} | |
}() | |
for run == true { | |
select { | |
case sig := <-sigchan: | |
fmt.Printf("Caught signal %v: terminating\n", sig) | |
run = false | |
default: | |
ev := c.Poll(100) | |
//if cId == 2 { | |
// time.Sleep(time.Second * 31) | |
// fmt.Printf("sleep is done!\n") | |
//} | |
if ev == nil { | |
continue | |
} | |
switch e := ev.(type) { | |
case *kafka.Message: | |
fmt.Printf("%% Message on %s:\n%s\n", | |
e.TopicPartition, string(e.Value)) | |
if e.Headers != nil { | |
fmt.Printf("%% Headers: %v\n", e.Headers) | |
} | |
case kafka.Error: | |
// Errors should generally be considered | |
// informational, the client will try to | |
// automatically recover. | |
// But in this example we choose to terminate | |
// the application if all brokers are down. | |
fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", | |
e.Code(), e) | |
if e.Code() == kafka.ErrAllBrokersDown { | |
run = false | |
} | |
default: | |
fmt.Printf("Ignored %v\n", e) | |
} | |
} | |
} | |
fmt.Printf("Closing consumer\n") | |
c.Close() | |
} | |
// rebalanceCallback is called on each group rebalance to assign additional | |
// partitions, or remove existing partitions, from the consumer's current | |
// assignment. | |
// | |
// The application may use this optional callback to inspect the assignment, | |
// alter the initial start offset (the .Offset field of each assigned partition), | |
// and read/write offsets to commit to an alternative store outside of Kafka. | |
func rebalanceCallback(c *kafka.Consumer, event kafka.Event) error { | |
switch ev := event.(type) { | |
case kafka.AssignedPartitions: | |
fmt.Fprintf(os.Stderr, | |
"%% %s rebalance: %d new partition(s) assigned: %v\n", | |
c.GetRebalanceProtocol(), len(ev.Partitions), | |
ev.Partitions) | |
// The application may update the start .Offset of each | |
// assigned partition and then call IncrementalAssign(). | |
// Even though this example does not alter the offsets we | |
// provide the call to IncrementalAssign() as an example. | |
err := c.IncrementalAssign(ev.Partitions) | |
if err != nil { | |
panic(err) | |
} | |
case kafka.RevokedPartitions: | |
fmt.Fprintf(os.Stderr, | |
"%% %s rebalance: %d partition(s) revoked: %v\n", | |
c.GetRebalanceProtocol(), len(ev.Partitions), | |
ev.Partitions) | |
if c.AssignmentLost() { | |
// Our consumer has been kicked out of the group and the | |
// entire assignment is thus lost. | |
fmt.Fprintf(os.Stderr, "%% Current assignment lost!\n") | |
} | |
// The client automatically calls IncrementalUnassign() unless | |
// the callback has already called that method. | |
} | |
return nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment