Skip to content

Instantly share code, notes, and snippets.

@mzahor
Last active May 23, 2017 02:01
Show Gist options
  • Save mzahor/2819bbd25499fc169bcd to your computer and use it in GitHub Desktop.
Save mzahor/2819bbd25499fc169bcd to your computer and use it in GitHub Desktop.
Throttling task queue
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using NUnit.Framework;
namespace YammerCrawlSync
{
/// <summary>
/// Limits how many asynchronous operations started through
/// <see cref="Execute{T}"/> may run at the same time.
/// </summary>
/// <remarks>
/// Concurrency is bounded by a <see cref="SemaphoreSlim"/> created with the
/// limit passed to the constructor. Each call to <see cref="Execute{T}"/>
/// holds one slot for the lifetime of the operation it runs.
/// </remarks>
public class ThrottlingQueue : IDisposable
{
    // Assigned once in the constructor; one slot per operation allowed in flight.
    private readonly SemaphoreSlim semaphore;

    /// <summary>
    /// Creates a queue that lets at most <paramref name="max"/> operations run concurrently.
    /// </summary>
    /// <param name="max">Maximum number of concurrently running operations; must be positive.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="max"/> is zero or negative.
    /// </exception>
    public ThrottlingQueue(int max)
    {
        // A limit of zero would be accepted by SemaphoreSlim but would make
        // every Execute call wait forever, so reject it up front.
        if (max <= 0)
        {
            throw new ArgumentOutOfRangeException("max", max, "The concurrency limit must be greater than zero.");
        }

        this.semaphore = new SemaphoreSlim(max);
    }

    /// <summary>
    /// Releases the underlying semaphore.
    /// </summary>
    public void Dispose()
    {
        this.semaphore.Dispose();
    }

    /// <summary>
    /// Waits for a free slot, runs <paramref name="func"/>, and frees the slot
    /// once the operation has finished, whether it succeeded or failed.
    /// </summary>
    /// <typeparam name="T">Type of the value produced by the operation.</typeparam>
    /// <param name="func">Factory that starts the asynchronous operation.</param>
    /// <returns>A task that completes with the result of <paramref name="func"/>.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="func"/> is null. Because this method is async, the
    /// exception is reported through the returned task.
    /// </exception>
    public async Task<T> Execute<T>(Func<Task<T>> func)
    {
        // Validate before waiting so a bad call never occupies a slot.
        if (func == null)
        {
            throw new ArgumentNullException("func");
        }

        await this.semaphore.WaitAsync();
        try
        {
            return await func();
        }
        finally
        {
            // Always give the slot back, including when func throws.
            this.semaphore.Release();
        }
    }
}
/// <summary>
/// Tests for <see cref="ThrottlingQueue"/>: result propagation, exception
/// propagation, and enforcement of the concurrency limit.
/// </summary>
[TestFixture]
public class ThrottlingQueueTests
{
    /// <summary>
    /// The value produced by the queued operation is returned to the caller.
    /// </summary>
    [Test]
    public async Task ItShouldReturnResult()
    {
        // using guarantees disposal even if the operation or an assertion throws.
        using (var queue = new ThrottlingQueue(10))
        {
            var result = await queue.Execute(
                async () =>
                {
                    // A short delay is enough to force the asynchronous path.
                    await Task.Delay(10);
                    return 10;
                });

            Assert.AreEqual(10, result);
        }
    }

    /// <summary>
    /// An exception thrown by the queued operation surfaces when the task
    /// returned by Execute is awaited.
    /// </summary>
    [Test]
    public async Task ItShouldThrowExceptionFromTask()
    {
        using (var queue = new ThrottlingQueue(10))
        {
            Exception caught = null;

            // Awaiting directly (instead of handing an async lambda to
            // Assert.Throws, where it would become an async void delegate)
            // makes sure the exception is actually observed by this test.
            try
            {
                await queue.Execute<int>(
                    async () =>
                    {
                        await Task.Delay(10);
                        throw new Exception();
                    });
            }
            catch (Exception ex)
            {
                caught = ex;
            }

            Assert.IsNotNull(caught, "Expected the exception thrown by the task to propagate.");

            // Same exact-type check that Assert.Throws<Exception> performs.
            Assert.AreEqual(typeof(Exception), caught.GetType());
        }
    }

    /// <summary>
    /// With many operations queued, no more than the configured maximum are
    /// ever inside the operation body at once.
    /// </summary>
    [Test]
    public async Task ItShouldThrottleTasks()
    {
        const int max = 10;
        const int taskCount = 1000;
        var running = 0;

        using (var queue = new ThrottlingQueue(max))
        {
            var tasks = new List<Task>(taskCount);
            for (int i = 0; i < taskCount; i++)
            {
                tasks.Add(
                    queue.Execute(
                        async () =>
                        {
                            // Use the value returned by Increment: re-reading the
                            // shared counter afterwards would race with other tasks.
                            var current = Interlocked.Increment(ref running);
                            try
                            {
                                if (current > max)
                                {
                                    Assert.Fail("Max exceeded {0}", current);
                                }

                                await Task.Delay(10);
                                return 10;
                            }
                            finally
                            {
                                // Keep the counter accurate even when the body fails.
                                Interlocked.Decrement(ref running);
                            }
                        }));
            }

            // A failed Assert.Fail inside any task is rethrown here and fails the test.
            await Task.WhenAll(tasks);
        }
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment