Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
General purpose thread-safe event class.

General purpose thread-safe event class.

Features:

  • Thread-safety.
  • Supports capturing the SynchronizationContext (which makes it usable in the Unity3d engine).
  • Provides a Task<T> api for waiting for events.
  • Provides a callback (Subscribe / Unsubscribe) api for receiving events.
  • Reasonable performance.
  • Ability to pass a custom subscriptionToken makes it easier to unsubscribe lambdas.
  • Supports storing data that was invoked when no one was subscribed (or waiting) yet.

Allocations:

  • Invoke is allocation free (after it has warmed up some caches).
  • Waiting using the Task<T> api allocates a handler on the heap.
  • Each new subscription allocates a handler on the heap.
  • Unsubscriptions are allocation free (Depending on 'Equals' implementation on your provided subscriptionToken).

Unity3d note: If you call 'Subscribe' from the unity-thread (and leave callOnCapturedContext at true), your callback will only ever be invoked on the unity-thread, which makes it very safe to use.

Example:

/// <summary>
/// Console demo showing both ways of consuming a <see cref="SynchronizedEvent{T}"/>:
/// awaiting a single value and subscribing a callback.
/// </summary>
internal class Program
{
    private static async Task Main()
    {
        var worker = new Worker();
        IReadOnlySynchronizedEvent<int> workReady = worker.WorkReady;

        // Task flavour: resumes once the worker has produced a value.
        int result = await workReady.WaitAsync();
        Console.WriteLine($"Got async: '{result}'");

        // Callback flavour: keep receiving values until a key is pressed, then detach again.
        Action<int> onWorkReady = WorkReady;
        workReady.Subscribe(onWorkReady);
        Console.ReadKey();
        workReady.Unsubscribe(onWorkReady);
    }

    /// <summary>
    /// Callback used for the subscription part of the demo; prints the received value.
    /// </summary>
    /// <param name="output">Value that was passed to the event.</param>
    private static void WorkReady(int output)
    {
        Console.WriteLine($"Got from subscription: '{output}'");
    }

    /// <summary>
    /// Produces the value 42 once per second on a background task and exposes the event
    /// through its read-only interface. Also acts as the event's exception handler.
    /// </summary>
    private class Worker : IExceptionHandler
    {
        private readonly SynchronizedEvent<int> workProduced;
        private readonly Task producerLoop;

        public Worker()
        {
            // The event must exist before the producer loop starts invoking it.
            this.workProduced = new SynchronizedEvent<int>(this, storeUnobservedData: true);
            this.producerLoop = Task.Run(this.ProduceForeverAsync);
        }

        /// <summary>
        /// Listen-only view of the event; consumers cannot invoke it.
        /// </summary>
        public IReadOnlySynchronizedEvent<int> WorkReady
        {
            get { return this.workProduced; }
        }

        /// <summary>
        /// Writes the exception to standard error.
        /// </summary>
        void IExceptionHandler.Handle(Exception e)
        {
            Console.Error.Write(e);
        }

        /// <summary>
        /// Endless loop: wait one second, fire the event, repeat.
        /// </summary>
        private async Task ProduceForeverAsync()
        {
            for (;;)
            {
                await Task.Delay(1000);
                this.workProduced.Invoke(42);
            }
        }
    }
}

SynchronizedEvent<T>:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// Sink for exceptions that should be reported instead of propagated.
/// <see cref="SynchronizedEvent{T}"/> passes every exception thrown by a subscriber callback
/// to an implementation of this interface.
/// </summary>
public interface IExceptionHandler
{
    /// <summary>
    /// Handle the given exception (for example by logging it).
    /// </summary>
    /// <remarks>
    /// Implementations should not rethrow in this call-stack: <see cref="SynchronizedEvent{T}"/>
    /// calls this method from inside its own catch block, so an exception thrown here is not
    /// caught by the event.
    /// </remarks>
    /// <param name="exception">Exception to handle.</param>
    void Handle(Exception exception);
}

/// <summary>
/// Listen-only view of a thread-safe event that supports both task-style and callback-style
/// listening. It exposes no way to raise the event.
/// </summary>
/// <typeparam name="T">Type of the event data.</typeparam>
public interface IReadOnlySynchronizedEvent<T>
{
    /// <summary>
    /// Wait for a single event to be fired.
    /// </summary>
    /// <param name="cancelToken">Token that can be used to cancel the wait.</param>
    /// <returns>
    /// Task that completes with the event data when an event is received, or ends up cancelled
    /// when <paramref name="cancelToken"/> is cancelled first.
    /// </returns>
    Task<T> WaitAsync(CancellationToken cancelToken = default);

    /// <summary>
    /// Subscribe a callback that is invoked for every event.
    /// </summary>
    /// <param name="action">Action to invoke when an event is fired.</param>
    /// <param name="subscriptionToken">
    /// Token to use for unsubscribing; when none is provided the 'action' itself is used.
    /// </param>
    /// <param name="callOnCapturedContext">
    /// When true the action is only called on the SynchronizationContext that was active at the
    /// time of subscribing.
    /// </param>
    void Subscribe(Action<T> action, object subscriptionToken = null, bool callOnCapturedContext = true);

    /// <summary>
    /// Subscribe a callback that is invoked for every event and receives a caller supplied state
    /// object next to the event data.
    /// </summary>
    /// <remarks>
    /// The state parameter can be used to avoid having to use a closure to capture state.
    /// </remarks>
    /// <param name="action">Action to invoke when an event is fired.</param>
    /// <param name="state">State that is passed to the action on every call.</param>
    /// <param name="subscriptionToken">
    /// Token to use for unsubscribing; when none is provided the 'action' itself is used.
    /// </param>
    /// <param name="callOnCapturedContext">
    /// When true the action is only called on the SynchronizationContext that was active at the
    /// time of subscribing.
    /// </param>
    void Subscribe(
        Action<T, object> action,
        object state,
        object subscriptionToken = null,
        bool callOnCapturedContext = true);

    /// <summary>
    /// Unsubscribe a callback that was subscribed without a custom subscriptionToken.
    /// </summary>
    /// <param name="action">Action that was used as the subscriptionToken.</param>
    /// <returns>True if successfully unsubscribed otherwise False.</returns>
    bool Unsubscribe(Action<T> action);

    /// <summary>
    /// Unsubscribe using the token that was supplied when subscribing.
    /// </summary>
    /// <param name="subscriptionToken">Token that was used for subscribing.</param>
    /// <returns>True if successfully unsubscribed otherwise False.</returns>
    bool Unsubscribe(object subscriptionToken);
}

/// <summary>
/// Thread-safe event that supports task and callback style listening.
///
/// Thread safety: Api should be threadsafe.
/// </summary>
/// <typeparam name="T">Type of the event data.</typeparam>
public sealed class SynchronizedEvent<T> : IReadOnlySynchronizedEvent<T>, IDisposable
{
    /* Per-thread scratch lists used by 'Invoke' to snapshot the listeners without allocating.
    A list is taken out of its slot while in use and put back (emptied) afterwards, see
    'CompleteAwaiters' / 'InvokeSubscribers'. */
    [ThreadStatic] private static List<AwaiterHandle> awaiterInvokeList;
    [ThreadStatic] private static List<SubscribeHandle> subInvokeList;

    private readonly IExceptionHandler exceptionHandler;
    private readonly bool storeUnobservedData;
    private readonly bool allowSynchronousContinuations;

    private readonly object awaitersLock = new object();
    private readonly List<AwaiterHandle> awaiters = new List<AwaiterHandle>();

    private readonly object subscribersLock = new object();
    private readonly List<SubscribeHandle> subscribers = new List<SubscribeHandle>();

    private readonly Lazy<ConcurrentQueue<T>> unobservedData = new Lazy<ConcurrentQueue<T>>();

    // 0 while alive, 1 once 'Dispose' has been called. Only accessed through Interlocked / Volatile.
    private int disposeState;

    /// <summary>
    /// Initializes a new instance of the <see cref="SynchronizedEvent{T}"/> class.
    /// </summary>
    /// <param name="exceptionHandler">Handler for dealing with exceptions during callback invoke.</param>
    /// <param name="storeUnobservedData">Should data be stored when there is no-one listening.</param>
    /// <param name="allowSynchronousContinuations">
    /// Are task-continuations allowed to execute synchronously.
    /// Use with caution as its easy to shoot yourself in the foot with this and cause deadlocks.
    /// For example if a awaiter would block its thread after the await that would also block the
    /// 'Invoke' call of this event.
    /// Why would you ever want to enable this? Well if you know its safe in your usecase then its
    /// faster as it requires less context switching.
    /// </param>
    /// <exception cref="ArgumentNullException">
    /// Thrown when <paramref name="exceptionHandler"/> is null.
    /// </exception>
    public SynchronizedEvent(
        IExceptionHandler exceptionHandler,
        bool storeUnobservedData = false,
        bool allowSynchronousContinuations = false)
    {
        this.exceptionHandler = exceptionHandler ?? throw new ArgumentNullException(nameof(exceptionHandler));
        this.storeUnobservedData = storeUnobservedData;
        this.allowSynchronousContinuations = allowSynchronousContinuations;
    }

    private bool IsDisposed => Volatile.Read(ref this.disposeState) != 0;

    /// <summary>
    /// Check if the given task was created by <see cref="SynchronizedEvent{T}"/>.
    /// </summary>
    /// <param name="task">Task to check.</param>
    /// <returns>True if task was created by <see cref="SynchronizedEvent{T}"/> otherwise False.</returns>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="task"/> is null.</exception>
    public static bool IsOwnedBy(Task task)
    {
        if (task is null)
            throw new ArgumentNullException(nameof(task));

        // Tasks handed out by 'WaitAsync' carry their handle as async-state.
        return task.AsyncState is AwaiterHandle;
    }

    /// <summary>
    /// Attempt to cancel a task that is created by <see cref="SynchronizedEvent{T}"/>.
    /// </summary>
    /// <remarks>
    /// Returns false when given a task that was not created from <see cref="SynchronizedEvent{T}"/>.
    /// </remarks>
    /// <param name="task">Task to attempt to cancel.</param>
    /// <returns>True if successfully cancelled, otherwise False.</returns>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="task"/> is null.</exception>
    public static bool TryCancel(Task task)
    {
        if (task is null)
            throw new ArgumentNullException(nameof(task));

        return task.AsyncState is AwaiterHandle handle && handle.TryCancel();
    }

    /// <inheritdoc/>
    public Task<T> WaitAsync(CancellationToken cancelToken = default) =>
        this.WaitAsyncInternal(cancelToken);

    /// <inheritdoc/>
    public void Subscribe(
        Action<T> action,
        object subscriptionToken = null,
        bool callOnCapturedContext = true)
    {
        if (action is null)
            throw new ArgumentNullException(nameof(action));

        this.SubscribeInternal(
            action,
            state: null,
            subscriptionToken: subscriptionToken ?? action,
            callOnCapturedContext: callOnCapturedContext);
    }

    /// <inheritdoc/>
    public void Subscribe(
        Action<T, object> action,
        object state,
        object subscriptionToken = null,
        bool callOnCapturedContext = true)
    {
        if (action is null)
            throw new ArgumentNullException(nameof(action));

        this.SubscribeInternal(
            action,
            state: state,
            subscriptionToken: subscriptionToken ?? action,
            callOnCapturedContext: callOnCapturedContext);
    }

    /// <inheritdoc/>
    public bool Unsubscribe(Action<T> action)
    {
        if (action is null)
            throw new ArgumentNullException(nameof(action));

        return this.UnsubscribeInternal(action);
    }

    /// <inheritdoc/>
    public bool Unsubscribe(object subscriptionToken)
    {
        if (subscriptionToken is null)
            throw new ArgumentNullException(nameof(subscriptionToken));

        return this.UnsubscribeInternal(subscriptionToken);
    }

    /// <summary>
    /// Invoke the event.
    /// </summary>
    /// <param name="data">Data of the event.</param>
    /// <exception cref="ObjectDisposedException">Thrown when the event has been disposed.</exception>
    public void Invoke(T data) => this.InvokeInternal(data);

    /// <summary>
    /// Cancels all pending waits and drops all subscriptions. Safe to call more than once and
    /// from multiple threads; only the first call does the work.
    /// </summary>
    public void Dispose()
    {
        // Atomic exchange makes sure we only dispose once even when called concurrently.
        if (Interlocked.Exchange(ref this.disposeState, 1) == 0)
            this.DisposeInternal();
    }

    private Task<T> WaitAsyncInternal(CancellationToken cancelToken = default)
    {
        // If cancellation is already requested then early out.
        if (cancelToken.IsCancellationRequested)
            return Task.FromCanceled<T>(cancelToken);

        /* If the event has been disposed then early out with a cancelled task.
        Note: 'Task.FromCanceled' throws when given a token that is not cancelled, so we cannot
        hand it the callers token here. */
        if (this.IsDisposed)
            return Task.FromCanceled<T>(new CancellationToken(canceled: true));

        // If there is any unobserved data then return that.
        if (this.unobservedData.IsValueCreated && this.unobservedData.Value.TryDequeue(out var data))
            return Task.FromResult(data);

        // Otherwise create a handle to wait for a invoke.
        var handle = new AwaiterHandle(cancelToken, this.allowSynchronousContinuations);

        // Register the handle.
        lock (this.awaitersLock)
        {
            /* Re-check under the lock: 'Dispose' marks the event as disposed before it takes
            this lock, so either we see the mark here or the dispose will see our handle. */
            if (!this.IsDisposed)
            {
                this.awaiters.Add(handle);
                return handle.WaitForInvoke;
            }
        }

        // Lost the race against 'Dispose': cancel instead of returning a task that never completes.
        handle.Dispose();
        return handle.WaitForInvoke;
    }

    private void SubscribeInternal(
        object action,
        object state,
        object subscriptionToken,
        bool callOnCapturedContext = true)
    {
        Debug.Assert(action is Action<T> || action is Action<T, object>, "Invalid action type");
        Debug.Assert(subscriptionToken != null, "Missing subscription token");

        // If the event has been disposed it can never receive messages anymore.
        if (this.IsDisposed)
            throw new ObjectDisposedException(nameof(SynchronizedEvent<T>));

        // Create handle for this subscription.
        var syncContext = callOnCapturedContext ? SynchronizationContext.Current : null;
        var handle = new SubscribeHandle(action, state, subscriptionToken, syncContext, this.exceptionHandler);

        lock (this.subscribersLock)
        {
            // Re-check under the lock so we never add to a list that 'Dispose' already cleared.
            if (this.IsDisposed)
                throw new ObjectDisposedException(nameof(SynchronizedEvent<T>));

            this.subscribers.Add(handle);
        }

        // Replay any data that was invoked before anyone was listening.
        if (this.unobservedData.IsValueCreated)
        {
            var queue = this.unobservedData.Value;
            while (queue.TryDequeue(out var data))
                handle.Invoke(data);
        }
    }

    private bool UnsubscribeInternal(object subscriptionToken)
    {
        // If the event has been disposed then there is no need to unsubscribe anymore.
        if (this.IsDisposed)
            return false;

        var removed = false;

        // Remove all subscribers with the same 'subscriptionToken'.
        lock (this.subscribersLock)
        {
            // Iterating backwards so 'RemoveAt' does not shift entries we still have to visit.
            for (int i = this.subscribers.Count - 1; i >= 0; i--)
            {
                var subscriber = this.subscribers[i];
                if (subscriber.SubscriptionToken.Equals(subscriptionToken))
                {
                    // Safe to call dispose here while holding the lock because it only sets a bool.
                    subscriber.Dispose();
                    this.subscribers.RemoveAt(i);
                    removed = true;
                }
            }
        }

        return removed;
    }

    private void InvokeInternal(T data)
    {
        if (this.IsDisposed)
            throw new ObjectDisposedException(nameof(SynchronizedEvent<T>));

        // Keep track if someone was invoked with this data.
        var observed = this.CompleteAwaiters(data);
        observed |= this.InvokeSubscribers(data);

        /* Store data that was not observed.
        NOTE(review): a 'Subscribe' / 'WaitAsync' on another thread can run between the
        snapshots above and this enqueue, in that case the data stays queued until the next
        'Subscribe' / 'WaitAsync'. */
        if (!observed && this.storeUnobservedData)
            this.unobservedData.Value.Enqueue(data);
    }

    /// <summary>
    /// Complete all tasks that are currently waiting and remove them from the wait list.
    /// </summary>
    /// <returns>True if at least one task was completed with the data.</returns>
    private bool CompleteAwaiters(T data)
    {
        /* We first make a list of things to invoke before invoking them, reason is otherwise
        there would be a deadlock if a continuation that executes synchronously would call
        'WaitAsync' on this event again.
        The thread-static list is taken out of its slot while we use it, that way a nested
        'Invoke' on this thread (from a synchronous continuation) cannot clear the list that
        we are still enumerating; it simply allocates its own. */
        var scratch = awaiterInvokeList ?? new List<AwaiterHandle>();
        awaiterInvokeList = null;
        try
        {
            // Gather all awaiters to invoke and clear them.
            lock (this.awaitersLock)
            {
                scratch.AddRange(this.awaiters);
                this.awaiters.Clear();
            }

            // Invoke the awaiters.
            var observed = false;
            foreach (var awaiter in scratch)
                observed |= awaiter.TryComplete(data);

            return observed;
        }
        finally
        {
            // Clear before caching so the cache does not keep completed handles alive.
            scratch.Clear();
            awaiterInvokeList = scratch;
        }
    }

    /// <summary>
    /// Invoke all current subscribers with the data.
    /// </summary>
    /// <returns>True if there was at least one subscriber.</returns>
    private bool InvokeSubscribers(T data)
    {
        /* We first make a list of things to invoke before invoking them, reason is otherwise
        there would be a deadlock if someone called Subscribe / Unsubscribe from inside a
        invoke call.
        The thread-static list is taken out of its slot while we use it, that way a nested
        'Invoke' on this thread (from a subscriber callback) cannot clear the list that we are
        still enumerating; it simply allocates its own. */
        var scratch = subInvokeList ?? new List<SubscribeHandle>();
        subInvokeList = null;
        try
        {
            // Gather all the subscribers to invoke.
            lock (this.subscribersLock)
            {
                scratch.AddRange(this.subscribers);
            }

            // Invoke the subscribers.
            foreach (var sub in scratch)
                sub.Invoke(data);

            return scratch.Count != 0;
        }
        finally
        {
            // Clear before caching so the cache does not keep unsubscribed handles alive.
            scratch.Clear();
            subInvokeList = scratch;
        }
    }

    private void DisposeInternal()
    {
        // Dispose all the awaiters.
        lock (this.awaitersLock)
        {
            /* It is safe to call 'Dispose' here while holding the lock because we've already
            marked the event as disposed so new calls to 'WaitAsync' will be denied so they
            cannot cause a deadlock */
            foreach (var awaiter in this.awaiters)
                awaiter.Dispose();
            this.awaiters.Clear();
        }

        // Dispose all the subscribers.
        lock (this.subscribersLock)
        {
            foreach (var sub in this.subscribers)
                sub.Dispose();
            this.subscribers.Clear();
        }
    }

    /// <summary>
    /// Bookkeeping for a single 'WaitAsync' call: owns the task and the cancellation registration.
    /// </summary>
    private sealed class AwaiterHandle
    {
        private readonly TaskCompletionSource<T> completeSource;
        private readonly CancellationTokenRegistration cancelReg;

        internal AwaiterHandle(CancellationToken cancelToken, bool allowSynchronousContinuations)
        {
            var options = allowSynchronousContinuations
                ? TaskCreationOptions.None
                : TaskCreationOptions.RunContinuationsAsynchronously;

            /* Save this handle in the async state of the 'WaitForInvoke' task, that way we
            can cancel a task by fishing the handle out of its state. */
            this.completeSource = new TaskCompletionSource<T>(this, options);

            // Has to come after creating 'completeSource' as the callback can fire immediately.
            this.cancelReg = cancelToken.Register(this.Dispose, useSynchronizationContext: false);
        }

        /// <summary>
        /// Task that completes when the handle is completed or cancelled.
        /// </summary>
        internal Task<T> WaitForInvoke
        {
            get
            {
                var result = this.completeSource.Task;
                Debug.Assert(result.AsyncState == this, "Invalid async-state");
                return result;
            }
        }

        /// <summary>
        /// Complete the task with the given data.
        /// </summary>
        /// <returns>False if the task was already completed or cancelled, otherwise True.</returns>
        internal bool TryComplete(T data)
        {
            // The cancellation callback is no longer needed.
            this.cancelReg.Dispose();

            return this.completeSource.TrySetResult(data);
        }

        /// <summary>
        /// Cancel the task.
        /// </summary>
        /// <returns>False if the task was already completed or cancelled, otherwise True.</returns>
        internal bool TryCancel()
        {
            // The cancellation callback is no longer needed.
            this.cancelReg.Dispose();

            return this.completeSource.TrySetCanceled();
        }

        internal void Dispose() => this.TryCancel();
    }

    /// <summary>
    /// Bookkeeping for a single subscription: the callback, its optional state, the token used
    /// for unsubscribing and the optional context to invoke the callback on.
    /// </summary>
    private sealed class SubscribeHandle
    {
        private readonly object action;
        private readonly object state;
        private readonly object subscriptionToken;
        private readonly SynchronizationContext context;
        private readonly SendOrPostCallback postCallback;
        private readonly IExceptionHandler exceptionHandler;

        private volatile bool isDisposed;

        internal SubscribeHandle(
            object action,
            object state,
            object subscriptionToken,
            SynchronizationContext context,
            IExceptionHandler exceptionHandler)
        {
            Debug.Assert(action != null && (action is Action<T> || action is Action<T, object>), "Invalid action");
            Debug.Assert(subscriptionToken != null, "Missing subscription token");
            Debug.Assert(exceptionHandler != null, "Missing exception handler");

            this.action = action;
            this.state = state;
            this.subscriptionToken = subscriptionToken;
            this.context = context;
            this.exceptionHandler = exceptionHandler;

            // Create the delegate once so posting does not allocate a new one for every event.
            this.postCallback = context != null ? new SendOrPostCallback(this.OnPost) : null;
        }

        internal object SubscriptionToken => this.subscriptionToken;

        /// <summary>
        /// Deliver the data to the callback, either inline or by posting to the captured context.
        /// </summary>
        internal void Invoke(T data)
        {
            // Execute immediately if no context is required or if we are on the right context.
            if (this.context == null || SynchronizationContext.Current == this.context)
            {
                this.InvokeActionInline(data);
            }
            else
            {
                this.context.Post(this.postCallback, data);
            }
        }

        internal void Dispose() => this.isDisposed = true;

        private void OnPost(object input)
        {
            // Null is valid event data when 'T' is a reference type or a nullable value type.
            Debug.Assert(
                input is T || (input == null && default(T) == null),
                "Posted input is of incorrect type");

            /* Check if the handle has been disposed as otherwise its possible that user action
            is invoked after unsubscribing (if it was already posted to the sync-context) */
            if (this.isDisposed)
                return;

            this.InvokeActionInline((T)input);
        }

        private void InvokeActionInline(T input)
        {
            try
            {
                if (this.action is Action<T> statelessAction)
                    statelessAction.Invoke(input);
                else
                    ((Action<T, object>)this.action).Invoke(input, this.state);
            }
            catch (Exception e)
            {
                // Exceptions from user callbacks are reported, they do not abort the 'Invoke'.
                this.exceptionHandler.Handle(e);
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.