Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
An awaitable for waiting a part of a loop.
using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
namespace Walterlv
{
/// <summary>
/// 为一个持续操作中的一部分提供可异步等待的操作。
/// </summary>
public class ContinuousPartOperation
{
private readonly TaskCompletionSource<object> _source;
private readonly Awaiter _awaiter;
private Action _continuation;
private Exception _exception;
internal ContinuousPartOperation()
{
_source = new TaskCompletionSource<object>();
_awaiter = new Awaiter(this);
}
/// <summary>
/// 获取一个值,该值指示此异步操作是否已经结束。
/// </summary>
internal bool IsCompleted { get; private set; }
/// <summary>
/// 完成此异步操作。
/// </summary>
internal void Complete()
{
IsCompleted = true;
if (_exception == null)
{
_source.SetResult(null);
}
else
{
_source.SetException(_exception);
}
var continuation = _continuation;
_continuation = null;
continuation?.Invoke();
}
/// <summary>
/// 使用一个新的 <paramref name="exception"/> 来设置此异步操作完成对象。
/// </summary>
/// <param name="exception">一个异常,当设置后,同步或异步等待此对象时将抛出异常。</param>
internal void UpdateException(Exception exception) => _exception = exception;
/// <summary>
/// 获取一个用于等待此异步操作的可等待对象。
/// </summary>
public Awaiter GetAwaiter() => _awaiter;
/// <summary>
/// 同步等待此异步操作完成。
/// </summary>
public void Wait() => _source.Task.GetAwaiter().GetResult();
/// <summary>
/// 表示用于等待 <see cref="ContinuousPartOperation"/> 的异步可等待对像。
/// </summary>
public sealed class Awaiter : INotifyCompletion
{
private readonly ContinuousPartOperation _owner;
/// <summary>
/// 创建一个用于等待 <see cref="ContinuousPartOperation"/> 的异步可等待对象。
/// </summary>
internal Awaiter(ContinuousPartOperation owner)
{
_owner = owner;
}
/// <summary>Schedules the continuation action that's invoked when the instance completes.</summary>
/// <param name="continuation">The action to invoke when the operation completes.</param>
/// <exception cref="T:System.ArgumentNullException">The <paramref name="continuation">continuation</paramref> argument is null (Nothing in Visual Basic).</exception>
public void OnCompleted(Action continuation)
{
if (IsCompleted)
{
continuation?.Invoke();
}
else
{
_owner._continuation += continuation;
}
}
/// <summary>
/// 获取一个值,该值指示异步操作是否完成。
/// </summary>
public bool IsCompleted => _owner.IsCompleted;
/// <summary>
/// 获取此异步操作的结果。
/// </summary>
[DebuggerStepThrough]
public void GetResult() => _owner._source.Task.GetAwaiter().GetResult();
}
}
}
using System;
namespace Walterlv
{
/// <summary>
/// 为次数限制的异步等待操作提供操作。
/// </summary>
public class CountLimitOperationToken
{
private readonly long _countLimit;
private long _passed;
/// <summary>
/// 创建一个具有指定执行次数限制的 <see cref="CountLimitOperationToken"/>。
/// </summary>
/// <param name="countLimit">次数限制,可能是不精确的。</param>
public CountLimitOperationToken(long countLimit)
{
Operation = new ContinuousPartOperation();
_countLimit = countLimit < 0 ? long.MaxValue : countLimit;
}
/// <summary>
/// 获取一个可 await 等待的等待对象。
/// </summary>
public ContinuousPartOperation Operation { get; }
/// <summary>
/// 完成此异步操作。
/// </summary>
/// <param name="removeIntermediateExceptions">
/// 默认情况下,如果此前发生过异常,则认为那是重试过程中的中间异常,现在成功完成了任务,所以中间异常需要移除。
/// 不过,你也可以选择不移除,意味着此任务的完成属于强制终止,而不是成功完成。
/// </param>
public void Complete(bool removeIntermediateExceptions = true)
{
if (removeIntermediateExceptions)
{
Operation.UpdateException(null);
}
if (!Operation.IsCompleted)
{
Operation.Complete();
}
}
/// <summary>
/// 通知此 <see cref="ContinuousPartOperation"/> 自上次调用 <see cref="Pass"/> 方法以来增加的次数。
/// </summary>
/// <param name="countPassed">自上次调用 <see cref="Pass"/> 方法以来增加的次数。</param>
public void Pass(long countPassed)
{
_passed += countPassed;
if (_passed >= _countLimit && !Operation.IsCompleted)
{
Operation.Complete();
}
}
/// <summary>
/// 使用一个新的 <paramref name="exception"/> 来设置此异步操作完成对象。
/// </summary>
/// <param name="exception">一个异常,当设置后,同步或异步等待此对象时将抛出异常。</param>
public void UpdateException(Exception exception)
{
if (!Operation.IsCompleted)
{
Operation.UpdateException(exception);
}
}
}
}
using System;
namespace Walterlv
{
/// <summary>
/// 为一个操作包装结果信息,包括成功与否、异常和取消信息。
/// </summary>
public readonly struct OperationResult
{
/// <summary>
/// 使用指定的异常创建 <see cref="OperationResult"/> 的新实例。
/// 这个操作结果是失败的。
/// </summary>
/// <param name="exception">操作过程中收集到的异常。</param>
public OperationResult(Exception exception)
{
Exception = exception ?? throw new ArgumentNullException(nameof(exception));
IsCanceled = false;
}
/// <summary>
/// 创建一个成功的或者取消的 <see cref="OperationResult"/>。
/// </summary>
/// <param name="isSuccessOrCanceled">
/// 如果为 true,则创建一个成功的操作结果;如果为 false,创建一个取消的操作结果。
/// </param>
public OperationResult(bool isSuccessOrCanceled)
{
Exception = null;
IsCanceled = !isSuccessOrCanceled;
}
/// <summary>
/// 获取一个值,该值指示操作已经成功完成。
/// </summary>
public bool Success => Exception == null && IsCanceled is false;
/// <summary>
/// 获取操作过程中发生或收集的异常。
/// </summary>
public Exception Exception { get; }
/// <summary>
/// 获取此操作是否已被取消。
/// </summary>
public bool IsCanceled { get; }
/// <summary>
/// 将操作结果视为成功与否的 bool 值。
/// </summary>
public static implicit operator bool(OperationResult result) => result.Success;
/// <summary>
/// 将操作结果视为异常。
/// </summary>
public static implicit operator Exception(OperationResult result) => result.Exception;
/// <summary>
/// 将异常作为操作结果使用。
/// </summary>
public static implicit operator OperationResult(Exception exception)
=> new OperationResult(exception ?? throw new ArgumentNullException(nameof(exception),
$"只有非空的异常才可以看转换为 {nameof(OperationResult)}。"));
/// <summary>
/// 将成功或取消信息作为操作结果使用。
/// </summary>
public static implicit operator OperationResult(bool isSuccessOrCanceled)
=> new OperationResult(isSuccessOrCanceled);
/// <summary>
/// 判断操作是否是成功的。
/// </summary>
public static bool operator true(OperationResult result) => result.Success;
/// <summary>
/// 判断操作是否是失败的。
/// </summary>
public static bool operator false(OperationResult result) => !result.Success;
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Walterlv
{
/// <summary>
/// 对于异步的,出错后会重试的操作,使用此类型可以辅助等待循环重试的一部分。
/// </summary>
public class PartialAwaitableRetry
{
private readonly object _locker = new object();
private readonly Func<PartialRetryContext, Task<OperationResult>> _loopItem;
private readonly List<CountLimitOperationToken> _tokens = new List<CountLimitOperationToken>();
private volatile bool _isLooping;
/// <summary>
/// 使用一个循环任务初始化 <see cref="PartialAwaitableRetry"/> 的一个新实例。
/// </summary>
/// <param name="loopItem">一个循环任务。</param>
public PartialAwaitableRetry(Func<PartialRetryContext, Task<OperationResult>> loopItem)
{
_loopItem = loopItem ?? throw new ArgumentNullException(nameof(loopItem));
}
/// <summary>
/// 以指定的次数限制加入循环,并返回等待此循环结果的可等待对象。
/// 此方法是线程安全的。
/// </summary>
/// <param name="countLimit">次数限制,当设置为 -1 时表示无限次循环。</param>
/// <returns>等待循环结果的可等待对象。</returns>
public ContinuousPartOperation JoinAsync(int countLimit)
{
var token = new CountLimitOperationToken(countLimit);
lock (_locker)
{
_tokens.Add(token);
if (!_isLooping)
{
Loop();
}
}
return token.Operation;
}
/// <summary>
/// 执行实际的循环,并在每一次执行的时候会给所有的等待对象报告结果。
/// </summary>
private async void Loop()
{
_isLooping = true;
var context = new PartialRetryContext();
var shouldContinue = true;
try
{
while (shouldContinue)
{
Exception exception;
bool isCompleted;
// 加锁获取此时此刻的 Token 集合副本。
// 执行一次循环的时候,只能操作此集合副本,真实集合新增的元素由于没有参与循环操作的执行;
// 这意味着期望执行一次方法的时候却没有执行,所以不能提供结果。
List<CountLimitOperationToken> snapshot;
lock (_locker)
{
snapshot = _tokens.ToList();
}
try
{
var result = await _loopItem.Invoke(context).ConfigureAwait(false);
exception = result.Exception;
isCompleted = result.Success;
}
catch (Exception ex)
{
exception = ex;
isCompleted = false;
}
if (exception != null)
{
foreach (var token in snapshot)
{
token.UpdateException(exception);
}
}
if (isCompleted)
{
foreach (var token in snapshot)
{
token.Complete();
}
lock (_locker)
{
_tokens.RemoveAll(token => snapshot.Contains(token));
shouldContinue = _tokens.Count > 0;
}
}
else
{
foreach (var token in snapshot)
{
token.Pass(context.StepCount);
}
}
}
}
finally
{
_isLooping = false;
}
}
}
/// <summary>
/// 为 <see cref="PartialAwaitableRetry"/> 提供循环执行的上下文设置信息。
/// </summary>
public sealed class PartialRetryContext
{
private int _stepCount = 1;
/// <summary>
/// 获取或设置此方法一次执行时经过了多少次循环。
/// 当某个方法执行时需要进行不打断的多次循环才能完成时,可以修改此值。
/// </summary>
public int StepCount
{
get => _stepCount;
set
{
if (_stepCount <= 0)
{
throw new ArgumentException("次数必须大于或等于 1。", nameof(value));
}
_stepCount = value;
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.