Skip to content

Instantly share code, notes, and snippets.

@JKamsker
Last active December 8, 2023 18:19
Show Gist options
  • Save JKamsker/0de2d65ba0730d89022e7aea83ca7643 to your computer and use it in GitHub Desktop.
Save JKamsker/0de2d65ba0730d89022e7aea83ca7643 to your computer and use it in GitHub Desktop.
FileWriterMutex
using System.Collections.Concurrent;
namespace JKToolKit.Logging.File.Utils;
public abstract class AsyncMutex : IDisposable
{
private static ConcurrentDictionary<string, AsyncMutex> _localMutexes = new ConcurrentDictionary<string, AsyncMutex>();
public abstract Task<IDisposable> WaitOneAsync(CancellationToken ct);
public abstract void Release();
public abstract void Dispose();
public static AsyncMutex Create(string name, Scope scope)
{
return scope switch
{
Scope.Global => CreateGlobal(name),
Scope.Local => CreateLocal(name),
Scope.None => NoMutex.Instance,
_ => throw new ArgumentOutOfRangeException(nameof(scope), scope, null)
};
}
private static AsyncMutex CreateLocal(string name)
{
return _localMutexes.GetOrAdd(name, _ => new LocalScope());
}
private static AsyncMutex CreateGlobal(string name)
{
return new GlobalScope(name);
}
public enum Scope
{
Global,
Local,
None
}
private class GlobalScope : AsyncMutex
{
private readonly Semaphore _mutex;
public GlobalScope(string name)
{
_mutex = new Semaphore(1, 1, name);
}
public override async Task<IDisposable> WaitOneAsync(CancellationToken ct)
{
var thread = new Thread(() => _mutex.WaitOne());
try
{
return await Task.Factory.StartNew
(
() =>
{
_mutex.WaitOne();
return (IDisposable)new AsyncMutexDisposable(this);
},
ct,
TaskCreationOptions.LongRunning,
TaskScheduler.Default
);
}
catch (TaskCanceledException)
{
return new AsyncMutexDisposable(this, false);
}
}
public override void Release()
{
_mutex.Release();
}
public override void Dispose()
{
_mutex.Dispose();
}
}
private class LocalScope : AsyncMutex
{
private readonly SemaphoreSlim _semaphore;
public LocalScope(int initialCount = 1, int maxCount = 1)
{
_semaphore = new SemaphoreSlim(initialCount, maxCount);
}
public override async Task<IDisposable> WaitOneAsync(CancellationToken ct)
{
try
{
await _semaphore.WaitAsync(ct);
return new AsyncMutexDisposable(this);
}
catch (TaskCanceledException)
{
return new AsyncMutexDisposable(this, false);
}
}
public override void Release()
{
_semaphore.Release();
}
public override void Dispose()
{
_semaphore.Dispose();
}
}
public class NoMutex : AsyncMutex
{
public static readonly NoMutex Instance = new NoMutex();
public override Task<IDisposable> WaitOneAsync(CancellationToken ct)
{
return Task.FromResult<IDisposable>(new AsyncMutexDisposable(this));
}
public override void Release()
{
}
public override void Dispose()
{
}
}
private class AsyncMutexDisposable : IDisposable
{
private readonly AsyncMutex _mutex;
private readonly bool _shouldRelease;
public AsyncMutexDisposable(AsyncMutex mutex, bool shouldRelease = true)
{
_mutex = mutex;
_shouldRelease = shouldRelease;
}
public void Dispose()
{
if (_shouldRelease)
{
_mutex.Release();
}
}
}
}
public class ParallelFileWriter : IAsyncDisposable
{
private readonly Channel<IMessage> _channel;
private readonly AsyncMutex _mutex;
private readonly string _filePath;
private readonly FileWriterSettings _settings;
private Task _workerTask;
public ParallelFileWriter(string filePath, FileWriterSettings? settings)
{
settings ??= new FileWriterSettings
{
FullQueueStrategy = FullQueueStrategy.Block,
QueueSize = null
};
_channel = ChannelFactory.CreateChannel<IMessage>(settings);
_filePath = filePath;
_settings = settings;
var mutexName = HashUtils.Md5String(filePath);
_mutex = AsyncMutex.Create(mutexName + "1", settings.MutexScope);
}
public void Write(string message)
{
Write(new Message { Content = message });
}
public void Write(IMessage message)
{
// In case of Block strategy, this will return false if the channel is full.
// In case of DropNewest or DropOldest, this will drop the appropriate message and write the new one.
var waittime = 1;
while (!_channel.Writer.TryWrite(message))
{
// If TryWrite returned false and the FullQueueStrategy is Block, we could add a small delay here before trying again.
// However, this would make the method no longer fully synchronous.
// If a delay is not acceptable, you could throw an exception or handle this case differently.
if (_settings.FullQueueStrategy == FullQueueStrategy.Block)
{
//Console.WriteLine("Channel is full, waiting" + waittime + "ms");
Thread.Sleep(waittime);
// Expotential backoff, max 500ms
waittime = Math.Min(waittime * 2, 500);
}
else
{
throw new Exception("Channel is full");
}
}
}
public async Task WriteAsync(IMessage message)
{
await _channel.Writer.WriteAsync(message);
}
public void Start(CancellationToken cancellationToken = default)
{
_workerTask = Task.Run(
async () =>
{
try
{
await Worker(cancellationToken);
}
catch (Exception ex)
{
Console.WriteLine(ex);
}
},
cancellationToken
);
}
public async Task StopAsync()
{
_channel.Writer.Complete();
await _workerTask;
}
private async Task Worker(CancellationToken cancellationToken)
{
var isFirstTime = true;
while (
await _channel.Reader.WaitToReadAsync(cancellationToken)
&& !cancellationToken.IsCancellationRequested
)
{
if (!isFirstTime)
{
// Wait for one millsecond, to allow other threads/processes to write to the file.
// Instant relocking could cause the lock not actually being released fairly.
await Task.Delay(1, cancellationToken);
}
isFirstTime = false;
using var _ = await _mutex.WaitOneAsync(cancellationToken);
if (cancellationToken.IsCancellationRequested)
{
break;
}
using FileStream fs = await OpenFileAsync();
using var writer = new StreamWriter(fs, Encoding.UTF8, _settings.CacheSize);
var count = 0;
//var sw = Stopwatch.StartNew();
while (count < 100_000 && _channel.Reader.TryRead(out var message))
{
await message.WriteToAsync(writer);
count++;
}
}
}
private async ValueTask<FileStream> OpenFileAsync(int maxAttempts = 20, TimeSpan? initialDelay = null, TimeSpan? maxDelay = null)
{
TimeSpan delay = initialDelay ?? TimeSpan.FromMilliseconds(10);
TimeSpan maximumDelay = maxDelay ?? TimeSpan.FromMinutes(0.5);
foreach (var backoffDelay in Backoff.Logarithmic(maxAttempts, delay, maximumDelay))
{
try
{
//Console.WriteLine($"Writing to file {_filePath}");
return File.Open(
_filePath,
new FileStreamOptions
{
Access = FileAccess.Write,
Mode = FileMode.Append,
Share = FileShare.Read, // maybe ReadWrite?
Options = FileOptions.Asynchronous | FileOptions.WriteThrough,
BufferSize = _settings.CacheSize
}
);
}
catch (IOException e)
{
// The file is locked, log the error and wait
Debug.WriteLine($"Attempt failed: {e.Message}");
// Wait before trying again
await Task.Delay(backoffDelay);
}
}
// This should never be reached, but is required for the function to compile
throw new InvalidOperationException("Failed to open file after multiple attempts.");
}
public async ValueTask DisposeAsync()
{
await StopAsync();
}
}
internal class ChannelFactory
{
public static Channel<T> CreateChannel<T>(IChannelSettings settings)
{
if (!settings.QueueSize.HasValue)
{
return Channel.CreateUnbounded<T>(
new UnboundedChannelOptions { SingleReader = true, SingleWriter = false }
);
}
return Channel.CreateBounded<T>(
new BoundedChannelOptions(settings.QueueSize.Value)
{
FullMode = settings.FullQueueStrategy switch
{
FullQueueStrategy.Block => BoundedChannelFullMode.Wait,
FullQueueStrategy.DropNewest => BoundedChannelFullMode.DropWrite,
FullQueueStrategy.DropOldest => BoundedChannelFullMode.DropOldest,
_ => throw new ArgumentOutOfRangeException(nameof(settings.FullQueueStrategy))
}
}
);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment