Skip to content

Instantly share code, notes, and snippets.

@Mekk
Created November 14, 2022 19:04
Show Gist options
  • Save Mekk/ed05c9fc95196bfb4aac5629375295e0 to your computer and use it in GitHub Desktop.
Save Mekk/ed05c9fc95196bfb4aac5629375295e0 to your computer and use it in GitHub Desktop.
patch.diff
diff --git a/src/rdkafka_broker.c b/src/rdkafka_broker.c
index 8b7c1d8f..5977cb1c 100644
--- a/src/rdkafka_broker.c
+++ b/src/rdkafka_broker.c
@@ -5223,6 +5223,19 @@ rd_kafka_broker_addresses_exhausted(const rd_kafka_broker_t *rkb) {
}
+/**
+ * Maximum time in ms a broker thread may spend terminating
+ * (TODO: should be configurable).
+ *
+ * Guards against the process hanging forever during destruction (see #3143)
+ * when some broker refcnt is never released: the main loop would keep running
+ * and other threads keep waiting. After this timeout the loop is abandoned
+ * with a warning, leaving the leaked refs to be analyzed afterwards.
+ *
+ * termination_deadline is armed once rd_kafka_terminating() is first seen.
+ **/
+static const int TERMINATION_TIMEOUT_MS = 60 * 1000;
+
static int rd_kafka_broker_thread_main(void *arg) {
rd_kafka_broker_t *rkb = arg;
rd_kafka_t *rk = rkb->rkb_rk;
@@ -5245,11 +5258,34 @@ static int rd_kafka_broker_thread_main(void *arg) {
rd_rkb_dbg(rkb, BROKER, "BRKMAIN", "Enter main broker thread");
+ /* Protection against infinite wait in case of lost refcnts,
+ see comment for TERMINATION_TIMEOUT_MS above */
+ rd_ts_t termination_deadline = 0;
+
while (!rd_kafka_broker_terminating(rkb)) {
int backoff;
int r;
rd_kafka_broker_state_t orig_state;
+ if(termination_deadline) {
+ if(rd_timeout_remains(termination_deadline) == RD_POLL_NOWAIT) {
+ rd_rkb_log(rkb, LOG_WARNING, "TERMINATE",
+ "Broker exceeded termination deadline, interrupting.\n"
+ "Possibly some references were leaked.\n"
+ "Handle is in state %s: "
+ "%d refcnts (%p), %d toppar(s), "
+ "%d active toppar(s), "
+ "%d outbufs, %d waitresps, %d retrybufs",
+ rd_kafka_broker_state_names[rkb->rkb_state],
+ rd_refcnt_get(&rkb->rkb_refcnt), &rkb->rkb_refcnt,
+ rkb->rkb_toppar_cnt, rkb->rkb_active_toppar_cnt,
+ (int)rd_kafka_bufq_cnt(&rkb->rkb_outbufs),
+ (int)rd_kafka_bufq_cnt(&rkb->rkb_waitresps),
+ (int)rd_kafka_bufq_cnt(&rkb->rkb_retrybufs));
+ break;
+ }
+ }
+
redo:
orig_state = rkb->rkb_state;
@@ -5390,6 +5426,10 @@ static int rd_kafka_broker_thread_main(void *arg) {
}
if (rd_kafka_terminating(rkb->rkb_rk)) {
+ if(! termination_deadline) {
+ termination_deadline = rd_timeout_init(TERMINATION_TIMEOUT_MS);
+ }
+
/* Handle is terminating: fail the send+retry queue
* to speed up termination, otherwise we'll
* need to wait for request timeouts. */
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment