Skip to content

Instantly share code, notes, and snippets.

@amoerie
Last active April 30, 2021 08:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save amoerie/65bf3b077a696b1d65cb5fb27cbb7206 to your computer and use it in GitHub Desktop.
Save amoerie/65bf3b077a696b1d65cb5fb27cbb7206 to your computer and use it in GitHub Desktop.
PriorityChannelTest - A proof of concept for a thread-safe, concurrent priority queue built on channels
using FluentAssertions;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Xunit;
using Xunit.Abstractions;
namespace Dobco.PACSONWEB3.LocalImageCache.Tests
{
/// <summary>
/// Proof of concept for a priority queue built from one unbounded channel per priority level.
/// Producers write each item to the channel matching its priority; consumers always try the
/// highest priority channel first.
/// </summary>
public class PriorityChannelTest
{
    private readonly ITestOutputHelper _testOutputHelper;

    public PriorityChannelTest(ITestOutputHelper testOutputHelper)
    {
        _testOutputHelper = testOutputHelper;
    }

    /// <summary>
    /// Runs several producers and consumers concurrently and verifies that every produced item is
    /// consumed and that the average wait time grows as the priority drops (1 = highest, 3 = lowest).
    /// </summary>
    [Fact]
    public async Task PriorityChannels()
    {
        // Arrange
        const int numberOfProducers = 10;
        const int numberOfConsumers = 3;
        const int numberOfItemsPerProducer = 100;
        var channel1 = Channel.CreateUnbounded<PriorityItem>();
        var channel2 = Channel.CreateUnbounded<PriorityItem>();
        var channel3 = Channel.CreateUnbounded<PriorityItem>();
        var consumedItems = new ConcurrentBag<PriorityItem>();

        // Act
        // ToList() forces the tasks to start immediately instead of on first enumeration
        var producerTasks = Enumerable.Range(0, numberOfProducers)
            .Select(i => Task.Run(() => Producer($"Producer {i}")))
            .ToList();
        var consumerTasks = Enumerable.Range(0, numberOfConsumers)
            .Select(i => Task.Run(() => Consumer($"Consumer {i}")))
            .ToList();

        try
        {
            await Task.WhenAll(producerTasks).ConfigureAwait(false);
        }
        finally
        {
            // Always complete the writers: if a producer faulted and the channels stayed open,
            // the consumers would wait forever for items that will never arrive.
            channel1.Writer.TryComplete();
            channel2.Writer.TryComplete();
            channel3.Writer.TryComplete();
        }

        await Task.WhenAll(consumerTasks).ConfigureAwait(false);

        // Assert
        var consumedItemsList = consumedItems.ToList();
        consumedItemsList.Count.Should().Be(producerTasks.Count * numberOfItemsPerProducer);

        var averageWaitTimePerPriority = consumedItemsList
            .GroupBy(i => i.Priority)
            .Select(group => new
            {
                Priority = group.Key,
                AverageWaitTimeInMs = group
                    .Select(item => (item.ConsumptionDateTime!.Value - item.ProductionDateTime).TotalMilliseconds)
                    .DefaultIfEmpty(0)
                    .Average()
            })
            .ToDictionary(i => i.Priority, i => i.AverageWaitTimeInMs);

        var channel1AverageWaitTime = averageWaitTimePerPriority[1];
        var channel2AverageWaitTime = averageWaitTimePerPriority[2];
        var channel3AverageWaitTime = averageWaitTimePerPriority[3];

        Log($"Channel 1 average wait time: {channel1AverageWaitTime}ms");
        Log($"Channel 2 average wait time: {channel2AverageWaitTime}ms");
        Log($"Channel 3 average wait time: {channel3AverageWaitTime}ms");

        channel1AverageWaitTime.Should().BeLessThan(channel2AverageWaitTime);
        channel2AverageWaitTime.Should().BeLessThan(channel3AverageWaitTime);

        // A producer pumps items into random channels
        async Task Producer(string producerName)
        {
            var channels = new[] { channel1.Writer, channel2.Writer, channel3.Writer };
            var random = new Random();

            for (int i = 0; i < numberOfItemsPerProducer; i++)
            {
                // 1-9ms random delay (the upper bound of Random.Next is exclusive)
                await Task.Delay(TimeSpan.FromMilliseconds(random.Next(1, 10))).ConfigureAwait(false);

                var priority = random.Next(1, channels.Length + 1);

                // High priority items go in channel 1, low priority items in channel 3
                var channel = channels[priority - 1];
                var priorityItem = new PriorityItem
                {
                    Label = $"Item {i}",
                    ProducerName = producerName,
                    // UTC so the measured wait time is not skewed by DST or time zone changes
                    ProductionDateTime = DateTime.UtcNow,
                    Priority = priority,
                    ConsumerName = null,
                    ConsumptionDateTime = null
                };

                if (!channel.TryWrite(priorityItem))
                {
                    throw new InvalidOperationException($"{producerName} could not write to channel {priority}");
                }
            }
        }

        // A consumer reads from all channels, trying to read from the highest priority channels first
        async Task Consumer(string consumerName)
        {
            var channels = new List<ChannelReader<PriorityItem>> { channel1.Reader, channel2.Reader, channel3.Reader };

            // One Random per consumer: Random is not thread-safe and consumers run in parallel
            var random = new Random();

            // When a channel is completed and drained, it is removed from the list
            while (channels.Count > 0)
            {
                // Try to synchronously read from the channels in order of priority
                if (TryReadHighestPriority(channels, out var item))
                {
                    await OnConsume(consumerName, item, random).ConfigureAwait(false);
                    continue;
                }

                // Nothing available right now: wait until any channel signals a change
                var waits = new Task<bool>[channels.Count];
                for (int i = 0; i < channels.Count; i++)
                {
                    waits[i] = channels[i].WaitToReadAsync(CancellationToken.None).AsTask();
                }

                var winner = await Task.WhenAny(waits).ConfigureAwait(false);

                // WaitToReadAsync returns false if the channel is complete, i.e. there will never be any more items
                var winningChannelCanRead = await winner.ConfigureAwait(false);
                if (!winningChannelCanRead)
                {
                    channels.RemoveAt(Array.IndexOf(waits, winner));
                }

                // When data became available we deliberately do NOT read from the winning channel here.
                // Looping back re-runs the priority-ordered pass, so an item that arrived in a higher
                // priority channel in the meantime is still consumed first.
            }
        }

        // Returns the first item available when polling the readers in list order (highest priority first)
        bool TryReadHighestPriority(List<ChannelReader<PriorityItem>> readers, out PriorityItem item)
        {
            foreach (var reader in readers)
            {
                if (reader.TryRead(out item))
                {
                    return true;
                }
            }

            item = null;
            return false;
        }

        // Stamps the item with its consumer and consumption time, records it and simulates some work
        async Task OnConsume(string consumerName, PriorityItem item, Random random)
        {
            if (item == null)
            {
                throw new ArgumentNullException(nameof(item), "Item is null");
            }

            item.ConsumerName = consumerName;
            item.ConsumptionDateTime = DateTime.UtcNow;
            Log($"{consumerName} consumed item {item.Label} with priority {item.Priority}");
            consumedItems.Add(item);

            // 1-9ms random delay (the upper bound of Random.Next is exclusive)
            await Task.Delay(TimeSpan.FromMilliseconds(random.Next(1, 10))).ConfigureAwait(false);
        }

        // Writes a message to the test output, prefixed with the local wall-clock time
        void Log(string message)
        {
            _testOutputHelper.WriteLine($"{DateTime.Now.ToString("HH:mm:ss.fff", CultureInfo.InvariantCulture)} {message}");
        }
    }
}
/// <summary>
/// An item that travels through one of the priority channels, annotated with who produced
/// and consumed it and when.
/// </summary>
public class PriorityItem
{
    /// <summary>Display name of the item.</summary>
    public string Label { get; set; }

    /// <summary>Priority level of the item; the producer uses it to pick the channel (1 = channel 1).</summary>
    public int Priority { get; set; }

    /// <summary>Name of the producer that created the item.</summary>
    public string ProducerName { get; set; }

    /// <summary>Timestamp taken when the producer created the item.</summary>
    public DateTime ProductionDateTime { get; set; }

    /// <summary>Name of the consumer that handled the item; null until it has been consumed.</summary>
    public string ConsumerName { get; set; }

    /// <summary>Timestamp taken when the item was consumed; null until it has been consumed.</summary>
    public DateTime? ConsumptionDateTime { get; set; }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment