@peihsinsu
Created December 3, 2018 08:38
Kafka Test with Load Balancer

File: server.properties

broker.id=1
listeners=EXTERNAL://:9093,INTERNAL://:9092
advertised.listeners=INTERNAL://172.17.20.117:9092,EXTERNAL://52.80.79.248:9093
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
inter.broker.listener.name=INTERNAL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafkalogs
num.partitions=4
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=172.17.21.246:2181,172.17.20.117:2181,172.17.20.255:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
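
With this layout, a client that bootstraps against either address gets back the matching advertised.listeners entry in the metadata response. A quick sanity check with kafka-python (a sketch only; the host:port pairs are the ones configured above, and bootstrap behaviour can be lazy depending on the kafka-python version):

#!/usr/bin/env python3
# Sketch: bootstrap against each listener and print the broker addresses that
# the cluster metadata advertises back.
from kafka.client_async import KafkaClient

LISTENERS = {
    'INTERNAL': '172.17.20.117:9092',
    'EXTERNAL': '52.80.79.248:9093',
}

for name, server in LISTENERS.items():
    client = KafkaClient(bootstrap_servers=server, client_id='listener-check')
    for _ in range(10):                     # allow the bootstrap metadata to arrive
        client.poll(timeout_ms=500)
        if client.cluster.brokers():
            break
    print(name, sorted(client.cluster.brokers()))
    client.close()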

File: client.py

#!/usr/bin/env python3

from kafka import KafkaConsumer
from kafka.client_async import KafkaClient, selectors
from kafka.metrics import MetricConfig, Metrics
from kafka.cluster import ClusterMetadata

import sys
import time
import socket



#BOOTSTART_SERVERS = 'kafka-ext-809d576b1603e92b.elb.cn-north-1.amazonaws.com.cn:9093'
BOOTSTART_SERVERS = 'kafka-lb-internal-ac1ce140e307bd3f.elb.cn-north-1.amazonaws.com.cn'
topic = 'test'
# topic = 'colourlife_transaction'
#adminuser = 'colourlifeadmin'
#adminpass = '6hGawfdXnnj2'
#consumeruser = 'consumer'
#consumerpass = 'consumer-0tshrST6'
#produceruser = 'producer'
#producerpass = 'producer-75tggKRm'
now = str(time.time()).encode('utf-8')

#consumer = KafkaConsumer(topic, bootstrap_servers=BOOTSTART_SERVERS, enable_auto_commit=True)
#consumer = KafkaConsumer(topic, bootstrap_servers=BOOTSTART_SERVERS, security_protocol="SASL_PLAINTEXT",
#                         sasl_mechanism='PLAIN',
#                         sasl_plain_username=consumeruser,
#                         sasl_plain_password=consumerpass,
#                         group_id='my_favorite_group', enable_auto_commit=True)
'''
consumer.subscribe([topic])
print(consumer.subscription())

try:
    for message in consumer:
        print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                             message.offset, message.key, message.value))
        #print('assignment: ', consumer.assignment())
        #print('partitions', consumer.partitions_for_topic(topic))
        print('message: ', message)
        print('\n\n')
except KeyboardInterrupt:
    consumer.close()
    sys.exit(0)

consumer.close()
'''
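# These defaults mirror kafka-python's KafkaConsumer configuration (with
# bootstrap_servers pointed at the load balancer); the KafkaClient constructed
# below only picks out the keys it recognises and ignores the rest.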
DEFAULT_CONFIG = {
        'bootstrap_servers': BOOTSTART_SERVERS,
        'client_id': 'kafka-python-test',
        'group_id': None,
        'key_deserializer': None,
        'value_deserializer': None,
        'fetch_max_wait_ms': 500,
        'fetch_min_bytes': 1,
        'fetch_max_bytes': 52428800,
        'max_partition_fetch_bytes': 1 * 1024 * 1024,
        'request_timeout_ms': 305000, # chosen to be higher than the default of max_poll_interval_ms
        'retry_backoff_ms': 100,
        'reconnect_backoff_ms': 50,
        'reconnect_backoff_max_ms': 1000,
        'max_in_flight_requests_per_connection': 5,
        'auto_offset_reset': 'latest',
        'enable_auto_commit': True,
        'auto_commit_interval_ms': 5000,
        'default_offset_commit_callback': lambda offsets, response: True,
        'check_crcs': True,
        'metadata_max_age_ms': 5 * 60 * 1000,
#        'partition_assignment_strategy': (RangePartitionAssignor, RoundRobinPartitionAssignor),
        'max_poll_records': 500,
        'max_poll_interval_ms': 300000,
        'session_timeout_ms': 10000,
        'heartbeat_interval_ms': 3000,
        'receive_buffer_bytes': None,
        'send_buffer_bytes': None,
        'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
        'sock_chunk_bytes': 4096,  # undocumented experimental option
        'sock_chunk_buffer_count': 1000,  # undocumented experimental option
        'consumer_timeout_ms': float('inf'),
        'skip_double_compressed_messages': False,
        'security_protocol': 'PLAINTEXT',
        'ssl_context': None,
        'ssl_check_hostname': True,
        'ssl_cafile': None,
        'ssl_certfile': None,
        'ssl_keyfile': None,
        'ssl_crlfile': None,
        'ssl_password': None,
        'api_version': None,
        'api_version_auto_timeout_ms': 2000,
        'connections_max_idle_ms': 9 * 60 * 1000,
        'metric_reporters': [],
        'metrics_num_samples': 2,
        'metrics_sample_window_ms': 30000,
        'metric_group_prefix': 'consumer',
        'selector': selectors.DefaultSelector,
        'exclude_internal_topics': True,
        'sasl_mechanism': None,
        'sasl_plain_username': None,
        'sasl_plain_password': None,
        'sasl_kerberos_service_name': 'kafka',
        'sasl_kerberos_domain_name': None
    }
DEFAULT_SESSION_TIMEOUT_MS_0_9 = 30000

metrics_tags = {'client-id': DEFAULT_CONFIG['client_id']}
metric_config = MetricConfig(samples=DEFAULT_CONFIG['metrics_num_samples'],
                             time_window_ms=DEFAULT_CONFIG['metrics_sample_window_ms'],
                             tags=metrics_tags)
reporters = [reporter() for reporter in DEFAULT_CONFIG['metric_reporters']]
metrics = Metrics(metric_config, reporters)
client = KafkaClient(metrics=metrics, **DEFAULT_CONFIG)
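# Note: depending on the kafka-python version, bootstrapping may happen lazily,
# so brokers() can be empty until the client has polled at least once.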
print(client.cluster)
print(client.cluster.brokers())

'''
cluster = ClusterMetadata(**DEFAULT_CONFIG)

bootstrap = BrokerConnection(host, port, afi,
                                         state_change_callback=cb,
                                         node_id='bootstrap',
                                         **self.config)

metadata_request = MetadataRequest[1](None)
future = bootstrap.send(metadata_request)
cluster.update_metadata(future.value)
'''
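
# A simpler, runnable variant of the manual-bootstrap sketch commented out
# above: rather than driving a BrokerConnection by hand, ask the client built
# earlier to refresh its metadata and print what the cluster reports back.
refresh = client.cluster.request_update()
client.poll(future=refresh)               # poll until the metadata response is processed
print('topics: ', client.cluster.topics())
print('brokers:', client.cluster.brokers())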

File: producer.py

#!/usr/bin/env python3


from kafka import KafkaProducer
from kafka.errors import KafkaError
import time


BOOTSTART_SERVERS = 'kafka-lb-internal-ac1ce140e307bd3f.elb.cn-north-1.amazonaws.com.cn'
topic = 'test'
#adminuser = 'colourlifeadmin'
#adminpass = '6hGawfdXnnj2'
#consumeruser = 'consumer'
#consumerpass = 'consumer-0tshrST6'
#produceruser = 'producer'
#producerpass = 'producer-75tggKRm'

producer = KafkaProducer(bootstrap_servers=BOOTSTART_SERVERS)  # api_version=None (the default) auto-detects the broker version; passing the string 'auto' is deprecated
#producer = KafkaProducer(bootstrap_servers=BOOTSTART_SERVERS, api_version=(0,10),
#                         security_protocol="SASL_PLAINTEXT",
#                         sasl_mechanism='PLAIN',
#                         sasl_plain_username=produceruser,
#                         sasl_plain_password=producerpass)
for i in range(30):
    now = str(time.time()).encode('utf-8')
    print('data: ', now)
    future = producer.send(topic, now)
    record_metadata = future.get(timeout=10)  # block until the broker acknowledges the send
    print(record_metadata)
    time.sleep(3)
    print('\n')
#try:
#    record_metadata = future.get(timeout=10)
#except KafkaError:
#    # log.exception()
#    print('err')
#    pass


producer.close()
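
The loop above blocks on every future.get(); kafka-python also supports attaching callbacks to the send future for non-blocking delivery reporting. A minimal sketch (the handler names are made up for illustration; host and topic reuse the values above):

#!/usr/bin/env python3
from kafka import KafkaProducer
import time

BOOTSTART_SERVERS = 'kafka-lb-internal-ac1ce140e307bd3f.elb.cn-north-1.amazonaws.com.cn'
topic = 'test'

producer = KafkaProducer(bootstrap_servers=BOOTSTART_SERVERS)

def on_success(record_metadata):
    # illustrative handler: report where the record landed
    print('sent to %s[%d] @ offset %d' % (
        record_metadata.topic, record_metadata.partition, record_metadata.offset))

def on_error(exc):
    # illustrative handler: report the failure
    print('send failed:', exc)

for _ in range(5):
    now = str(time.time()).encode('utf-8')
    producer.send(topic, now).add_callback(on_success).add_errback(on_error)

producer.flush()   # wait for all buffered records to be delivered
producer.close()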