Skip to content

Instantly share code, notes, and snippets.

@odinserj
Last active June 19, 2019 11:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save odinserj/d76ba97f8f2277378bc0e5d1686e8265 to your computer and use it in GitHub Desktop.
Save odinserj/d76ba97f8f2277378bc0e5d1686e8265 to your computer and use it in GitHub Desktop.
// This file is part of Hangfire.
// Copyright © 2017 Sergey Odinokov.
//
// Hangfire is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as
// published by the Free Software Foundation, either version 3
// of the License, or any later version.
//
// Hangfire is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public
// License along with Hangfire. If not, see <http://www.gnu.org/licenses/>.
using System;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Hangfire.Annotations;
using Hangfire.Common;
namespace Hangfire.Processing
{
/// <summary>
/// Helpers for waiting on a <see cref="WaitHandle"/> with a timeout while also
/// observing a <see cref="CancellationToken"/>, in blocking and awaitable forms.
/// </summary>
internal static class TaskExtensions
{
    // NOTE(review): no member of this class, as visible here, reads this field.
    private static readonly Type[] EmptyTypes = new Type[0];

    // Handed to RegisteredWaitHandle.Unregister by TokenCallback.
    private static readonly WaitHandle InvalidWaitHandleInstance = new InvalidWaitHandle();

    /// <summary>
    /// Blocks until <paramref name="waitHandle"/> is signaled, <paramref name="timeout"/>
    /// elapses, or <paramref name="token"/> is canceled.
    /// </summary>
    /// <param name="waitHandle">Handle to wait on; must not be null.</param>
    /// <param name="timeout">Maximum time to wait, passed as is to <see cref="WaitHandle.WaitAny(WaitHandle[], TimeSpan)"/>.</param>
    /// <param name="token">Token whose cancellation aborts the wait.</param>
    /// <returns>
    /// <c>true</c> when the wait completed because of <paramref name="waitHandle"/>;
    /// <c>false</c> when it ended for another reason and the token is not canceled.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="waitHandle"/> is null.</exception>
    /// <exception cref="OperationCanceledException">
    /// The token was already canceled on entry, or is canceled once an unsuccessful wait ends.
    /// </exception>
    public static bool WaitOne(
        [NotNull] this WaitHandle waitHandle,
        TimeSpan timeout,
        CancellationToken token)
    {
        if (waitHandle == null) throw new ArgumentNullException(nameof(waitHandle));

        token.ThrowIfCancellationRequested();

        // GetCancellationEvent is declared outside this file; presumably it exposes a
        // wait handle that becomes signaled when the token is canceled - TODO confirm.
        using (var ev = token.GetCancellationEvent())
        {
            var waitHandles = new[] { waitHandle, ev.WaitHandle };
            var waitResult = WaitHandle.WaitAny(waitHandles, timeout);

            // Index 0 is the caller's handle, so it takes precedence over cancellation.
            if (waitResult == 0)
            {
                return true;
            }

            // Either the cancellation handle fired or the wait timed out; only the
            // former should surface as an exception.
            token.ThrowIfCancellationRequested();
            return false;
        }
    }

    /// <summary>
    /// Asynchronously waits until <paramref name="waitHandle"/> is signaled,
    /// <paramref name="timeout"/> elapses, or <paramref name="token"/> is canceled,
    /// without blocking the calling thread for the duration of the wait.
    /// </summary>
    /// <param name="waitHandle">Handle to wait on; must not be null.</param>
    /// <param name="timeout">
    /// Maximum time to wait; values below <see cref="Timeout.InfiniteTimeSpan"/> are rejected.
    /// </param>
    /// <param name="token">Token whose cancellation cancels the returned task.</param>
    /// <returns>
    /// A task producing <c>true</c> when the handle was signaled and <c>false</c> when
    /// the timeout elapsed first.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="waitHandle"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="timeout"/> is below <see cref="Timeout.InfiniteTimeSpan"/>.</exception>
    /// <exception cref="OperationCanceledException"><paramref name="token"/> is canceled before or during the wait.</exception>
    /// <remarks>
    /// The method is <c>async</c>, so all of the exceptions above are delivered through
    /// the returned task rather than thrown synchronously from the call itself.
    /// </remarks>
    public static async Task<bool> WaitOneAsync(
        [NotNull] this WaitHandle waitHandle,
        TimeSpan timeout,
        CancellationToken token)
    {
        if (waitHandle == null) throw new ArgumentNullException(nameof(waitHandle));
        if (timeout < Timeout.InfiniteTimeSpan) throw new ArgumentOutOfRangeException(nameof(timeout));

        token.ThrowIfCancellationRequested();

        // Fast path: an already-signaled handle needs no thread pool registration.
        if (waitHandle.WaitOne(TimeSpan.Zero))
        {
            return true;
        }

        var tcs = CreateCompletionSource<bool>();
        var registration = ThreadPool.RegisterWaitForSingleObject(waitHandle, PoolCallback, tcs, timeout, executeOnlyOnce: true);

        if (token.CanBeCanceled)
        {
            // The callback receives everything it needs through the state tuple, so no
            // closure is allocated. The returned CancellationTokenRegistration is not
            // kept; see the TODO in PoolCallback.
            token.Register(
                TokenCallback,
                Tuple.Create(registration, tcs, token),
                useSynchronizationContext: false);
        }

        return await tcs.Task.ConfigureAwait(false);
    }

    /// <summary>
    /// Thread pool callback for the registered wait: completes the task with
    /// <c>true</c> when the handle was signaled and <c>false</c> on timeout.
    /// </summary>
    /// <param name="state">The <see cref="TaskCompletionSource{TResult}"/> created by <see cref="WaitOneAsync"/>.</param>
    /// <param name="timedOut">Whether the wait ended because the timeout elapsed.</param>
    private static void PoolCallback(object state, bool timedOut)
    {
        // We do call the Unregister method to prevent race condition between
        // registered wait and cancellation token registration, so can use the
        // SetResult safely.
        ((TaskCompletionSource<bool>)state).SetResult(!timedOut);

        // TODO Dispose TokenCallback registration
        // NOTE(review): disposing it from here or from the awaiting continuation may
        // block on a TokenCallback that is itself waiting inside Unregister - confirm
        // before implementing, especially where continuations run synchronously.
    }

    /// <summary>
    /// Cancellation callback: removes the registered wait and then tries to move the
    /// task to the canceled state.
    /// </summary>
    /// <param name="state">Tuple of the registered wait, the completion source and the token.</param>
    private static void TokenCallback(object state)
    {
        // We need to ensure there's no race condition, where wait handle was
        // set, but callback wasn't fully completed. In this case handle is
        // acquired, but task is cancelled.
        var ctx = (Tuple<RegisteredWaitHandle, TaskCompletionSource<bool>, CancellationToken>)state;

        // Presumably unregistering with an invalid handle waits for an in-flight
        // PoolCallback to finish - TODO confirm against RegisteredWaitHandle.Unregister.
        ctx.Item1.Unregister(InvalidWaitHandleInstance);

        // No-op when PoolCallback has already completed the task.
        TrySetCanceled(ctx.Item2, ctx.Item3);
    }

    /// <summary>
    /// Creates a completion source; outside NET45 builds it is configured to run
    /// continuations asynchronously.
    /// </summary>
    private static TaskCompletionSource<T> CreateCompletionSource<T>()
    {
        return new TaskCompletionSource<T>(
#if !NET45
            TaskCreationOptions.RunContinuationsAsynchronously
#endif
        );
    }

    /// <summary>
    /// Attempts to cancel <paramref name="source"/>; outside NET45 builds the
    /// <paramref name="token"/> is attached to the cancellation.
    /// </summary>
    private static void TrySetCanceled<T>(TaskCompletionSource<T> source, CancellationToken token)
    {
        source.TrySetCanceled(
#if !NET45
            token
#endif
        );
    }

    /// <summary>
    /// Wait handle that reports <see cref="WaitHandle.InvalidHandle"/> as its handle
    /// and rejects attempts to assign one (the override is compiled out for NETSTANDARD1_3).
    /// </summary>
    private sealed class InvalidWaitHandle : WaitHandle
    {
#if !NETSTANDARD1_3
        [Obsolete("Use the SafeWaitHandle property instead.")]
        public override IntPtr Handle
        {
            get => InvalidHandle;
            set => throw new InvalidOperationException();
        }
#endif
    }
}
}
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
using TaskExtensions = Hangfire.Processing.TaskExtensions;
// ReSharper disable AssignNullToNotNullAttribute
namespace Hangfire.Core.Tests.Processing
{
/// <summary>
/// Facts covering <c>TaskExtensions.WaitOne</c> and <c>TaskExtensions.WaitOneAsync</c>:
/// argument validation, cancellation, signaled handles and timeouts.
/// </summary>
public class TaskExtensionsFacts : IDisposable
{
    private readonly ManualResetEvent _mre;
    private readonly CancellationTokenSource _cts;

    // Each test gets its own unsignaled event and non-canceled token source.
    public TaskExtensionsFacts()
    {
        _mre = new ManualResetEvent(false);
        _cts = new CancellationTokenSource();
    }

    /// <summary>
    /// Releases the event and the token source created for the test, so that OS
    /// handles and the timer started by <c>CancelAfter</c> are not left to the finalizer.
    /// </summary>
    public void Dispose()
    {
        _cts.Dispose();
        _mre.Dispose();
    }

    [Fact]
    public async Task WaitOneAsync_ThrowsArgNullException_WhenWaitHandleIsNull()
    {
        var exception = await Assert.ThrowsAsync<ArgumentNullException>(
            async () => await TaskExtensions.WaitOneAsync(null, TimeSpan.Zero, CancellationToken.None));

        Assert.Equal("waitHandle", exception.ParamName);
    }

    [Fact]
    public async Task WaitOneAsync_ThrowsOpCanceledException_WhenCancellationTokenIsCanceled()
    {
        _cts.Cancel();

        var exception = await Assert.ThrowsAsync<OperationCanceledException>(
            async () => await TaskExtensions.WaitOneAsync(_mre, TimeSpan.Zero, _cts.Token));

        Assert.Equal(_cts.Token, exception.CancellationToken);
    }

    [Fact]
    public async Task WaitOneAsync_ThrowsOpCanceledException_EvenWhenWaitHandleIsSignaled()
    {
        _cts.Cancel();
        _mre.Set();

        var exception = await Assert.ThrowsAsync<OperationCanceledException>(
            async () => await TaskExtensions.WaitOneAsync(_mre, Timeout.InfiniteTimeSpan, _cts.Token));

        Assert.Equal(_cts.Token, exception.CancellationToken);
    }

    [Fact]
    public async Task WaitOneAsync_ReturnsTrue_WhenWaitHandleIsSignaled()
    {
        _mre.Set();

        var result = await TaskExtensions.WaitOneAsync(_mre, Timeout.InfiniteTimeSpan, _cts.Token);

        Assert.True(result);
    }

    [Fact]
    public async Task WaitOneAsync_ReturnsTrue_WhenWaitHandleIsSignaled_AndTimeoutIsZero()
    {
        _mre.Set();

        var result = await TaskExtensions.WaitOneAsync(_mre, TimeSpan.Zero, _cts.Token);

        Assert.True(result);
    }

    [Fact]
    public async Task WaitOneAsync_ReturnsFalseImmediately_WhenNotSignaled_AndTimeoutIsZero()
    {
        var result = await TaskExtensions.WaitOneAsync(_mre, TimeSpan.Zero, _cts.Token);

        Assert.False(result);
    }

    [Fact]
    public async Task WaitOneAsync_WaitsAndReturnsFalse_WhenNotSignaled_AndNonNullTimeout()
    {
        var sw = Stopwatch.StartNew();

        var result = await TaskExtensions.WaitOneAsync(_mre, TimeSpan.FromMilliseconds(100), _cts.Token);
        sw.Stop();

        Assert.False(result);
        // 95 ms rather than 100 ms leaves a small tolerance on the measured wait.
        Assert.True(sw.Elapsed > TimeSpan.FromMilliseconds(95), sw.Elapsed.ToString());
    }

    [Fact]
    public async Task WaitOneAsync_WaitsAndThrowsTaskCanceled_WhenNotSignaled_AndCancellationTokenIsCanceled()
    {
        var sw = Stopwatch.StartNew();
        _cts.CancelAfter(TimeSpan.FromMilliseconds(100));

        var exception = await Assert.ThrowsAnyAsync<OperationCanceledException>(
            async () => await TaskExtensions.WaitOneAsync(_mre, Timeout.InfiniteTimeSpan, _cts.Token));
        sw.Stop();

#if !NET452
        Assert.Equal(_cts.Token, exception.CancellationToken);
#else
        Assert.NotNull(exception);
#endif
        Assert.True(sw.Elapsed > TimeSpan.FromMilliseconds(95), sw.Elapsed.ToString());
    }

    [Fact]
    public void WaitOne_ThrowsArgNullException_WhenWaitHandleIsNull()
    {
        var exception = Assert.Throws<ArgumentNullException>(
            () => TaskExtensions.WaitOne(null, TimeSpan.Zero, CancellationToken.None));

        Assert.Equal("waitHandle", exception.ParamName);
    }

    [Fact]
    public void WaitOne_ThrowsOpCanceledException_WhenCancellationTokenIsCanceled()
    {
        _cts.Cancel();

        var exception = Assert.Throws<OperationCanceledException>(
            () => TaskExtensions.WaitOne(_mre, TimeSpan.Zero, _cts.Token));

        Assert.Equal(_cts.Token, exception.CancellationToken);
    }

    [Fact]
    public void WaitOne_ThrowsOpCanceledException_EvenWhenWaitHandleIsSignaled()
    {
        _cts.Cancel();
        _mre.Set();

        var exception = Assert.Throws<OperationCanceledException>(
            () => TaskExtensions.WaitOne(_mre, Timeout.InfiniteTimeSpan, _cts.Token));

        Assert.Equal(_cts.Token, exception.CancellationToken);
    }

    [Fact]
    public void WaitOne_ReturnsTrue_WhenWaitHandleIsSignaled()
    {
        _mre.Set();

        var result = TaskExtensions.WaitOne(_mre, Timeout.InfiniteTimeSpan, _cts.Token);

        Assert.True(result);
    }

    [Fact]
    public void WaitOne_ReturnsTrue_WhenWaitHandleIsSignaled_AndTimeoutIsZero()
    {
        _mre.Set();

        var result = TaskExtensions.WaitOne(_mre, TimeSpan.Zero, _cts.Token);

        Assert.True(result);
    }

    [Fact]
    public void WaitOne_ReturnsFalseImmediately_WhenNotSignaled_AndTimeoutIsZero()
    {
        var result = TaskExtensions.WaitOne(_mre, TimeSpan.Zero, _cts.Token);

        Assert.False(result);
    }

    [Fact]
    public void WaitOne_WaitsAndReturnsFalse_WhenNotSignaled_AndNonNullTimeout()
    {
        var sw = Stopwatch.StartNew();

        var result = TaskExtensions.WaitOne(_mre, TimeSpan.FromMilliseconds(100), _cts.Token);
        sw.Stop();

        Assert.False(result);
        // 95 ms rather than 100 ms leaves a small tolerance on the measured wait.
        Assert.True(sw.Elapsed > TimeSpan.FromMilliseconds(95), sw.Elapsed.ToString());
    }

    [Fact]
    public void WaitOne_WaitsAndThrowsTaskCanceled_WhenNotSignaled_AndCancellationTokenIsCanceled()
    {
        var sw = Stopwatch.StartNew();
        _cts.CancelAfter(TimeSpan.FromMilliseconds(100));

        var exception = Assert.ThrowsAny<OperationCanceledException>(
            () => TaskExtensions.WaitOne(_mre, Timeout.InfiniteTimeSpan, _cts.Token));
        sw.Stop();

#if !NET452
        Assert.Equal(_cts.Token, exception.CancellationToken);
#else
        Assert.NotNull(exception);
#endif
        Assert.True(sw.Elapsed > TimeSpan.FromMilliseconds(95), sw.Elapsed.ToString());
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment