Skip to content

Instantly share code, notes, and snippets.

@cheenamalhotra
Last active November 18, 2020 23:37
Show Gist options
  • Save cheenamalhotra/d6d4140b35536064db14d744d4d700c1 to your computer and use it in GitHub Desktop.
Save cheenamalhotra/d6d4140b35536064db14d744d4d700c1 to your computer and use it in GitHub Desktop.
Ordered Queue for Async tasks using SemaphoreSlim
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
public static class TestOrder
{
/// <summary>
/// A semaphore wrapper that completes waiters in the order in which they called
/// <see cref="WaitAsync"/>. Each caller is parked on its own
/// <see cref="TaskCompletionSource{TResult}"/> held in a FIFO queue; every time the
/// underlying <see cref="SemaphoreSlim"/> grants an entry, the oldest queued waiter
/// is the one that gets released.
/// </summary>
internal class ConcurrentQueueSemaphore
{
    private readonly SemaphoreSlim _semaphore;

    // Pending waiters, oldest first. One entry is enqueued per WaitAsync call and
    // one entry is dequeued per entry granted by the underlying semaphore.
    private readonly ConcurrentQueue<TaskCompletionSource<bool>> _queue =
        new ConcurrentQueue<TaskCompletionSource<bool>>();

    /// <summary>Creates the semaphore with the given number of initially available entries.</summary>
    /// <param name="initialCount">Number of callers that can enter before anyone has to wait.</param>
    public ConcurrentQueueSemaphore(int initialCount)
    {
        _semaphore = new SemaphoreSlim(initialCount);
    }

    /// <summary>Creates the semaphore with an initial and a maximum number of entries.</summary>
    /// <param name="initialCount">Number of callers that can enter before anyone has to wait.</param>
    /// <param name="maxCount">Upper bound on the number of concurrently available entries.</param>
    public ConcurrentQueueSemaphore(int initialCount, int maxCount)
    {
        _semaphore = new SemaphoreSlim(initialCount, maxCount);
    }

    /// <summary>Blocks the calling thread until this caller's turn to enter arrives.</summary>
    /// <param name="n">Caller identifier; only used in the diagnostic console output.</param>
    public void Wait(int n)
    {
        // GetAwaiter().GetResult() rethrows the original exception, whereas
        // Task.Wait() would wrap it in an AggregateException.
        WaitAsync(n).GetAwaiter().GetResult();
    }

    /// <summary>
    /// Registers the caller at the tail of the waiter queue and returns a task that
    /// completes once the caller has been granted an entry.
    /// </summary>
    /// <param name="n">Caller identifier; only used in the diagnostic console output.</param>
    /// <returns>A task that completes when it is this caller's turn.</returns>
    public Task WaitAsync(int n)
    {
        // RunContinuationsAsynchronously keeps the awaiting caller's continuation from
        // running inline inside the semaphore continuation that calls SetResult below.
        var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        _queue.Enqueue(tcs);
        Console.WriteLine(n + " is added");

        // The scheduler is passed explicitly so the continuation never binds to whatever
        // TaskScheduler.Current happens to be at the call site.
        _semaphore.WaitAsync().ContinueWith(
            t =>
            {
                // NOTE: 'n' identifies the caller whose semaphore wait just completed. The
                // waiter released below is always the head of the queue, which may belong
                // to a different (earlier) caller if the semaphore waits finish out of order.
                Console.WriteLine(n + " is executed. Order of execution matters.");
                if (_queue.TryDequeue(out TaskCompletionSource<bool> popped))
                {
                    popped.SetResult(true);
                }
            },
            CancellationToken.None,
            TaskContinuationOptions.None,
            TaskScheduler.Default);

        return tcs.Task;
    }

    /// <summary>Returns one entry to the semaphore.</summary>
    /// <param name="n">Caller identifier; only used in the diagnostic console output.</param>
    /// <returns>
    /// The semaphore's <see cref="CurrentCount"/> read after the release. This is a separate
    /// read from the release itself, so under concurrency it may already reflect other
    /// callers entering or releasing.
    /// </returns>
    public int Release(int n)
    {
        Console.WriteLine(n + " releases");
        _semaphore.Release();
        return _semaphore.CurrentCount;
    }

    /// <summary>Number of entries currently available on the underlying semaphore.</summary>
    public int CurrentCount
    {
        get { return _semaphore.CurrentCount; }
    }
}
/// <summary>
/// Demo driver: starts five thread-pool tasks that each take the single-entry
/// <c>ConcurrentQueueSemaphore</c>, simulate half a second of work while holding it,
/// and release it again. The main thread then waits for all of them to finish.
/// </summary>
public static void Main()
{
    int padding = 0;
    var semaphore = new ConcurrentQueueSemaphore(1);
    Console.WriteLine("{0} tasks can enter the semaphore.\n", semaphore.CurrentCount);

    // Spin up the numbered workers; each one identifies itself by its task id.
    var tasks = new Task[5];
    for (int index = 0; index < tasks.Length; index++)
    {
        tasks[index] = Task.Run(() =>
        {
            // Task.CurrentId is non-null here because this code runs inside a task.
            int taskId = (int)Task.CurrentId;

            semaphore.Wait(taskId);
            try
            {
                // Stand-in for some time-consuming work done while holding the semaphore.
                Thread.Sleep(500);
                Interlocked.Add(ref padding, 100);
            }
            finally
            {
                // Always hand the entry back, even if the work above throws.
                semaphore.Release(taskId);
            }
        });
    }

    // Give the workers half a second to start and queue up on the semaphore.
    Thread.Sleep(500);

    // Block the main thread until every worker has finished.
    Task.WaitAll(tasks);
    Console.WriteLine("\nMain thread exits.");
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment