Last active
May 23, 2017 02:01
-
-
Save mzahor/2819bbd25499fc169bcd to your computer and use it in GitHub Desktop.
Throttling task queue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using NUnit.Framework; | |
namespace YammerCrawlSync | |
{ | |
/// <summary>
/// Runs asynchronous operations while limiting how many of them are in flight
/// at the same time. Callers beyond the limit wait asynchronously for a free slot.
/// </summary>
public class ThrottlingQueue : IDisposable
{
    // Each permit in the semaphore represents one free execution slot.
    private readonly SemaphoreSlim semaphore;

    /// <summary>
    /// Creates a queue that allows at most <paramref name="max"/> operations to run concurrently.
    /// </summary>
    /// <param name="max">Maximum number of concurrently running operations; must be at least 1.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="max"/> is less than 1.</exception>
    public ThrottlingQueue(int max)
    {
        // A limit of zero would build a semaphore with no permits, so every
        // Execute call would wait forever. Reject it up front instead.
        if (max < 1)
        {
            throw new ArgumentOutOfRangeException(
                nameof(max),
                max,
                "The queue must allow at least one concurrent operation.");
        }

        this.semaphore = new SemaphoreSlim(max, max);
    }

    /// <summary>
    /// Disposes the underlying semaphore.
    /// </summary>
    public void Dispose()
    {
        this.semaphore.Dispose();
    }

    /// <summary>
    /// Waits for a free slot, invokes <paramref name="func"/>, and returns its result.
    /// The slot is released when the operation finishes, whether it succeeds or throws.
    /// </summary>
    /// <typeparam name="T">Type of the value produced by the operation.</typeparam>
    /// <param name="func">Factory that starts the asynchronous operation. It is only invoked once a slot is available.</param>
    /// <returns>A task that completes with the result of <paramref name="func"/>.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="func"/> is null. Because this method is <c>async</c>, the exception is
    /// reported through the returned task.
    /// </exception>
    public async Task<T> Execute<T>(Func<Task<T>> func)
    {
        // Validate before waiting so a bad call never occupies a slot.
        if (func == null)
        {
            throw new ArgumentNullException(nameof(func));
        }

        await this.semaphore.WaitAsync();
        try
        {
            return await func();
        }
        finally
        {
            // Always hand the slot back, even when func throws.
            this.semaphore.Release();
        }
    }
}
/// <summary>
/// NUnit tests for <see cref="ThrottlingQueue"/>: result pass-through,
/// exception propagation, and enforcement of the concurrency limit.
/// </summary>
[TestFixture]
public class ThrottlingQueueTests
{
    /// <summary>
    /// The value produced by the queued operation is returned to the caller.
    /// </summary>
    [Test]
    public async Task ItShouldReturnResult()
    {
        int result;

        // "using" guarantees the queue is disposed even if the awaited call throws.
        using (var queue = new ThrottlingQueue(10))
        {
            result = await queue.Execute(
                async () =>
                {
                    // A short delay is enough to force an asynchronous completion.
                    await Task.Delay(10);
                    return 10;
                });
        }

        Assert.AreEqual(10, result);
    }

    /// <summary>
    /// An exception thrown by the queued operation surfaces to the awaiting caller.
    /// </summary>
    [Test]
    public async Task ItShouldThrowExceptionFromTask()
    {
        var expected = new InvalidOperationException("boom");
        Exception caught = null;

        using (var queue = new ThrottlingQueue(10))
        {
            // Await the call directly rather than passing an async lambda to
            // Assert.Throws: that overload takes a void delegate, so the lambda
            // would become "async void".
            try
            {
                // The explicit type argument replaces the unreachable "return"
                // that was previously needed only for type inference.
                await queue.Execute<int>(
                    async () =>
                    {
                        await Task.Delay(10);
                        throw expected;
                    });
            }
            catch (Exception ex)
            {
                caught = ex;
            }
        }

        // Fails both when nothing was thrown (caught is null) and when a
        // different exception came out.
        Assert.AreSame(expected, caught);
    }

    /// <summary>
    /// No more than the configured number of operations run at the same time.
    /// </summary>
    [Test]
    public async Task ItShouldThrottleTasks()
    {
        const int Max = 10;
        const int TaskCount = 1000;
        var counter = 0;

        using (var queue = new ThrottlingQueue(Max))
        {
            var tasks = new List<Task>(TaskCount);
            for (int i = 0; i < TaskCount; i++)
            {
                tasks.Add(
                    queue.Execute(
                        async () =>
                        {
                            // Use the value returned by Increment; re-reading the
                            // shared field could observe another task's update.
                            var running = Interlocked.Increment(ref counter);
                            try
                            {
                                if (running > Max)
                                {
                                    Assert.Fail(string.Format("Max exceeded {0}", running));
                                }

                                await Task.Delay(10);
                                return 10;
                            }
                            finally
                            {
                                Interlocked.Decrement(ref counter);
                            }
                        }));
            }

            // A failed assertion inside any task faults it, and WhenAll rethrows here.
            await Task.WhenAll(tasks);
        }
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment