Skip to content

Instantly share code, notes, and snippets.

@windkit
Last active May 8, 2019
Embed
What would you like to do?
KAFKA-8334 Kafka Occasional OffsetCommit Timeout
from kafka import KafkaConsumer
from kafka import TopicPartition
from kafka.structs import OffsetAndMetadata
import time
from threading import Thread
# Tunables for the heartbeat-jamming reproduction script.
PROCESS_TIME = 5                  # seconds slept after each consumed message
TEST_TOPIC = "test"               # topic the consumer subscribes to
BOOTSTRAP = ["test-kf001:20992"]  # bootstrap broker list
HEARTBEAT_INT = 0.1               # interval (seconds) used by each jammer thread
NUM_JAMMERS = 20                  # number of jammer threads to start

# Auto-commit is disabled. The in-flight request limit is raised to twice the
# jammer count -- presumably so the concurrent heartbeats are not held back by
# the client-side limit (NOTE(review): confirm intent).
consumer = KafkaConsumer(
    TEST_TOPIC,
    group_id='wilson-tester',
    auto_offset_reset='earliest',
    enable_auto_commit=False,
    bootstrap_servers=BOOTSTRAP,
    max_in_flight_requests_per_connection=NUM_JAMMERS * 2,
)
print(consumer._coordinator)

# Flags shared with the jammer threads (read by jam_heartbeat):
#   first   -- True until the main loop has received its first message
#   running -- set to False to make the jammer threads exit
first = True
running = True
def jam_heartbeat(coordinator):
    """Send heartbeat requests through *coordinator* until told to stop.

    The function first waits while the module-level flag ``first`` is still
    true, then calls ``coordinator._send_heartbeat_request()`` in a loop,
    pacing iterations to one per ``HEARTBEAT_INT`` seconds. It returns once
    the module-level flag ``running`` becomes false.

    Args:
        coordinator: object exposing ``_send_heartbeat_request()``; the
            script passes ``consumer._coordinator``.
    """
    # ``first`` and ``running`` are only read here, so no ``global``
    # declaration is required.
    while running and first:
        # Yield instead of spinning so the waiting jammer threads do not
        # compete with the main thread for the interpreter lock.
        time.sleep(0.01)

    while running:
        started = time.time()
        coordinator._send_heartbeat_request()
        remaining = HEARTBEAT_INT - (time.time() - started)
        if remaining > 0:
            # Sleep only for what is left of the interval; sleeping the full
            # HEARTBEAT_INT would stretch the period by the request time.
            time.sleep(remaining)
# Create and start the jammer threads; each one runs jam_heartbeat against
# the consumer's coordinator object.
threads = [
    Thread(target=jam_heartbeat, args=(consumer._coordinator,))
    for _ in range(NUM_JAMMERS)
]
for jammer in threads:
    jammer.start()

try:
    while True:
        # Block until the next record arrives; the record itself is not used.
        next(consumer)
        # Clearing ``first`` releases the jammer threads from their wait loop.
        first = False
        time.sleep(PROCESS_TIME)
        print(consumer.assignment())
finally:
    # Signal the jammers to exit and wait for them before closing the
    # consumer, so no heartbeat is sent through a closed client.
    running = False
    for jammer in threads:
        jammer.join()
    consumer.close()
from kafka import KafkaConsumer
from kafka import TopicPartition
from kafka.structs import OffsetAndMetadata
import time
# Tunables for the commit-latency script.
PROCESS_TIME = 5                  # seconds slept after each consumed message
TEST_TOPIC = "test"               # topic the consumer subscribes to
BOOTSTRAP = ["test-kf001:20992"]  # bootstrap broker list

# Auto-commit is disabled; offsets are committed explicitly in the loop below.
consumer = KafkaConsumer(
    TEST_TOPIC,
    group_id='wilson-tester',
    auto_offset_reset='earliest',
    enable_auto_commit=False,
    bootstrap_servers=BOOTSTRAP,
)
print(consumer._coordinator)

try:
    while True:
        message = next(consumer)
        time.sleep(PROCESS_TIME)
        tp = TopicPartition(message.topic, message.partition)
        # Time the synchronous commit and print the elapsed seconds.
        start = time.time()
        # The committed offset is the position of the NEXT record to consume,
        # hence offset + 1; committing message.offset itself would re-deliver
        # this record after a restart or rebalance.
        consumer.commit({tp: OffsetAndMetadata(message.offset + 1, None)})
        end = time.time()
        print(end - start)
        print(consumer.assignment())
finally:
    # Release the consumer's connections on any exit path.
    consumer.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment