Created
November 14, 2022 19:04
-
-
Save Mekk/ed05c9fc95196bfb4aac5629375295e0 to your computer and use it in GitHub Desktop.
patch.diff
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/src/rdkafka_broker.c b/src/rdkafka_broker.c | |
index 8b7c1d8f..5977cb1c 100644 | |
--- a/src/rdkafka_broker.c | |
+++ b/src/rdkafka_broker.c | |
@@ -5223,6 +5223,19 @@ rd_kafka_broker_addresses_exhausted(const rd_kafka_broker_t *rkb) { | |
} | |
+/** | |
+ * Time limit for finalizing destruction (TODO: should be configurable). | |
+ * | |
+ * This protects against the process getting stuck forever in destruction | |
+ * (see #3143) when some refcnt is never released (the loop below then just | |
+ * keeps running while other threads keep waiting). Instead, we break out of | |
+ * the loop after this time, leaving any leaked refs to be analyzed afterwards. | |
+ * | |
+ * Technically, termination_deadline is set below once we notice that rdkafka | |
+ * is terminating, and the loop is broken once that deadline is exceeded. | |
+ **/ | |
+static const int TERMINATION_TIMEOUT_MS = 60 * 1000; | |
+ | |
static int rd_kafka_broker_thread_main(void *arg) { | |
rd_kafka_broker_t *rkb = arg; | |
rd_kafka_t *rk = rkb->rkb_rk; | |
@@ -5245,11 +5258,34 @@ static int rd_kafka_broker_thread_main(void *arg) { | |
rd_rkb_dbg(rkb, BROKER, "BRKMAIN", "Enter main broker thread"); | |
+ /* Protection against infinite wait in case of lost refcnts, | |
+ see comment for TERMINATION_TIMEOUT_MS above */ | |
+ rd_ts_t termination_deadline = 0; | |
+ | |
while (!rd_kafka_broker_terminating(rkb)) { | |
int backoff; | |
int r; | |
rd_kafka_broker_state_t orig_state; | |
+ if(termination_deadline) { | |
+ if(rd_timeout_remains(termination_deadline) == RD_POLL_NOWAIT) { | |
+ rd_rkb_log(rkb, LOG_WARNING, "TERMINATE", | |
+ "Broker exceeded termination deadline, interrupting.\n" | |
+ "Possibly some references were leaked.\n" | |
+ "Handle is in state %s: " | |
+ "%d refcnts (%p), %d toppar(s), " | |
+ "%d active toppar(s), " | |
+ "%d outbufs, %d waitresps, %d retrybufs", | |
+ rd_kafka_broker_state_names[rkb->rkb_state], | |
+ rd_refcnt_get(&rkb->rkb_refcnt), &rkb->rkb_refcnt, | |
+ rkb->rkb_toppar_cnt, rkb->rkb_active_toppar_cnt, | |
+ (int)rd_kafka_bufq_cnt(&rkb->rkb_outbufs), | |
+ (int)rd_kafka_bufq_cnt(&rkb->rkb_waitresps), | |
+ (int)rd_kafka_bufq_cnt(&rkb->rkb_retrybufs)); | |
+ break; | |
+ } | |
+ } | |
+ | |
redo: | |
orig_state = rkb->rkb_state; | |
@@ -5390,6 +5426,10 @@ static int rd_kafka_broker_thread_main(void *arg) { | |
} | |
if (rd_kafka_terminating(rkb->rkb_rk)) { | |
+ if(! termination_deadline) { | |
+ termination_deadline = rd_timeout_init(TERMINATION_TIMEOUT_MS); | |
+ } | |
+ | |
/* Handle is terminating: fail the send+retry queue | |
* to speed up termination, otherwise we'll | |
* need to wait for request timeouts. */ |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment