@yuwtennis
Last active March 13, 2022 10:34
Quick test with Kafka Connect using Postgres, consuming with kafka-python
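broker1_server.properties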
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=10
inter.broker.protocol.version=2.6.3
log.message.format.version=2.6.3
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
#zookeeper.connect=localhost:2181
zookeeper.connect=ZOOKEEPER_ADDRESS:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000
############################# Group Coordinator Settings #############################
# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
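broker2_server.properties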
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=11
inter.broker.protocol.version=2.6.3
log.message.format.version=2.6.3
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=ZOOKEEPER_ADDRESS:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000
############################# Group Coordinator Settings #############################
# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
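connect-distributed.properties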
##
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
# This file contains some of the configurations for the Kafka Connect distributed worker. This file is intended
# to be used with the examples, and some settings may differ from those used in a production system, especially
# the `bootstrap.servers` and those specifying replication factors.
# A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
bootstrap.servers=localhost:9092
# unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
#group.id=connect-cluster
group.id=first
# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true
# Topic to use for storing offsets. This topic should have many partitions and be replicated and compacted.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
#offset.storage.partitions=25
# Topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated,
# and compacted topic. Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
config.storage.topic=connect-configs
config.storage.replication.factor=1
# Topic to use for storing statuses. This topic can have multiple partitions and should be replicated and compacted.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
status.storage.topic=connect-status
status.storage.replication.factor=1
#status.storage.partitions=5
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
# These are provided to inform the user about the presence of the REST host and port configs
# Hostname & Port for the REST API to listen on. If this is set, it will bind to the interface used to listen to requests.
#rest.host.name=
#rest.port=8083
# The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
#rest.advertised.host.name=
#rest.advertised.port=
# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/usr/local/share/java
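connect-jdbc-postgresql-source.properties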
{
  "name": "jdbc-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": 1,
    "topic.prefix": "test-postgresql-jdbc-",
    "connection.url": "jdbc:postgresql://localhost/YOUR_DEFAULT_USER_DATABASE?ssl=false",
    "connection.user": "YOUR_USER",
    "connection.password": "YOUR_PASSWORD",
    "mode": "incrementing",
    "incrementing.column.name": "id"
  }
}
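docker-compose.yml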
version: "3.9"
services:
postgresql:
image: postgres:14.2-alpine3.15
environment:
POSTGRES_PASSWORD: MY_PASSWORD
ports:
- "5432:5432"
networks:
dev:

This tutorial will guide you through consuming messages from Postgres using the JDBC source connector.
For the consumer I used kafka-python.

  • Step1 Download kafka
  • Step2 Boot up kafka cluster
  • Step3 Setup postgresql
  • Step4 Download jdbc driver
  • Step5 Boot up kafka connect
  • Step6 Start consuming
  • Step7 Insert records
  • Step8 Optional Add broker

Prerequisites

  • java
  • kafka==2.6.3
  • kafka-connect==2.6.3
  • kafka-python==2.0.2
  • python==3.8
  • docker
  • docker-compose
  • linux preferred

Step1 Download kafka

  1. Do STEP 1: GET KAFKA of the Kafka quickstart

Step2 Boot up kafka cluster

  1. Replace config/server.properties with broker1_server.properties. Modify ZOOKEEPER_ADDRESS.

  2. Do STEP 2: START THE KAFKA ENVIRONMENT of the Kafka quickstart

Step3 Setup postgresql

  1. Modify POSTGRES_PASSWORD in docker-compose.yml

  2. Start postgresql

docker-compose up -d postgresql
  3. Prepare the table (I used y-watanabe as the user)
docker exec -it docker_postgresql_1 psql -U postgres
postgres# CREATE ROLE "y-watanabe" WITH SUPERUSER LOGIN ;
postgres# CREATE DATABASE "y-watanabe" OWNER "y-watanabe";
\q

docker exec -it docker_postgresql_1 psql -U y-watanabe
y-watanabe# CREATE TABLE sample (id serial PRIMARY KEY , first_name VARCHAR (256) , last_name VARCHAR (256) , age INT ) ;
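
The serial id primary key matters here: connect-jdbc-postgresql-source.properties uses mode=incrementing with incrementing.column.name=id, so the connector detects new rows by their growing id value.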

Step4 Download jdbc driver

  1. Download from https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc

  2. Move the libraries to where Connect will read them

sudo mkdir /usr/local/share/java
unzip confluentinc-kafka-connect-jdbc-10.3.3.zip
sudo cp confluentinc-kafka-connect-jdbc-10.3.3/lib/*jar /usr/local/share/java

Step5 Boot up kafka connect

  1. Replace config/connect-distributed.properties with connect-distributed.properties

  2. Start connect

export CLASSPATH=/usr/local/share/java/postgresql-42.3.2.jar
bin/connect-distributed.sh config/connect-distributed.properties
  3. Load connect-jdbc-postgresql-source.properties into Connect
curl -XPOST localhost:8083/connectors -d @config/connect-jdbc-postgresql-source.properties  -H 'content-type: application/json'
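
If the POST succeeds, Connect should report the connector and its task as RUNNING. A quick way to verify is the Connect REST API status endpoint; the sketch below is illustrative and assumes the default REST port 8083 and the connector name jdbc-source from connect-jdbc-postgresql-source.properties.

import json
import urllib.request

# Ask the Kafka Connect REST API for the status of the "jdbc-source" connector
with urllib.request.urlopen('http://localhost:8083/connectors/jdbc-source/status') as resp:
    status = json.load(resp)

print(status['connector']['state'])                  # expected: RUNNING
print([task['state'] for task in status['tasks']])   # expected: ['RUNNING']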

Step6 Start consuming

  1. Start a consumer using kafka-python
pip install kafka-python==2.0.2
python
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer('test-postgresql-jdbc-sample')
for msg in consumer:
    # Values are JSON because JsonConverter is configured in connect-distributed.properties
    data = json.loads(msg.value.decode('utf-8'))
    print(f'id: {data["payload"]["id"]}, first_name: {data["payload"]["first_name"]}')
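
Note that no group_id is passed, so kafka-python disables auto-commit (see the WARNING in the startup logs in the comments below) and offsets are not persisted between runs.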

Step7 Insert records

INSERT INTO sample(first_name, last_name, age) VALUES ('Saburo', 'Suzuki', 39) ;
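
If the consumer from Step6 is running, it should print something like id: 1, first_name: Saburo the next time the connector polls the table.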

Step8 Optional Add broker

  1. Prepare another Kafka installation. Do Step1.

  2. Replace config/server.properties with broker2_server.properties. Notice that the broker id is different. Modify ZOOKEEPER_ADDRESS.

  3. Do STEP 2: START THE KAFKA ENVIRONMENT of the Kafka quickstart
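
Once the second broker has joined, newly created topic partitions can land on either broker; the MetadataResponse dumps in the comments below show partitions spread across broker ids 10 and 11.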

yuwtennis commented Mar 12, 2022

According to the doc,

When communicating with a particular broker, a given client should use the highest API version supported by both and indicate this version in their requests.

Thus 2.5.0, the highest version supported by kafka-python 2.0.2, is the version used.

>>> pprint.pprint(consumer.config)
{'api_version': (2, 5, 0),
 'api_version_auto_timeout_ms': 2000,
 'auto_commit_interval_ms': 5000,
 'auto_offset_reset': 'latest',
 'bootstrap_servers': 'localhost',
 'check_crcs': True,
 'client_id': 'kafka-python-2.0.2',
 'connections_max_idle_ms': 540000,
 'consumer_timeout_ms': inf,
 'default_offset_commit_callback': <function KafkaConsumer.<lambda> at 0x7f40fb482670>,
 'enable_auto_commit': True,
 'exclude_internal_topics': True,
 'fetch_max_bytes': 52428800,
 'fetch_max_wait_ms': 500,
 'fetch_min_bytes': 1,
 'group_id': None,
 'heartbeat_interval_ms': 3000,
 'key_deserializer': None,
 'legacy_iterator': False,
 'max_in_flight_requests_per_connection': 5,
 'max_partition_fetch_bytes': 1048576,
 'max_poll_interval_ms': 300000,
 'max_poll_records': 500,
 'metadata_max_age_ms': 300000,
 'metric_group_prefix': 'consumer',
 'metric_reporters': [],
 'metrics_num_samples': 2,
 'metrics_sample_window_ms': 30000,
 'partition_assignment_strategy': (<class 'kafka.coordinator.assignors.range.RangePartitionAssignor'>,
                                   <class 'kafka.coordinator.assignors.roundrobin.RoundRobinPartitionAssignor'>),
 'receive_buffer_bytes': None,
 'reconnect_backoff_max_ms': 1000,
 'reconnect_backoff_ms': 50,
 'request_timeout_ms': 305000,
 'retry_backoff_ms': 100,
 'sasl_kerberos_domain_name': None,
 'sasl_kerberos_service_name': 'kafka',
 'sasl_mechanism': None,
 'sasl_oauth_token_provider': None,
 'sasl_plain_password': None,
 'sasl_plain_username': None,
 'security_protocol': 'PLAINTEXT',
 'selector': <class 'selectors.EpollSelector'>,
 'send_buffer_bytes': None,
 'session_timeout_ms': 10000,
 'sock_chunk_buffer_count': 1000,
 'sock_chunk_bytes': 4096,
 'socket_options': [(6, 1, 1)],
 'ssl_cafile': None,
 'ssl_certfile': None,
 'ssl_check_hostname': True,
 'ssl_ciphers': None,
 'ssl_context': None,
 'ssl_crlfile': None,
 'ssl_keyfile': None,
 'ssl_password': None,
 'value_deserializer': None}
>>>
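
Since the negotiated version ends up as (2, 5, 0) anyway, it can also be pinned explicitly so that kafka-python skips the version probe on startup. A minimal sketch, assuming the same topic and a broker on localhost:9092:

from kafka import KafkaConsumer

# Pinning api_version skips the automatic ApiVersionRequest probe on startup
consumer = KafkaConsumer(
    'test-postgresql-jdbc-sample',
    bootstrap_servers='localhost:9092',
    api_version=(2, 5, 0),
)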

yuwtennis commented Mar 12, 2022

For kafka-python 2.0.2, by consulting ApiVersionRequest, the highest API version is identified as 2.5.0.

INFO:kafka.conn:Broker version identified as 2.5.0

>>> consumer = KafkaConsumer('test-postgresql-jdbc-sample')
DEBUG:kafka.metrics.metrics:Added sensor with name connections-closed
DEBUG:kafka.metrics.metrics:Added sensor with name connections-created
DEBUG:kafka.metrics.metrics:Added sensor with name select-time
DEBUG:kafka.metrics.metrics:Added sensor with name io-time
DEBUG:kafka.client:Initiating connection to node bootstrap-0 at localhost:9092
DEBUG:kafka.metrics.metrics:Added sensor with name bytes-sent-received
DEBUG:kafka.metrics.metrics:Added sensor with name bytes-sent
DEBUG:kafka.metrics.metrics:Added sensor with name bytes-received
DEBUG:kafka.metrics.metrics:Added sensor with name request-latency
DEBUG:kafka.metrics.metrics:Added sensor with name node-bootstrap-0.bytes-sent
DEBUG:kafka.metrics.metrics:Added sensor with name node-bootstrap-0.bytes-received
DEBUG:kafka.metrics.metrics:Added sensor with name node-bootstrap-0.latency
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <disconnected> [unspecified None]>: creating new socket
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <disconnected> [IPv6 ('::1', 9092, 0, 0)]>: setting socket option (6, 1, 1)
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: connecting to localhost:9092 [('::1', 9092, 0, 0) IPv6]
INFO:kafka.conn:Probing node bootstrap-0 broker version
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: established TCP connection
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: Connection complete.
DEBUG:kafka.client:Node bootstrap-0 connected
DEBUG:kafka.protocol.parser:Sending request ApiVersionRequest_v0()
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connected> [IPv6 ('::1', 9092, 0, 0)]> Request 1: ApiVersionRequest_v0()
DEBUG:kafka.protocol.parser:Sending request MetadataRequest_v0(topics=[])
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connected> [IPv6 ('::1', 9092, 0, 0)]> Request 2: MetadataRequest_v0(topics=[])
DEBUG:kafka.protocol.parser:Received correlation id: 1
DEBUG:kafka.protocol.parser:Processing response ApiVersionResponse_v0
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connected> [IPv6 ('::1', 9092, 0, 0)]> Response 1 (130.05328178405762 ms): ApiVersionResponse_v0(error_code=0, api_versions=[(api_key=0, min_version=0, max_version=8), (api_key=1, min_version=0, max_version=11), (api_key=2, min_version=0, max_version=5), (api_key=3, min_version=0, max_version=9), (api_key=4, min_version=0, max_version=4), (api_key=5, min_version=0, max_version=3), (api_key=6, min_version=0, max_version=6), (api_key=7, min_version=0, max_version=3), (api_key=8, min_version=0, max_version=8), (api_key=9, min_version=0, max_version=7), (api_key=10, min_version=0, max_version=3), (api_key=11, min_version=0, max_version=7), (api_key=12, min_version=0, max_version=4), (api_key=13, min_version=0, max_version=4), (api_key=14, min_version=0, max_version=5), (api_key=15, min_version=0, max_version=5), (api_key=16, min_version=0, max_version=4), (api_key=17, min_version=0, max_version=1), (api_key=18, min_version=0, max_version=3), (api_key=19, min_version=0, max_version=5), (api_key=20, min_version=0, max_version=4), (api_key=21, min_version=0, max_version=2), (api_key=22, min_version=0, max_version=3), (api_key=23, min_version=0, max_version=3), (api_key=24, min_version=0, max_version=1), (api_key=25, min_version=0, max_version=1), (api_key=26, min_version=0, max_version=1), (api_key=27, min_version=0, max_version=0), (api_key=28, min_version=0, max_version=3), (api_key=29, min_version=0, max_version=2), (api_key=30, min_version=0, max_version=2), (api_key=31, min_version=0, max_version=2), (api_key=32, min_version=0, max_version=3), (api_key=33, min_version=0, max_version=1), (api_key=34, min_version=0, max_version=1), (api_key=35, min_version=0, max_version=2), (api_key=36, min_version=0, max_version=2), (api_key=37, min_version=0, max_version=2), (api_key=38, min_version=0, max_version=2), (api_key=39, min_version=0, max_version=2), (api_key=40, min_version=0, max_version=2), (api_key=41, min_version=0, max_version=2), (api_key=42, min_version=0, max_version=2), (api_key=43, min_version=0, max_version=2), (api_key=44, min_version=0, max_version=1), (api_key=45, min_version=0, max_version=0), (api_key=46, min_version=0, max_version=0), (api_key=47, min_version=0, max_version=0), (api_key=48, min_version=0, max_version=0), (api_key=49, min_version=0, max_version=0)])
DEBUG:kafka.protocol.parser:Received correlation id: 2
DEBUG:kafka.protocol.parser:Processing response MetadataResponse_v0
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connected> [IPv6 ('::1', 9092, 0, 0)]> Response 2 (29.064655303955078 ms): MetadataResponse_v0(brokers=[(node_id=10, host='debian-primary', port=9092), (node_id=11, host='debian-secondary', port=9092)], topics=[(error_code=0, topic='connect-configs', partitions=[(error_code=0, partition=0, leader=10, replicas=[10], isr=[10])]), (error_code=0, topic='connect-status', partitions=[(error_code=0, partition=0, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=4, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=1, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=2, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=3, leader=11, replicas=[11], isr=[11])]), (error_code=0, topic='__consumer_offsets', partitions=[(error_code=0, partition=0, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=10, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=20, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=40, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=30, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=9, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=11, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=31, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=39, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=13, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=18, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=22, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=32, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=8, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=43, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=29, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=34, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=1, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=6, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=41, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=27, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=48, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=5, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=15, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=35, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=25, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=46, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=26, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=36, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=44, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=4, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=37, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=17, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=45, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=3, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=16, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=24, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=38, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=33, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=23, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=28, leader=11, replicas=[11], isr=[11]), (error_code=0, 
partition=2, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=12, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=19, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=14, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=47, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=49, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=42, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=7, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=21, leader=10, replicas=[10], isr=[10])]), (error_code=0, topic='test-postgresql-jdbc-sample', partitions=[(error_code=0, partition=0, leader=11, replicas=[11], isr=[11])]), (error_code=0, topic='connect-offsets', partitions=[(error_code=0, partition=0, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=5, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=10, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=20, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=15, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=9, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=11, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=4, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=16, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=17, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=3, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=24, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=23, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=13, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=18, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=22, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=8, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=2, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=12, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=19, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=14, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=1, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=6, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=7, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=21, leader=10, replicas=[10], isr=[10])]), (error_code=0, topic='quickstart-events', partitions=[(error_code=0, partition=0, leader=11, replicas=[10, 11], isr=[11, 10])])])
INFO:kafka.conn:Broker version identified as 2.5.0
INFO:kafka.conn:Set configuration api_version=(2, 5, 0) to skip auto check_version requests on startup
DEBUG:kafka.metrics.metrics:Added sensor with name bytes-fetched
DEBUG:kafka.metrics.metrics:Added sensor with name records-fetched
DEBUG:kafka.metrics.metrics:Added sensor with name fetch-latency
DEBUG:kafka.metrics.metrics:Added sensor with name records-lag
DEBUG:kafka.metrics.metrics:Added sensor with name fetch-throttle-time
DEBUG:kafka.metrics.metrics:Added sensor with name heartbeat-latency
DEBUG:kafka.metrics.metrics:Added sensor with name join-latency
DEBUG:kafka.metrics.metrics:Added sensor with name sync-latency
WARNING:kafka.coordinator.consumer:group_id is None: disabling auto-commit.
DEBUG:kafka.metrics.metrics:Added sensor with name commit-latency
INFO:kafka.consumer.subscription_state:Updating subscribed topics to: ('test-postgresql-jdbc-sample',)

yuwtennis commented Mar 12, 2022

For kafka-python 2.0.1, the broker is identified as 1.0.0.

>>> consumer = KafkaConsumer('test-postgresql-jdbc-sample')
DEBUG:kafka.metrics.metrics:Added sensor with name connections-closed
DEBUG:kafka.metrics.metrics:Added sensor with name connections-created
DEBUG:kafka.metrics.metrics:Added sensor with name select-time
DEBUG:kafka.metrics.metrics:Added sensor with name io-time
DEBUG:kafka.client:Initiating connection to node bootstrap-0 at localhost:9092
DEBUG:kafka.metrics.metrics:Added sensor with name bytes-sent-received
DEBUG:kafka.metrics.metrics:Added sensor with name bytes-sent
DEBUG:kafka.metrics.metrics:Added sensor with name bytes-received
DEBUG:kafka.metrics.metrics:Added sensor with name request-latency
DEBUG:kafka.metrics.metrics:Added sensor with name node-bootstrap-0.bytes-sent
DEBUG:kafka.metrics.metrics:Added sensor with name node-bootstrap-0.bytes-received
DEBUG:kafka.metrics.metrics:Added sensor with name node-bootstrap-0.latency
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <disconnected> [unspecified None]>: creating new socket
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <disconnected> [IPv6 ('::1', 9092, 0, 0)]>: setting socket option (6, 1, 1)
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: connecting to localhost:9092 [('::1', 9092, 0, 0) IPv6]
INFO:kafka.conn:Probing node bootstrap-0 broker version
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: established TCP connection
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: Connection complete.
DEBUG:kafka.client:Node bootstrap-0 connected
DEBUG:kafka.protocol.parser:Sending request ApiVersionRequest_v0()
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connected> [IPv6 ('::1', 9092, 0, 0)]> Request 1: ApiVersionRequest_v0()
DEBUG:kafka.protocol.parser:Sending request MetadataRequest_v0(topics=[])
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connected> [IPv6 ('::1', 9092, 0, 0)]> Request 2: MetadataRequest_v0(topics=[])
DEBUG:kafka.protocol.parser:Received correlation id: 1
DEBUG:kafka.protocol.parser:Processing response ApiVersionResponse_v0
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connected> [IPv6 ('::1', 9092, 0, 0)]> Response 1 (111.67526245117188 ms): ApiVersionResponse_v0(error_code=0, api_versions=[(api_key=0, min_version=0, max_version=8), (api_key=1, min_version=0, max_version=11), (api_key=2, min_version=0, max_version=5), (api_key=3, min_version=0, max_version=9), (api_key=4, min_version=0, max_version=4), (api_key=5, min_version=0, max_version=3), (api_key=6, min_version=0, max_version=6), (api_key=7, min_version=0, max_version=3), (api_key=8, min_version=0, max_version=8), (api_key=9, min_version=0, max_version=7), (api_key=10, min_version=0, max_version=3), (api_key=11, min_version=0, max_version=7), (api_key=12, min_version=0, max_version=4), (api_key=13, min_version=0, max_version=4), (api_key=14, min_version=0, max_version=5), (api_key=15, min_version=0, max_version=5), (api_key=16, min_version=0, max_version=4), (api_key=17, min_version=0, max_version=1), (api_key=18, min_version=0, max_version=3), (api_key=19, min_version=0, max_version=5), (api_key=20, min_version=0, max_version=4), (api_key=21, min_version=0, max_version=2), (api_key=22, min_version=0, max_version=3), (api_key=23, min_version=0, max_version=3), (api_key=24, min_version=0, max_version=1), (api_key=25, min_version=0, max_version=1), (api_key=26, min_version=0, max_version=1), (api_key=27, min_version=0, max_version=0), (api_key=28, min_version=0, max_version=3), (api_key=29, min_version=0, max_version=2), (api_key=30, min_version=0, max_version=2), (api_key=31, min_version=0, max_version=2), (api_key=32, min_version=0, max_version=3), (api_key=33, min_version=0, max_version=1), (api_key=34, min_version=0, max_version=1), (api_key=35, min_version=0, max_version=2), (api_key=36, min_version=0, max_version=2), (api_key=37, min_version=0, max_version=2), (api_key=38, min_version=0, max_version=2), (api_key=39, min_version=0, max_version=2), (api_key=40, min_version=0, max_version=2), (api_key=41, min_version=0, max_version=2), (api_key=42, min_version=0, max_version=2), (api_key=43, min_version=0, max_version=2), (api_key=44, min_version=0, max_version=1), (api_key=45, min_version=0, max_version=0), (api_key=46, min_version=0, max_version=0), (api_key=47, min_version=0, max_version=0), (api_key=48, min_version=0, max_version=0), (api_key=49, min_version=0, max_version=0)])
DEBUG:kafka.protocol.parser:Received correlation id: 2
DEBUG:kafka.protocol.parser:Processing response MetadataResponse_v0
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connected> [IPv6 ('::1', 9092, 0, 0)]> Response 2 (14.042377471923828 ms): MetadataResponse_v0(brokers=[(node_id=10, host='debian-primary', port=9092), (node_id=11, host='debian-secondary', port=9092)], topics=[(error_code=0, topic='connect-configs', partitions=[(error_code=0, partition=0, leader=10, replicas=[10], isr=[10])]), (error_code=0, topic='connect-status', partitions=[(error_code=0, partition=0, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=4, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=1, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=2, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=3, leader=11, replicas=[11], isr=[11])]), (error_code=0, topic='__consumer_offsets', partitions=[(error_code=0, partition=0, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=10, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=20, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=40, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=30, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=9, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=11, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=31, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=39, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=13, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=18, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=22, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=32, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=8, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=43, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=29, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=34, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=1, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=6, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=41, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=27, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=48, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=5, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=15, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=35, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=25, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=46, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=26, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=36, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=44, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=4, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=37, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=17, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=45, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=3, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=16, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=24, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=38, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=33, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=23, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=28, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=2, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=12, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=19, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=14, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=47, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=49, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=42, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=7, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=21, leader=10, replicas=[10], isr=[10])]), (error_code=0, topic='test-postgresql-jdbc-sample', partitions=[(error_code=0, partition=0, leader=11, replicas=[11], isr=[11])]), (error_code=0, topic='connect-offsets', partitions=[(error_code=0, partition=0, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=5, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=10, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=20, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=15, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=9, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=11, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=4, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=16, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=17, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=3, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=24, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=23, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=13, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=18, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=22, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=8, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=2, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=12, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=19, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=14, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=1, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=6, leader=11, replicas=[11], isr=[11]), (error_code=0, partition=7, leader=10, replicas=[10], isr=[10]), (error_code=0, partition=21, leader=10, replicas=[10], isr=[10])]), (error_code=0, topic='quickstart-events', partitions=[(error_code=0, partition=0, leader=11, replicas=[10, 11], isr=[11, 10])])])
INFO:kafka.conn:Broker version identified as 1.0.0
INFO:kafka.conn:Set configuration api_version=(1, 0, 0) to skip auto check_version requests on startup
DEBUG:kafka.metrics.metrics:Added sensor with name bytes-fetched
DEBUG:kafka.metrics.metrics:Added sensor with name records-fetched
DEBUG:kafka.metrics.metrics:Added sensor with name fetch-latency
DEBUG:kafka.metrics.metrics:Added sensor with name records-lag
DEBUG:kafka.metrics.metrics:Added sensor with name fetch-throttle-time
DEBUG:kafka.metrics.metrics:Added sensor with name heartbeat-latency
DEBUG:kafka.metrics.metrics:Added sensor with name join-latency
DEBUG:kafka.metrics.metrics:Added sensor with name sync-latency
WARNING:kafka.coordinator.consumer:group_id is None: disabling auto-commit.
DEBUG:kafka.metrics.metrics:Added sensor with name commit-latency
INFO:kafka.consumer.subscription_state:Updating subscribed topics to: ('test-postgresql-jdbc-sample',)

yuwtennis commented Mar 12, 2022

y-watanabe@debian-primary:~/tmp/kafka_2.13-2.6.3$ bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092
debian-primary:9092 (id: 10 rack: null) -> (
        Produce(0): 0 to 8 [usable: 8],
        Fetch(1): 0 to 11 [usable: 11],
        ListOffsets(2): 0 to 5 [usable: 5],
        Metadata(3): 0 to 9 [usable: 9],
        LeaderAndIsr(4): 0 to 4 [usable: 4],
        StopReplica(5): 0 to 3 [usable: 3],
        UpdateMetadata(6): 0 to 6 [usable: 6],
        ControlledShutdown(7): 0 to 3 [usable: 3],
        OffsetCommit(8): 0 to 8 [usable: 8],
        OffsetFetch(9): 0 to 7 [usable: 7],
        FindCoordinator(10): 0 to 3 [usable: 3],
        JoinGroup(11): 0 to 7 [usable: 7],
        Heartbeat(12): 0 to 4 [usable: 4],
        LeaveGroup(13): 0 to 4 [usable: 4],
        SyncGroup(14): 0 to 5 [usable: 5],
        DescribeGroups(15): 0 to 5 [usable: 5],
        ListGroups(16): 0 to 4 [usable: 4],
        SaslHandshake(17): 0 to 1 [usable: 1],
        ApiVersions(18): 0 to 3 [usable: 3],
        CreateTopics(19): 0 to 5 [usable: 5],
        DeleteTopics(20): 0 to 4 [usable: 4],
        DeleteRecords(21): 0 to 2 [usable: 2],
        InitProducerId(22): 0 to 3 [usable: 3],
        OffsetForLeaderEpoch(23): 0 to 3 [usable: 3],
        AddPartitionsToTxn(24): 0 to 1 [usable: 1],
        AddOffsetsToTxn(25): 0 to 1 [usable: 1],
        EndTxn(26): 0 to 1 [usable: 1],
        WriteTxnMarkers(27): 0 [usable: 0],
        TxnOffsetCommit(28): 0 to 3 [usable: 3],
        DescribeAcls(29): 0 to 2 [usable: 2],
        CreateAcls(30): 0 to 2 [usable: 2],
        DeleteAcls(31): 0 to 2 [usable: 2],
        DescribeConfigs(32): 0 to 3 [usable: 3],
        AlterConfigs(33): 0 to 1 [usable: 1],
        AlterReplicaLogDirs(34): 0 to 1 [usable: 1],
        DescribeLogDirs(35): 0 to 2 [usable: 2],
        SaslAuthenticate(36): 0 to 2 [usable: 2],
        CreatePartitions(37): 0 to 2 [usable: 2],
        CreateDelegationToken(38): 0 to 2 [usable: 2],
        RenewDelegationToken(39): 0 to 2 [usable: 2],
        ExpireDelegationToken(40): 0 to 2 [usable: 2],
        DescribeDelegationToken(41): 0 to 2 [usable: 2],
        DeleteGroups(42): 0 to 2 [usable: 2],
        ElectLeaders(43): 0 to 2 [usable: 2],
        IncrementalAlterConfigs(44): 0 to 1 [usable: 1],
        AlterPartitionReassignments(45): 0 [usable: 0],
        ListPartitionReassignments(46): 0 [usable: 0],
        OffsetDelete(47): 0 [usable: 0],
        DescribeClientQuotas(48): 0 [usable: 0],
        AlterClientQuotas(49): 0 [usable: 0]
)
