@anuchandy
Last active February 1, 2024 22:28
The Session Idle Timeout

The session idle timeout is a client-side feature. It enables applications to transparently roll over to a new session when no message arrives in the current session within a configured period.

Current implementation:

In the current implementation, the idle timer starts as soon as a message is emitted downstream. The timer is canceled if the next message arrives within the timeout. If no message arrives within the timeout, the timer fires and the session is closed.

A simplified flow looks like this (where each Stage* is a stage in the asynchronous pipeline):

  • Stage0 - [Thread-0] On message arrival, starts the timer and enqueues the message to Stage1.
  • Stage1 - [Thread-0] Dequeues the message, schedules the application callback on Thread-1, and returns.
  • Stage2 - [Thread-1] Eventually runs the application callback containing the business logic.

In this approach, if the timer expires while the application callback is still in progress, the library may close the session and interrupt the application processing running on Thread-1.

Because of this interruption, users must choose an idle timeout value greater than their business processing time. Estimating that processing time is not easy, since user processing often calls into external IO (e.g., a database or a REST endpoint). Having the Azure library cause those IO calls to fail is not a great experience.

(This is the experience in the Session Processor API on both the v1 and v2 stacks.)
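The interruption described above can be reproduced with a small, JDK-only simulation. This is a sketch under stated assumptions, not library code: `EmitTimeIdleTimerDemo` and `callbackInterrupted` are hypothetical names, a `ScheduledExecutorService` stands in for the library's timer, `Thread.sleep` stands in for business logic or external IO, and "closing the session" is modeled as interrupting Thread-1 via `shutdownNow()`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical simulation of the current behavior, where the idle timer starts when a message
// is emitted; this is not library code. The calling thread plays the role of Thread-0.
final class EmitTimeIdleTimerDemo {
    /** Returns true if the idle timer "closed the session" (interrupted Thread-1) mid-callback. */
    static boolean callbackInterrupted(long timeoutMs, long callbackMs) {
        ScheduledExecutorService timerScheduler = Executors.newSingleThreadScheduledExecutor(daemon("idle-timer"));
        ExecutorService thread1 = Executors.newSingleThreadExecutor(daemon("Thread-1"));
        AtomicBoolean interrupted = new AtomicBoolean(false);
        CountDownLatch callbackDone = new CountDownLatch(1);
        try {
            // Stage0: the message arrives and the idle timer starts immediately. On expiry,
            // closing the session is modeled as interrupting Thread-1 via shutdownNow().
            ScheduledFuture<?> idleTimer = timerScheduler.schedule(
                () -> { thread1.shutdownNow(); }, timeoutMs, TimeUnit.MILLISECONDS);
            // Stage1: hand the application callback to Thread-1 and return.
            thread1.execute(() -> {
                try {
                    Thread.sleep(callbackMs); // Stage2: stand-in for business logic / external IO
                } catch (InterruptedException e) {
                    interrupted.set(true);    // the "IO" failed because the session was closed
                } finally {
                    callbackDone.countDown();
                }
            });
            awaitQuietly(callbackDone);
            idleTimer.cancel(false); // demo cleanup only; in the library, the next message cancels it
            return interrupted.get();
        } finally {
            timerScheduler.shutdownNow();
            thread1.shutdownNow();
        }
    }

    private static ThreadFactory daemon(String name) {
        return r -> {
            Thread t = new Thread(r, name);
            t.setDaemon(true);
            return t;
        };
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("timeout=100ms, callback=1000ms, interrupted=" + callbackInterrupted(100, 1000));
        System.out.println("timeout=1000ms, callback=50ms, interrupted=" + callbackInterrupted(1000, 50));
    }
}
```

In this model, the timeout-versus-processing-time ratio alone decides whether the callback survives. A slow callback gets interrupted even though the session is clearly not idle.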

Proposal:

The idle timeout should be measured from the completion of the last operation on the session.

This means:

  1. In the case of serial message processing, the timer starts ticking after the current message processing completes.
  2. In the case of concurrent message processing, the timer starts ticking after all concurrent message processing completes.
  3. There must never be multiple timers running concurrently, and no timer may be running while any message processing is active.
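The three rules can be checked against a small, lock-based sketch. `IdleAfterLastRunTimer`, `awaitExpiry`, and the generation counter are hypothetical and not part of the library. The sketch deliberately trades SessionIdleTimer's lock-free design for a single monitor so that each rule can be read directly off the code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical, lock-based sketch of the proposed rules; SessionIdleTimer itself is lock-free.
final class IdleAfterLastRunTimer implements AutoCloseable {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "idle-timer");
        t.setDaemon(true);
        return t;
    });
    private final CountDownLatch expired = new CountDownLatch(1);
    private final long timeoutMs;
    private int running;                  // in-flight message consumer runs (guarded by this)
    private long generation;              // identifies the only timer allowed to fire (guarded by this)
    private ScheduledFuture<?> idleTimer; // at most one pending timer at any time (rule 3)

    IdleAfterLastRunTimer(long timeoutMs) {
        this.timeoutMs = timeoutMs;
        synchronized (this) {
            armLocked(); // the session starts idle, so the first timer starts right away
        }
    }

    /** A run begins: the first of any concurrent runs cancels the pending timer (rule 3). */
    synchronized void onBeginRun() {
        if (running++ == 0 && idleTimer != null) {
            idleTimer.cancel(false);
            idleTimer = null;
        }
    }

    /** A run ends: only the last of the concurrent runs starts a fresh timer (rules 1 and 2). */
    synchronized void onEndRun() {
        if (--running == 0) {
            armLocked();
        }
    }

    /** Waits up to waitMs for the idle timeout; returns true if it expired. */
    boolean awaitExpiry(long waitMs) {
        try {
            return expired.await(waitMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }

    private void armLocked() {
        final long gen = ++generation;
        idleTimer = scheduler.schedule(() -> fire(gen), timeoutMs, TimeUnit.MILLISECONDS);
    }

    private synchronized void fire(long gen) {
        // A timer whose cancel() lost the race must not expire a busy or re-armed session.
        if (gen == generation && running == 0) {
            expired.countDown();
        }
    }
}
```

A single monitor keeps the sketch easy to verify. SessionIdleTimer aims for the same guarantees without locking on the per-message path, relying instead on the `running` counter and the CAS-loop on `timer`.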

We propose a type, SessionIdleTimer, that satisfies the above requirements and is used in the Processor as follows:

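SessionIdleTimer timer = …;

// Illustrative pseudocode: sessionMessages(), concurrency(10) and handle(...) stand in for
// the Processor's message pipeline.
sessionMessages()
  .concurrency(10)
  .handle(message -> {
     timer.onBeginRun();
     try {
         userCallback(message);
     } finally {
         // End the run even if the callback throws; otherwise 'running' never returns to 0
         // and the idle timer never restarts.
         timer.onEndRun();
     }
  });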

SessionIdleTimer.java

implementation
package com.azure.messaging.servicebus;

import com.azure.core.util.logging.ClientLogger;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

final class SessionIdleTimer {
    private final ClientLogger logger;
    private final Duration timeout;
    private final int maxRunning;
    private final OneShotTimer.OnTimeout onTimeout;
    private final AtomicInteger running = new AtomicInteger(-1);
    private final AtomicReference<OneShotTimer> timer = new AtomicReference<>(OneShotTimer.INIT);

    SessionIdleTimer(ClientLogger logger, Duration timeout, int maxRunning) {
        this.logger = Objects.requireNonNull(logger);
        this.timeout = Objects.requireNonNull(timeout);
        this.maxRunning = maxRunning;
        this.onTimeout = new OneShotTimer.OnTimeout();
    }

    /**
     * Gets the mono that terminates with a completion signal once the idle timeout expires.
     * <p>
     * The timer starts for the first time when the mono is subscribed.
     * </p>
     *
     * @return the timeout mono.
     */
    Mono<Void> timeout() {
        return Mono.<Void>fromRunnable(() -> {
            final OneShotTimer first = new OneShotTimer(onTimeout, timeout);
            if (!timer.compareAndSet(OneShotTimer.INIT, first)) {
                throw logger.atError().log(new IllegalStateException("Unexpected multi-subscription to timeout mono."));
            }
            running.set(0);
            first.start();
        })
        .then(onTimeout.mono())
        .doOnEach(s -> {
            disposed();
        })
        .doOnCancel(() -> {
            disposed();
        });
    }

    /**
     * Signals that the calling thread started running the message consumer to process a session message,
     * which indicates that the session is not idle; if an idle timer is active, it should be canceled.
     */
    void onBeginRun() {
        final int r = running.incrementAndGet();
        assert r > 0 && r <= maxRunning;
        if (r == 1) {
            final OneShotTimer current = timer.get();
            current.cancel();
        }
    }


    /**
     * Signals that the calling thread is finishing a message consumer run. The idle timer starts ticking only if there
     * are no other threads currently running message consumers.
     */
    void onEndRun() {
        final int r = running.decrementAndGet();
        assert r >= 0 && r < maxRunning;
        // Check 'running' "before" CAS-loop. Calling thread T1 observing 'running==0' means there are no other threads
        // running message consumers.
        if (r == 0) {
            final OneShotTimer next = new OneShotTimer(onTimeout, timeout);
            // Enter CAS-loop to set idle timer, or no-op if the SessionIdleTimer is disposed.
            for ( ; ; ) {
                // The chance of thread contention in the CAS-loop is very low. For it to happen, more than one
                // thread must observe 'running' as 0, which can happen when:
                // 1.  Thread T1 invoked onEndRun after message consumption, decremented 'running' to 0, entered the
                //     CAS-loop, and got parked by the JVM/OS.
                // 2.  While T1 is parked, thread T2 ran onBeginRun bumping 'running' to 1, ran its message consumer,
                //     invoked onEndRun to decrement 'running' to 0 and entered the CAS-loop.
                // 3.  At this point T1 resumed and continued concurrently with T2.
                //
                // T1 is not doing any IO that would make it likely to be parked, and the chance of T2 beating T1 by
                // finishing its consumer run is low. So the chance of thread contention in the CAS-loop is almost nil.
                //
                final OneShotTimer current = timer.get();
                if (current == OneShotTimer.DISPOSED) {
                    return;
                }
                if (timer.compareAndSet(current, next)) {
                    current.cancel();
                    break;
                }
            }

            // Check 'running' "after" CAS-loop. If it is still 0, then attempt to start the 'next' timer.
            if (running.get() == 0) {
                // While this thread T1 that observed 'running' as 0 ("before" entering CAS-loop) was executing CAS-loop,
                // it is possible that another thread T2 executed onBeginRun and incremented 'running'. T2 could
                // miss canceling the 'next' that T1 set to 'timer'. In this case, given T2 brought a new message consumer,
                // we don’t want T1 to start the 'next' it set. T1 should start 'next' only if the latest value of
                // 'running' is still 0.
                // (Alternatively, if T2 cancels 'timer' after T1 sets 'next' to 'timer', then T1's attempt to start
                // the 'next' here is no-op)
                next.start();
            }
        }
    }

    /**
     * Marks the SessionIdleTimer as disposed and cancels the current timer.
     */
    private void disposed() {
        final OneShotTimer current = timer.getAndSet(OneShotTimer.DISPOSED);
        current.cancel();
    }

    /**
     * A timer that runs at most once and signals {@link OnTimeout} upon timeout.
     */
    private static final class OneShotTimer extends AtomicBoolean {
        private final Disposable.Composite disposable = Disposables.composite();
        private final OnTimeout onTimeout;
        private final Duration timeout;
        static final OneShotTimer INIT;
        static final OneShotTimer DISPOSED;
        static {
            INIT = new OneShotTimer(OnTimeout.NONE, Duration.ZERO);
            DISPOSED = new OneShotTimer(OnTimeout.NONE, Duration.ZERO);
            INIT.cancel();
            DISPOSED.cancel();
        }

        OneShotTimer(OnTimeout onTimeout, Duration timeout) {
            super(false);
            this.onTimeout = onTimeout;
            this.timeout = timeout;
        }

        /**
         * Starts the timer, which signals {@link OnTimeout} upon timeout unless the timer has been canceled
         * via {@link #cancel()}.
         */
        void start() {
            if (disposable.isDisposed() || super.getAndSet(true)) {
                // already canceled or started.
                return;
            }
            final Disposable d = Mono.delay(timeout)
                .subscribe(__ -> onTimeout.signalTimeout(), onTimeout::signalError, onTimeout::signalTimeout);
            disposable.add(d);
        }

        /**
         * Cancel the timer. Once canceled the timer cannot be started again.
         */
        void cancel() {
            disposable.dispose();
        }

        /**
         * A type that {@link OneShotTimer} signals upon timeout.
         */
        static final class OnTimeout {
            private final Sinks.Empty<Void> sink = Sinks.empty();
            private final AtomicBoolean signaled = new AtomicBoolean(false);
            static final OnTimeout NONE = new OnTimeout();
            static {
                NONE.signalError(new IllegalStateException("OnTimeout.NONE should not be subscribed."));
            }

            /**
             * Get the {@link Mono} that terminates with completion when the {@link OneShotTimer} signals timeout.
             *
             * @return the timeout mono.
             */
            Mono<Void> mono() {
                return sink.asMono();
            }

            /**
             * Invoked by {@link OneShotTimer} upon timeout.
             */
            void signalTimeout() {
                if (signaled.getAndSet(true)) {
                    return;
                }
                sink.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST);
            }

            /**
             * Invoked by {@link OneShotTimer} upon error.
             */
            void signalError(Throwable error) {
                if (signaled.getAndSet(true)) {
                    return;
                }
                sink.emitError(error, Sinks.EmitFailureHandler.FAIL_FAST);
            }
        }
    }
}
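The CAS-loop comments in onEndRun rely on two OneShotTimer guarantees. First, start() is a no-op once cancel() has run, which is why T1 starting a 'next' that T2 already canceled does nothing. Second, start() schedules at most once. The following JDK-only sketch provides the same two guarantees. It assumes a `ScheduledExecutorService` in place of Reactor's `Mono.delay` and `Disposables.composite()`, and `JdkOneShotTimer` is a hypothetical name.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical JDK-only analogue of OneShotTimer (ScheduledExecutorService in place of Mono.delay).
final class JdkOneShotTimer {
    private static final ScheduledExecutorService SCHEDULER =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "one-shot-timer");
            t.setDaemon(true);
            return t;
        });

    private final AtomicBoolean started = new AtomicBoolean(false);
    private final Runnable onTimeout;
    private final long timeoutMs;
    private volatile boolean canceled;
    private volatile ScheduledFuture<?> pending;

    JdkOneShotTimer(Runnable onTimeout, long timeoutMs) {
        this.onTimeout = onTimeout;
        this.timeoutMs = timeoutMs;
    }

    /** Schedules onTimeout at most once; a no-op if already started or already canceled. */
    void start() {
        if (canceled || started.getAndSet(true)) {
            return;
        }
        final ScheduledFuture<?> p = SCHEDULER.schedule(onTimeout, timeoutMs, TimeUnit.MILLISECONDS);
        pending = p;
        if (canceled) {
            // cancel() ran concurrently and may have read 'pending' before it was set; cancel here instead.
            p.cancel(false);
        }
    }

    /** Cancels the timer; once canceled, it can never be started again. */
    void cancel() {
        canceled = true;
        final ScheduledFuture<?> p = pending;
        if (p != null) {
            p.cancel(false);
        }
    }
}
```

Both `canceled` and `pending` are volatile, so either cancel() sees the scheduled future, or start() sees `canceled == true` afterwards. This plays the same role as `Disposable.Composite.add` disposing a resource added after the composite was disposed.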