Skip to content

Instantly share code, notes, and snippets.

@garyrussell
Last active April 4, 2019 16:35
Show Gist options
  • Save garyrussell/d53d159418263d4303a62acbcda494ff to your computer and use it in GitHub Desktop.
Save garyrussell/d53d159418263d4303a62acbcda494ff to your computer and use it in GitHub Desktop.
so55510898
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.properties.isolation.level=read_committed
spring.kafka.producer.transaction-id-prefix=tx-
spring.kafka.producer.retries=2
spring.kafka.producer.acks=all
spring.kafka.producer.properties.max.block.ms=5000
foo
2019-04-04 12:21:36.072 INFO 19363 --- [ad | producer-2] org.apache.kafka.clients.Metadata : Cluster ID: QSZvIlF_RRyVWJbrx3XsIQ
2019-04-04 12:21:36.088 WARN 19363 --- [ad | producer-2] o.a.k.clients.producer.internals.Sender : [Producer clientId=producer-2, transactionalId=tx-so55510898-1.so55510898-1.0] Got error produce response with correlation id 10 on topic-partition so55510898-2-0, retrying (1 attempts left). Error: NOT_ENOUGH_REPLICAS
2019-04-04 12:21:36.189 WARN 19363 --- [ad | producer-2] o.a.k.clients.producer.internals.Sender : [Producer clientId=producer-2, transactionalId=tx-so55510898-1.so55510898-1.0] Got error produce response with correlation id 11 on topic-partition so55510898-2-0, retrying (0 attempts left). Error: NOT_ENOUGH_REPLICAS
2019-04-04 12:21:36.298 ERROR 19363 --- [ad | producer-2] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='null' and payload='FOO' to topic so55510898-2:
org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.
2019-04-04 12:21:36.302 ERROR 19363 --- [5510898-1-0-C-1] essageListenerContainer$ListenerConsumer : Transaction rolled back
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.example.So55510898Application.listen1(java.lang.String) throws java.lang.Exception' threw exception; nested exception is java.util.concurrent.ExecutionException: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:302) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:79) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1224) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1217) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1178) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$1600(KafkaMessageListenerContainer.java:384) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$3.doInTransactionWithoutResult(KafkaMessageListenerContainer.java:1128) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.transaction.support.TransactionCallbackWithoutResult.doInTransaction(TransactionCallbackWithoutResult.java:36) ~[spring-tx-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:1118) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1096) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:934) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:750) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_181]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_181]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_181]
Caused by: java.util.concurrent.ExecutionException: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.
at java.util.concurrent.FutureTask.report(FutureTask.java:122) [na:1.8.0_181]
at java.util.concurrent.FutureTask.get(FutureTask.java:206) [na:1.8.0_181]
at org.springframework.util.concurrent.SettableListenableFuture.get(SettableListenableFuture.java:134) ~[spring-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at com.example.So55510898Application.listen1(So55510898Application.java:27) ~[classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_181]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_181]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_181]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_181]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:170) ~[spring-messaging-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120) ~[spring-messaging-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:283) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
... 17 common frames omitted
Caused by: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.
at org.springframework.kafka.core.KafkaTemplate.lambda$buildCallback$0(KafkaTemplate.java:396) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1235) ~[kafka-clients-2.0.1.jar:na]
at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204) ~[kafka-clients-2.0.1.jar:na]
at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187) ~[kafka-clients-2.0.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:635) ~[kafka-clients-2.0.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:604) ~[kafka-clients-2.0.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:561) ~[kafka-clients-2.0.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:485) ~[kafka-clients-2.0.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74) ~[kafka-clients-2.0.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:700) ~[kafka-clients-2.0.1.jar:na]
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) ~[kafka-clients-2.0.1.jar:na]
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:532) ~[kafka-clients-2.0.1.jar:na]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:524) ~[kafka-clients-2.0.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239) ~[kafka-clients-2.0.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) ~[kafka-clients-2.0.1.jar:na]
... 1 common frames omitted
Caused by: org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.
foo
2019-04-04 12:21:46.572 ERROR 19363 --- [ad | producer-2] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='null' and payload='FOO' to topic so55510898-2:
org.apache.kafka.common.KafkaException: Failing batch since transaction was aborted
at org.apache.kafka.clients.producer.internals.Sender.maybeSendTransactionalRequest(Sender.java:320) ~[kafka-clients-2.0.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:214) ~[kafka-clients-2.0.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) ~[kafka-clients-2.0.1.jar:na]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_181]
2019-04-04 12:26:21.800 INFO 19363 --- [ad | producer-1] org.apache.kafka.clients.Metadata : Cluster ID: QSZvIlF_RRyVWJbrx3XsIQ
"so55510898-1-0-C-1" #20 prio=5 os_prio=31 tid=0x00007fbf9d874800 nid=0x5d03 waiting on condition [0x000070001002d000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x000000076c1c52c0> (a java.util.concurrent.CountDownLatch$Sync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
at org.apache.kafka.clients.producer.internals.TransactionalRequestResult.await(TransactionalRequestResult.java:50)
at org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:681)
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.abortTransaction(DefaultKafkaProducerFactory.java:514)
at org.springframework.kafka.core.KafkaResourceHolder.rollback(KafkaResourceHolder.java:57)
at org.springframework.kafka.transaction.KafkaTransactionManager.doRollback(KafkaTransactionManager.java:182)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.processRollback(AbstractPlatformTransactionManager.java:838)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.rollback(AbstractPlatformTransactionManager.java:812)
at org.springframework.transaction.support.TransactionTemplate.rollbackOnException(TransactionTemplate.java:168)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:144)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:1118)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1096)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:934)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:750)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
...
[2019-04-04 12:33:40,140] INFO [Transaction Marker Request Completion Handler 0]: Sending tx-so55510898-1.so55510898-1.0's transaction marker for partition so55510898-2-0 has failed with error org.apache.kafka.common.errors.NotEnoughReplicasException, retrying with current coordinator epoch 0 (kafka.coordinator.transaction.TransactionMarkerRequestCompletionHandler)
[2019-04-04 12:33:40,140] ERROR [ReplicaManager broker=0] Error processing append operation on partition so55510898-2-0 (kafka.server.ReplicaManager)
org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the current ISR Set(0) is insufficient to satisfy the min.isr requirement of 2 for partition so55510898-2-0
[2019-04-04 12:33:40,141] INFO [Transaction Marker Request Completion Handler 0]: Sending tx-so55510898-1.so55510898-1.0's transaction marker for partition so55510898-2-0 has failed with error org.apache.kafka.common.errors.NotEnoughReplicasException, retrying with current coordinator epoch 0 (kafka.coordinator.transaction.TransactionMarkerRequestCompletionHandler)
[2019-04-04 12:33:40,141] ERROR [ReplicaManager broker=0] Error processing append operation on partition so55510898-2-0 (kafka.server.ReplicaManager)
org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the current ISR Set(0) is insufficient to satisfy the min.isr requirement of 2 for partition so55510898-2-0
[2019-04-04 12:33:40,141] INFO [Transaction Marker Request Completion Handler 0]: Sending tx-so55510898-1.so55510898-1.0's transaction marker for partition so55510898-2-0 has failed with error org.apache.kafka.common.errors.NotEnoughReplicasException, retrying with current coordinator epoch 0 (kafka.coordinator.transaction.TransactionMarkerRequestCompletionHandler)
[2019-04-04 12:33:40,141] ERROR [ReplicaManager broker=0] Error processing append operation on partition so55510898-2-0 (kafka.server.ReplicaManager)
org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the current ISR Set(0) is insufficient to satisfy the min.isr requirement of 2 for partition so55510898-2-0
[2019-04-04 12:33:40,141] INFO [KafkaServer id=0] Controlled shutdown succeeded (kafka.server.KafkaServer)
[2019-04-04 12:33:40,141] INFO [Transaction Marker Request Completion Handler 0]: Sending tx-so55510898-1.so55510898-1.0's transaction marker for partition so55510898-2-0 has failed with error org.apache.kafka.common.errors.NotEnoughReplicasException, retrying with current coordinator epoch 0 (kafka.coordinator.transaction.TransactionMarkerRequestCompletionHandler)
[2019-04-04 12:33:40,142] ERROR [ReplicaManager broker=0] Error processing append operation on partition so55510898-2-0 (kafka.server.ReplicaManager)
org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the current ISR Set(0) is insufficient to satisfy the min.isr requirement of 2 for partition so55510898-2-0
...
package com.example;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
/**
 * Spring Boot application that relays records from topic {@code so55510898-1}
 * to topic {@code so55510898-2} in upper case, and echoes everything it
 * receives on either topic to standard output.
 */
@SpringBootApplication
public class So55510898Application {

    /** Template used by {@link #listen1(String)} to publish to the second topic. */
    @Autowired
    private KafkaTemplate<String, String> template;

    /**
     * Boots the Spring application context.
     *
     * @param args command-line arguments, handed to Spring Boot unchanged
     */
    public static void main(String[] args) {
        SpringApplication.run(So55510898Application.class, args);
    }

    /**
     * Handles a record from {@code so55510898-1}: prints the payload, sends its
     * upper-cased form to {@code so55510898-2}, then prints the send result.
     *
     * @param in the received payload
     * @throws Exception if waiting for the send result fails or does not
     *         complete within 10 seconds
     */
    @KafkaListener(id = "so55510898-1", topics = "so55510898-1")
    public void listen1(String in) throws Exception {
        System.out.println(in);
        String upperCased = in.toUpperCase();
        ListenableFuture<SendResult<String, String>> pendingSend =
                template.send("so55510898-2", upperCased);
        // Block the listener thread until the send completes (bounded to 10 seconds).
        SendResult<String, String> outcome = pendingSend.get(10, TimeUnit.SECONDS);
        System.out.println(outcome);
    }

    /**
     * Handles a record from {@code so55510898-2} by printing its payload.
     *
     * @param in the received payload
     */
    @KafkaListener(id = "so55510898-2", topics = "so55510898-2")
    public void listen2(String in) {
        System.out.println(in);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment