File: server.properties
broker.id=1
listeners=EXTERNAL://:9093,INTERNAL://:9092
advertised.listeners=INTERNAL://172.17.20.117:9092,EXTERNAL://52.80.79.248:9093
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
inter.broker.listener.name=INTERNAL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafkalogs
num.partitions=4
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=172.17.21.246:2181,172.17.20.117:2181,172.17.20.255:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
File: client.py
#!/usr/bin/env python3
from kafka import KafkaConsumer
from kafka.client_async import KafkaClient, selectors
from kafka.metrics import MetricConfig, Metrics
from kafka.cluster import ClusterMetadata
import sys
import time
import socket
#BOOTSTART_SERVERS = 'kafka-ext-809d576b1603e92b.elb.cn-north-1.amazonaws.com.cn:9093'
BOOTSTART_SERVERS = 'kafka-lb-internal-ac1ce140e307bd3f.elb.cn-north-1.amazonaws.com.cn'
topic = 'test'
# topic = 'colourlife_transaction'
#adminuser = 'colourlifeadmin'
#adminpass = '6hGawfdXnnj2'
#consumeruser = 'consumer'
#consumerpass = 'consumer-0tshrST6'
#produceruser = 'producer'
#producerpass = 'producer-75tggKRm'
now = str(time.time()).encode('utf-8')
#consumer = KafkaConsumer(topic, bootstrap_servers=BOOTSTART_SERVERS, enable_auto_commit=True)
#consumer = KafkaConsumer(topic, bootstrap_servers=BOOTSTART_SERVERS, security_protocol="SASL_PLAINTEXT",
# sasl_mechanism='PLAIN',
# sasl_plain_username=consumeruser,
# sasl_plain_password=consumerpass,
# group_id='my_favorite_group', enable_auto_commit=True)
'''
consumer.subscribe([topic])
print(consumer.subscription())
try:
for message in consumer:
print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key, message.value))
#print('assignment: ', consumer.assignment())
#print('partitions', consumer.partitions_for_topic(topic))
print('message: ', message)
print('\n\n')
except KeyboardInterrupt:
consumer.close()
sys.exit(0)
consumer.close()
'''
DEFAULT_CONFIG = {
'bootstrap_servers': BOOTSTART_SERVERS,
'client_id': 'kafka-python-test',
'group_id': None,
'key_deserializer': None,
'value_deserializer': None,
'fetch_max_wait_ms': 500,
'fetch_min_bytes': 1,
'fetch_max_bytes': 52428800,
'max_partition_fetch_bytes': 1 * 1024 * 1024,
'request_timeout_ms': 305000, # chosen to be higher than the default of max_poll_interval_ms
'retry_backoff_ms': 100,
'reconnect_backoff_ms': 50,
'reconnect_backoff_max_ms': 1000,
'max_in_flight_requests_per_connection': 5,
'auto_offset_reset': 'latest',
'enable_auto_commit': True,
'auto_commit_interval_ms': 5000,
'default_offset_commit_callback': lambda offsets, response: True,
'check_crcs': True,
'metadata_max_age_ms': 5 * 60 * 1000,
# 'partition_assignment_strategy': (RangePartitionAssignor, RoundRobinPartitionAssignor),
'max_poll_records': 500,
'max_poll_interval_ms': 300000,
'session_timeout_ms': 10000,
'heartbeat_interval_ms': 3000,
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
'sock_chunk_bytes': 4096, # undocumented experimental option
'sock_chunk_buffer_count': 1000, # undocumented experimental option
'consumer_timeout_ms': float('inf'),
'skip_double_compressed_messages': False,
'security_protocol': 'PLAINTEXT',
'ssl_context': None,
'ssl_check_hostname': True,
'ssl_cafile': None,
'ssl_certfile': None,
'ssl_keyfile': None,
'ssl_crlfile': None,
'ssl_password': None,
'api_version': None,
'api_version_auto_timeout_ms': 2000,
'connections_max_idle_ms': 9 * 60 * 1000,
'metric_reporters': [],
'metrics_num_samples': 2,
'metrics_sample_window_ms': 30000,
'metric_group_prefix': 'consumer',
'selector': selectors.DefaultSelector,
'exclude_internal_topics': True,
'sasl_mechanism': None,
'sasl_plain_username': None,
'sasl_plain_password': None,
'sasl_kerberos_service_name': 'kafka',
'sasl_kerberos_domain_name': None
}
DEFAULT_SESSION_TIMEOUT_MS_0_9 = 30000
metrics_tags = {'client-id': DEFAULT_CONFIG['client_id']}
metric_config = MetricConfig(samples=DEFAULT_CONFIG['metrics_num_samples'],
time_window_ms=DEFAULT_CONFIG['metrics_sample_window_ms'],
tags=metrics_tags)
reporters = [reporter() for reporter in DEFAULT_CONFIG['metric_reporters']]
metrics = Metrics(metric_config, reporters)
client = KafkaClient(metrics=metrics, **DEFAULT_CONFIG)
print(client.cluster)
print(client.cluster.brokers())
'''
cluster = ClusterMetadata(**DEFAULT_CONFIG)
bootstrap = BrokerConnection(host, port, afi,
state_change_callback=cb,
node_id='bootstrap',
**self.config)
metadata_request = MetadataRequest[1](None)
future = bootstrap.send(metadata_request)
cluster.update_metadata(future.value)
'''
File: producer.py
#!/usr/bin/env python3
from kafka import KafkaProducer
from kafka.errors import KafkaError
import time
BOOTSTART_SERVERS = 'kafka-lb-internal-ac1ce140e307bd3f.elb.cn-north-1.amazonaws.com.cn'
topic = 'test'
#adminuser = 'colourlifeadmin'
#adminpass = '6hGawfdXnnj2'
#consumeruser = 'consumer'
#consumerpass = 'consumer-0tshrST6'
#produceruser = 'producer'
#producerpass = 'producer-75tggKRm'
producer = KafkaProducer(bootstrap_servers=BOOTSTART_SERVERS, api_version='auto')
#producer = KafkaProducer(bootstrap_servers=BOOTSTART_SERVERS, api_version=(0,10),
# security_protocol="SASL_PLAINTEXT",
# sasl_mechanism='PLAIN',
# sasl_plain_username=produceruser,
# sasl_plain_password=producerpass)
for i in range(30):
now = str(time.time()).encode('utf-8')
print('data: ', now)
future = producer.send(topic, now)
record_metadata = future.get(timeout=10)
print(record_metadata)
time.sleep(3)
i += 1
print('\n')
#try:
# record_metadata = future.get(timeout=10)
#except KafkaError:
# # log.exception()
# print('err')
# pass
producer.close()