Skip to content

Instantly share code, notes, and snippets.

@dnwe
Created December 19, 2024 23:50
Show Gist options
  • Save dnwe/8cc61dad3e4fdabf857d03f20368dce5 to your computer and use it in GitHub Desktop.
Save dnwe/8cc61dad3e4fdabf857d03f20368dce5 to your computer and use it in GitHub Desktop.
Simple sarama client that connects to a single broker, submits an ApiVersions request, and dumps out the response
package main
import (
"crypto/tls"
"flag"
"fmt"
"log"
"os"
"github.com/IBM/sarama"
)
// names maps a Kafka protocol API key to the name of the request it
// identifies. It is used only to label rows in the program's output; a key
// that is absent from the table yields the empty string on lookup.
var names = map[int16]string{
	0:  "ProduceRequest",
	1:  "FetchRequest",
	2:  "ListOffsetsRequest",
	3:  "MetadataRequest",
	4:  "LeaderAndIsrRequest",
	5:  "StopReplicaRequest",
	6:  "UpdateMetadataRequest",
	7:  "ControlledShutdownRequest",
	8:  "OffsetCommitRequest",
	9:  "OffsetFetchRequest",
	10: "FindCoordinatorRequest",
	11: "JoinGroupRequest",
	12: "HeartbeatRequest",
	13: "LeaveGroupRequest",
	14: "SyncGroupRequest",
	15: "DescribeGroupsRequest",
	16: "ListGroupsRequest",
	17: "SaslHandshakeRequest",
	18: "ApiVersionsRequest",
	19: "CreateTopicsRequest",
	20: "DeleteTopicsRequest",
	21: "DeleteRecordsRequest",
	22: "InitProducerIdRequest",
	23: "OffsetForLeaderEpochRequest",
	24: "AddPartitionsToTxnRequest",
	25: "AddOffsetsToTxnRequest",
	26: "EndTxnRequest",
	27: "WriteTxnMarkersRequest",
	28: "TxnOffsetCommitRequest",
	29: "DescribeAclsRequest",
	30: "CreateAclsRequest",
	31: "DeleteAclsRequest",
	32: "DescribeConfigsRequest",
	33: "AlterConfigsRequest",
	34: "AlterReplicaLogDirsRequest",
	35: "DescribeLogDirsRequest",
	36: "SaslAuthenticateRequest",
	37: "CreatePartitionsRequest",
	38: "CreateDelegationTokenRequest",
	39: "RenewDelegationTokenRequest",
	40: "ExpireDelegationTokenRequest",
	41: "DescribeDelegationTokenRequest",
	42: "DeleteGroupsRequest",
	43: "ElectLeadersRequest",
	44: "IncrementalAlterConfigsRequest",
	45: "AlterPartitionReassignmentsRequest",
	46: "ListPartitionReassignmentsRequest",
	47: "OffsetDeleteRequest",
	48: "DescribeClientQuotasRequest",
	49: "AlterClientQuotasRequest",
	50: "DescribeUserScramCredentialsRequest",
	51: "AlterUserScramCredentialsRequest",
	52: "VoteRequest",
	53: "BeginQuorumEpochRequest",
	54: "EndQuorumEpochRequest",
	55: "DescribeQuorumRequest",
	56: "AlterPartitionRequest",
	57: "UpdateFeaturesRequest",
	58: "EnvelopeRequest",
	59: "FetchSnapshotRequest",
	60: "DescribeClusterRequest",
	61: "DescribeProducersRequest",
	62: "BrokerRegistrationRequest",
	63: "BrokerHeartbeatRequest",
	64: "UnregisterBrokerRequest",
	65: "DescribeTransactionsRequest",
	66: "ListTransactionsRequest",
	67: "AllocateProducerIdsRequest",
	68: "ConsumerGroupHeartbeatRequest",
	// Keys below were added to the protocol after the original table was
	// written; without them newer brokers produce rows with a blank name.
	69: "ConsumerGroupDescribeRequest",
	70: "ControllerRegistrationRequest",
	71: "GetTelemetrySubscriptionsRequest",
	72: "PushTelemetryRequest",
	73: "AssignReplicasToDirsRequest",
	74: "ListClientMetricsResourcesRequest",
	75: "DescribeTopicPartitionsRequest",
}
// main parses the command-line flags, builds a sarama configuration from
// them, and asks the single broker named by -broker for its supported API
// versions. It exits with status 1 when -broker is missing or when the
// request fails.
func main() {
	broker := flag.String("broker", "", "A single Kafka broker hostname and port to connect to")
	tlsSkipVerify := flag.Bool("tls-skip-verify", false, "Whether to skip TLS server cert verification")
	useTLS := flag.Bool("tls", false, "Use TLS to communicate with the cluster")
	verbose := flag.Bool("verbose", false, "Enable verbose Sarama logging")
	flag.Parse()

	// flag.String never returns nil, so only the empty value needs checking.
	if *broker == "" {
		fmt.Fprintln(os.Stderr, "ERROR: no Kafka broker hostname defined, please set the -broker flag")
		flag.Usage()
		os.Exit(1)
	}

	if *verbose {
		sarama.Logger = log.New(os.Stderr, "[DEBUG] ", log.Ldate|log.Ltime|log.Lmsgprefix)
	}

	cfg := sarama.NewConfig()
	cfg.ClientID = "sarama-apiversions"
	cfg.Version = sarama.V3_4_0_0
	if *useTLS {
		cfg.Net.TLS.Enable = true
		cfg.Net.TLS.Config = &tls.Config{
			// Certificate verification is only skipped when the user
			// explicitly passes -tls-skip-verify.
			InsecureSkipVerify: *tlsSkipVerify,
		}
	}

	// The connection is handled in a helper so that its deferred Close runs
	// before log.Fatal terminates the process (os.Exit skips defers).
	if err := dumpAPIVersions(*broker, cfg); err != nil {
		log.Fatal(err)
	}
}

// dumpAPIVersions opens a connection to the broker at addr using cfg, sends a
// version 0 ApiVersions request, and writes one CSV row per advertised API key
// ("key,name,maxVersion") to stdout after a ",APIs," header row. For every key
// whose minimum version is non-zero it also writes a warning to stderr. The
// broker connection is closed before returning.
func dumpAPIVersions(addr string, cfg *sarama.Config) error {
	b := sarama.NewBroker(addr)
	if err := b.Open(cfg); err != nil {
		return fmt.Errorf("opening connection to broker %q: %w", addr, err)
	}
	// Best-effort close: the process exits right after this function
	// returns, so a close error is not actionable.
	defer func() { _ = b.Close() }()

	resp, err := b.ApiVersions(&sarama.ApiVersionsRequest{
		Version: 0,
	})
	if err != nil {
		return fmt.Errorf("requesting API versions from broker %q: %w", addr, err)
	}

	fmt.Print(",APIs,\n")
	for _, kv := range resp.ApiKeys {
		name := apiName(kv.ApiKey)
		fmt.Printf("%d,%s,%d\n", kv.ApiKey, name, kv.MaxVersion)
		if kv.MinVersion != 0 {
			fmt.Fprintf(os.Stderr, "[WARNING] %d,%s has a minimum version of %d\n", kv.ApiKey, name, kv.MinVersion)
		}
	}
	return nil
}

// apiName returns the request name registered in names for key, or an
// "UnknownRequest(<key>)" placeholder when the key is not in the table, so
// that an unrecognised API never produces an empty CSV field.
func apiName(key int16) string {
	if name, ok := names[key]; ok {
		return name
	}
	return fmt.Sprintf("UnknownRequest(%d)", key)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment