Skip to content

Instantly share code, notes, and snippets.

@michel-pi
Created October 15, 2023 13:57
Show Gist options
  • Save michel-pi/f766cc3ac6efb066ab69d807744d2db8 to your computer and use it in GitHub Desktop.
Save michel-pi/f766cc3ac6efb066ab69d807744d2db8 to your computer and use it in GitHub Desktop.
Provides services for managing the queue of work items for a thread.
using System;
using System.Collections.Generic;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;
namespace Snippets.Threading
{
/// <summary>
/// Provides services for managing the queue of work items for a thread.
/// </summary>
/// <remarks>
/// Work items are queued by the <c>Invoke*</c> methods and executed, one per call, by whichever
/// thread calls <see cref="ProcessWork"/>. All access to the queue is synchronized internally.
/// </remarks>
public sealed class WorkerThreadDispatcher
{
    // Guards every access to _queue. A private object is used instead of `this` so that code
    // outside the class cannot stall the dispatcher by locking on the instance.
    private readonly object _gate = new object();

    private readonly Queue<WorkQueueItem> _queue;

    /// <summary>
    /// Initializes a new instance of the <see cref="WorkerThreadDispatcher"/> class.
    /// </summary>
    public WorkerThreadDispatcher()
    {
        _queue = new Queue<WorkQueueItem>();
    }

    /// <summary>
    /// Executes the specified <see cref="Action"/> synchronously on the thread the <see cref="WorkerThreadDispatcher"/> is associated with.
    /// </summary>
    /// <param name="callback">A delegate to invoke through the dispatcher.</param>
    /// <exception cref="ArgumentNullException"><paramref name="callback"/> is <see langword="null"/>.</exception>
    /// <remarks>
    /// Blocks the current thread until the <paramref name="callback" /> is executed.
    /// An exception thrown by <paramref name="callback"/> is rethrown on the calling thread.
    /// </remarks>
    public void Invoke(Action callback)
    {
        ArgumentNullException.ThrowIfNull(callback);

        _ = Invoke(() =>
        {
            callback();
            return (object?)null;
        });
    }

    /// <summary>
    /// Executes the specified <see cref="Func{TResult}"/> synchronously on the thread the <see cref="WorkerThreadDispatcher"/> is associated with.
    /// </summary>
    /// <typeparam name="T">The return value type of the specified delegate.</typeparam>
    /// <param name="callback">A delegate to invoke through the dispatcher.</param>
    /// <returns>The value returned by <paramref name="callback"/>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="callback"/> is <see langword="null"/>.</exception>
    /// <remarks>
    /// Blocks the current thread until the <paramref name="callback" /> is executed.
    /// An exception thrown by <paramref name="callback"/> is rethrown on the calling thread.
    /// The call only returns once some thread has run the item through <see cref="ProcessWork"/>;
    /// if only one thread calls <see cref="ProcessWork"/>, calling this method from inside a
    /// callback running on that thread therefore never returns.
    /// </remarks>
    public T? Invoke<T>(Func<T?> callback)
    {
        ArgumentNullException.ThrowIfNull(callback);

        // deferredDispose: the waiting caller (this thread) owns and disposes the item.
        WorkQueueItem item = EnqueueWorkItem(() => callback(), deferredDispose: true);

        return (T?)item.WaitExecuted();
    }

    /// <summary>
    /// Asynchronously executes the specified <see cref="Action"/> synchronously on the thread the <see cref="WorkerThreadDispatcher"/> is associated with.
    /// </summary>
    /// <param name="callback">A delegate to invoke through the dispatcher.</param>
    /// <returns>A task that represents the asynchronous operation.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="callback"/> is <see langword="null"/>.</exception>
    /// <remarks>
    /// If <paramref name="callback"/> throws, the returned task faults with that exception.
    /// </remarks>
    public Task InvokeAsync(Action callback)
    {
        ArgumentNullException.ThrowIfNull(callback);

        return InvokeAsync(() =>
        {
            callback();
            return (object?)null;
        });
    }

    /// <summary>
    /// Asynchronously executes the specified <see cref="Func{TResult}"/> synchronously on the thread the <see cref="WorkerThreadDispatcher"/> is associated with.
    /// </summary>
    /// <typeparam name="T">The return value type of the specified delegate.</typeparam>
    /// <param name="callback">A delegate to invoke through the dispatcher.</param>
    /// <returns>A task that represents the asynchronous operation which wraps the result.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="callback"/> is <see langword="null"/>.</exception>
    /// <remarks>
    /// If <paramref name="callback"/> throws, the returned task faults with that exception.
    /// </remarks>
    public Task<T?> InvokeAsync<T>(Func<T?> callback)
    {
        ArgumentNullException.ThrowIfNull(callback);

        // The task is completed directly by the work item, so no thread-pool thread has to sit
        // blocked while the item is pending. Continuations are forced off the processing thread.
        var completion = new TaskCompletionSource<T?>(TaskCreationOptions.RunContinuationsAsynchronously);

        _ = EnqueueWorkItem(() =>
        {
            T? result;
            try
            {
                result = callback();
            }
            catch (Exception ex)
            {
                completion.SetException(ex);
                return null;
            }

            completion.SetResult(result);
            return null;
        }, deferredDispose: false);

        return completion.Task;
    }

    /// <summary>
    /// Executes the specified <see cref="Action"/> on the thread the <see cref="WorkerThreadDispatcher"/> is associated with.
    /// </summary>
    /// <param name="callback">A delegate to invoke through the dispatcher.</param>
    /// <exception cref="ArgumentNullException"><paramref name="callback"/> is <see langword="null"/>.</exception>
    /// <remarks>
    /// Does not wait until the <paramref name="callback"/> is executed.
    /// An exception thrown by <paramref name="callback"/> propagates out of the
    /// <see cref="ProcessWork"/> call that runs it.
    /// </remarks>
    public void InvokeWithoutWait(Action callback)
    {
        ArgumentNullException.ThrowIfNull(callback);

        _ = EnqueueWorkItem(() =>
        {
            callback();
            return null;
        }, deferredDispose: false);
    }

    /// <summary>
    /// Process a single work item.
    /// </summary>
    /// <returns>Returns <see langword="true"/> when more work items can be processed; otherwise <see langword="false"/>.</returns>
    /// <remarks>
    /// Queue access is synchronized internally, so callers do not need to lock on this instance.
    /// The work item itself runs outside the internal lock, which allows other threads (and the
    /// callback itself) to queue further items while it executes.
    /// </remarks>
    public bool ProcessWork()
    {
        WorkQueueItem item;
        lock (_gate)
        {
            if (_queue.Count == 0)
            {
                return false;
            }

            item = _queue.Dequeue();
        }

        // Execute only throws for items nobody is waiting on, so when it does throw the item is
        // still ours to clean up; hence the flag starts out true.
        bool disposeItem = true;
        try
        {
            disposeItem = item.Execute();
        }
        finally
        {
            if (disposeItem)
            {
                // dispose the item when no one is waiting for the result. fire and forget.
                item.Dispose();
            }
        }

        lock (_gate)
        {
            return _queue.Count != 0;
        }
    }

    // Creates a work item for the callback and appends it to the queue under the lock.
    private WorkQueueItem EnqueueWorkItem(Func<object?> callback, bool deferredDispose)
    {
        var item = new WorkQueueItem(callback, deferredDispose);
        lock (_gate)
        {
            _queue.Enqueue(item);
        }

        return item;
    }

    /// <summary>
    /// A queued callback together with the event used to signal its completion.
    /// </summary>
    internal class WorkQueueItem : IDisposable
    {
        private readonly AutoResetEvent _autoResetEvent;

        // true when a caller blocks in WaitExecuted and is responsible for disposing the item.
        private readonly bool _deferredDispose;
        private readonly Func<object?> _callback;
        private object? _callbackResult;

        // Exception thrown by the callback, kept for rethrow on the waiting thread.
        private ExceptionDispatchInfo? _callbackError;

        // 0 = live, 1 = disposed; makes Dispose safe to call more than once.
        private int _disposed;

        public WorkQueueItem(Func<object?> callback, bool deferredDispose)
        {
            ArgumentNullException.ThrowIfNull(callback, nameof(callback));
            _callback = callback;
            _autoResetEvent = new AutoResetEvent(false);
            _deferredDispose = deferredDispose;
        }

        // Runs the callback and signals completion.
        // returns true when the caller should dispose this item.
        // When a waiter exists, a callback exception is captured for the waiter instead of being
        // thrown here, so the waiter is always released. Without a waiter it propagates.
        public bool Execute()
        {
            try
            {
                Volatile.Write(ref _callbackResult, _callback.Invoke());
            }
            catch (Exception ex) when (_deferredDispose)
            {
                Volatile.Write(ref _callbackError, ExceptionDispatchInfo.Capture(ex));
            }

            _autoResetEvent.Set();
            return !_deferredDispose;
        }

        // Blocks until Execute has run, then returns the callback result or rethrows the
        // callback's exception. Disposes the item when this waiter owns it.
        public object? WaitExecuted()
        {
            _autoResetEvent.WaitOne();
            var result = Volatile.Read(ref _callbackResult);
            var error = Volatile.Read(ref _callbackError);
            if (_deferredDispose)
            {
                Dispose();
            }

            // Rethrow with the original stack trace preserved.
            error?.Throw();
            return result;
        }

        // Signals the event one last time and releases it. Subsequent calls do nothing.
        public void Dispose()
        {
            if (Interlocked.Exchange(ref _disposed, 1) != 0)
            {
                return;
            }

            _autoResetEvent.Set();
            _autoResetEvent.Dispose();
        }
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment