-
-
Save dnwe/8cc61dad3e4fdabf857d03f20368dce5 to your computer and use it in GitHub Desktop.
Simple Sarama client that connects to a single broker, submits an ApiVersions request, and dumps out the response.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"crypto/tls" | |
"flag" | |
"fmt" | |
"log" | |
"os" | |
"github.com/IBM/sarama" | |
) | |
// names maps a Kafka protocol API key to a human-readable request name.
// It is used purely for display: a key that is missing from the table
// yields the empty string when looked up.
var names = map[int16]string{
	0:  "ProduceRequest",
	1:  "FetchRequest",
	2:  "ListOffsetsRequest",
	3:  "MetadataRequest",
	4:  "LeaderAndIsrRequest",
	5:  "StopReplicaRequest",
	6:  "UpdateMetadataRequest",
	7:  "ControlledShutdownRequest",
	8:  "OffsetCommitRequest",
	9:  "OffsetFetchRequest",
	10: "FindCoordinatorRequest",
	11: "JoinGroupRequest",
	12: "HeartbeatRequest",
	13: "LeaveGroupRequest",
	14: "SyncGroupRequest",
	15: "DescribeGroupsRequest",
	16: "ListGroupsRequest",
	17: "SaslHandshakeRequest",
	18: "ApiVersionsRequest",
	19: "CreateTopicsRequest",
	20: "DeleteTopicsRequest",
	21: "DeleteRecordsRequest",
	22: "InitProducerIdRequest",
	23: "OffsetForLeaderEpochRequest",
	24: "AddPartitionsToTxnRequest",
	25: "AddOffsetsToTxnRequest",
	26: "EndTxnRequest",
	27: "WriteTxnMarkersRequest",
	28: "TxnOffsetCommitRequest",
	29: "DescribeAclsRequest",
	30: "CreateAclsRequest",
	31: "DeleteAclsRequest",
	32: "DescribeConfigsRequest",
	33: "AlterConfigsRequest",
	34: "AlterReplicaLogDirsRequest",
	35: "DescribeLogDirsRequest",
	36: "SaslAuthenticateRequest",
	37: "CreatePartitionsRequest",
	38: "CreateDelegationTokenRequest",
	39: "RenewDelegationTokenRequest",
	40: "ExpireDelegationTokenRequest",
	41: "DescribeDelegationTokenRequest",
	42: "DeleteGroupsRequest",
	43: "ElectLeadersRequest",
	44: "IncrementalAlterConfigsRequest",
	45: "AlterPartitionReassignmentsRequest",
	46: "ListPartitionReassignmentsRequest",
	47: "OffsetDeleteRequest",
	48: "DescribeClientQuotasRequest",
	49: "AlterClientQuotasRequest",
	50: "DescribeUserScramCredentialsRequest",
	51: "AlterUserScramCredentialsRequest",
	52: "VoteRequest",
	53: "BeginQuorumEpochRequest",
	54: "EndQuorumEpochRequest",
	55: "DescribeQuorumRequest",
	56: "AlterPartitionRequest",
	57: "UpdateFeaturesRequest",
	58: "EnvelopeRequest",
	59: "FetchSnapshotRequest",
	60: "DescribeClusterRequest",
	61: "DescribeProducersRequest",
	62: "BrokerRegistrationRequest",
	63: "BrokerHeartbeatRequest",
	64: "UnregisterBrokerRequest",
	65: "DescribeTransactionsRequest",
	66: "ListTransactionsRequest",
	67: "AllocateProducerIdsRequest",
	68: "ConsumerGroupHeartbeatRequest",
	// Keys introduced by later Kafka releases; a newer broker may advertise
	// them even though the client is configured for an older version.
	69: "ConsumerGroupDescribeRequest",
	70: "ControllerRegistrationRequest",
	71: "GetTelemetrySubscriptionsRequest",
	72: "PushTelemetryRequest",
	73: "AssignReplicasToDirsRequest",
	74: "ListClientMetricsResourcesRequest",
	75: "DescribeTopicPartitionsRequest",
}
func main() { | |
broker := flag.String("broker", "", "A single Kafka broker hostname and port to connect to") | |
tlsSkipVerify := flag.Bool("tls-skip-verify", false, "Whether to skip TLS server cert verification") | |
useTLS := flag.Bool("tls", false, "Use TLS to communicate with the cluster") | |
verbose := flag.Bool("verbose", false, "Enable verbose Sarama logging") | |
flag.Parse() | |
if broker == nil || *broker == "" { | |
fmt.Fprintf(os.Stderr, "ERROR: no Kafka broker hostname defined, please set the -broker flag\n") | |
flag.Usage() | |
os.Exit(1) | |
} | |
if *verbose { | |
sarama.Logger = log.New(os.Stderr, "[DEBUG] ", log.Ldate|log.Ltime|log.Lmsgprefix) | |
} | |
cfg := sarama.NewConfig() | |
cfg.ClientID = "sarama-apiversions" | |
cfg.Version = sarama.V3_4_0_0 | |
if *useTLS { | |
cfg.Net.TLS.Enable = true | |
cfg.Net.TLS.Config = &tls.Config{ | |
InsecureSkipVerify: *tlsSkipVerify, | |
} | |
} | |
b := sarama.NewBroker(*broker) | |
if err := b.Open(cfg); err != nil { | |
log.Fatal(err) | |
} | |
defer b.Close() | |
resp, err := b.ApiVersions(&sarama.ApiVersionsRequest{ | |
Version: 0, | |
}) | |
if err != nil { | |
log.Fatal(err) | |
} | |
fmt.Print(",APIs,\n") | |
for _, kv := range resp.ApiKeys { | |
fmt.Printf("%d,%s,%d\n", kv.ApiKey, names[kv.ApiKey], kv.MaxVersion) | |
if kv.MinVersion != 0 { | |
fmt.Fprintf(os.Stderr, "[WARNING] %d,%s has a minimum version of %d\n", kv.ApiKey, names[kv.ApiKey], kv.MinVersion) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment