Skip to content

Instantly share code, notes, and snippets.

@edenhill
Created May 11, 2017 07:22
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save edenhill/9dfc019b980a5eb3365c84524a1f12b0 to your computer and use it in GitHub Desktop.
Save edenhill/9dfc019b980a5eb3365c84524a1f12b0 to your computer and use it in GitHub Desktop.
confluent-kafka-go example to start consuming 5 messages from the end (tail 5)
// Example function-based high-level Apache Kafka consumer
package main
/**
* Copyright 2016 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// consumer_example implements a consumer using the non-channel Poll() API
// to retrieve messages and events.
import (
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
"os"
"os/signal"
"syscall"
)
func main() {
if len(os.Args) < 4 {
fmt.Fprintf(os.Stderr, "Usage: %s <broker> <group> <topics..>\n",
os.Args[0])
os.Exit(1)
}
broker := os.Args[1]
group := os.Args[2]
topics := os.Args[3:]
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": broker,
"group.id": group,
"go.application.rebalance.enable": true, // delegate Assign() responsibility to app
"session.timeout.ms": 6000,
"default.topic.config": kafka.ConfigMap{"auto.offset.reset": "earliest"}})
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
os.Exit(1)
}
fmt.Printf("Created Consumer %v\n", c)
err = c.SubscribeTopics(topics, nil)
run := true
for run == true {
select {
case sig := <-sigchan:
fmt.Printf("Caught signal %v: terminating\n", sig)
run = false
default:
ev := c.Poll(100)
if ev == nil {
continue
}
switch e := ev.(type) {
case kafka.AssignedPartitions:
parts := make([]kafka.TopicPartition,
len(e.Partitions))
for i, tp := range e.Partitions {
tp.Offset = kafka.OffsetTail(5) // Set start offset to 5 messages from end of partition
parts[i] = tp
}
fmt.Printf("Assign %v\n", parts)
c.Assign(parts)
case *kafka.Message:
fmt.Printf("%% Message on %s:\n%s\n",
e.TopicPartition, string(e.Value))
case kafka.PartitionEOF:
fmt.Printf("%% Reached %v\n", e)
case kafka.Error:
fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
run = false
default:
fmt.Printf("Ignored %v\n", e)
}
}
}
fmt.Printf("Closing consumer\n")
c.Close()
}
@apabla
Copy link

apabla commented Aug 27, 2020

Hello could you please advise on how to obtain access to all partitions associated to a Topic? Is this the only way to locate topic partitions?
case kafka.AssignedPartitions:
parts := make([]kafka.TopicPartition,

@edenhill
Copy link
Author

@apabla Use GetMetadata() to get the partitions for a topic.

@apabla
Copy link

apabla commented Sep 1, 2020

@edenhill appreciate the feedback. I just figured it out :-)
Using
adminclient.GetMetadata(&singletopic, false, timeout); e != nil

Whatever is returned I am using that to construct a list of partitions to consumer from.
topicpartionlist := []kafka.TopicPartition{}

Would love to contribute to the community and share some of this work.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment