Skip to content

Instantly share code, notes, and snippets.

public static async Task<bool> WaitAsync(this AsyncAutoResetEvent source,
TimeSpan timeout, CancellationToken cancellationToken = default)
{
if (timeout < TimeSpan.Zero) throw new ArgumentOutOfRangeException(nameof(timeout));
if (timeout == Timeout.InfiniteTimeSpan)
{
await source.WaitAsync(cancellationToken); return true;
}
cancellationToken.ThrowIfCancellationRequested();
using System;
using System.Collections.Immutable;
using System.Diagnostics;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace CustomChannels
{
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
public class ReplayLimitedSubscriptionsSubject<T> : ISubject<T>
{
private readonly ISubject<T> _subject = new Subject<T>();
private readonly int _replayMaxSubscriptions;
private ReplaySubject<T> _replaySubject;
private int _subscriptionsCount = 0;
@theodorzoulias
theodorzoulias / SortedLookupDictionary.cs
Last active May 3, 2022 08:57
Read-only sorted dictionary based on a lookup table -- https://stackoverflow.com/a/70630789/11178549
using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
/// <summary>
/// Represents a read-only collection of key/value pairs that are sorted on the key.
/// </summary>
/// <remarks>
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
public static class ConcurrentDictionaryExtensions
{
/// <summary>
/// Uses the specified function to update a key/collection pair in the dictionary,
/// if the key already exists. In case the function returns a null or empty
/// <summary>
/// Represents an asynchronous operation that is invoked lazily on demand, can be
/// invoked multiple times, and is subject to a non-concurrent execution policy.
/// Concurrent observers receive the result of the same operation.
/// </summary>
public class AsyncCollapseConcurrent<TResult>
{
private readonly Func<Task<TResult>> _taskFactory;
private volatile Task<TResult> _task;
@theodorzoulias
theodorzoulias / DeferErrorUntilCompletion.cs
Last active January 25, 2023 07:02
DeferErrorUntilCompletion for asynchronous sequences
// https://stackoverflow.com/questions/73056639/how-to-chunkify-an-ienumerablet-without-losing-discarding-items-in-case-of-fa
private async static IAsyncEnumerable<TOutput> DeferErrorUntilCompletion<TInput, TOutput>(
IAsyncEnumerable<TInput> input,
Func<IAsyncEnumerable<TInput>, IAsyncEnumerable<TOutput>> conversion,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
Task errorContainer = null;
async IAsyncEnumerable<TInput> InputIterator(
[EnumeratorCancellation] CancellationToken innerToken = default)
/// <summary>
/// Invokes a delegate for a node and all its descendants in parallel.
/// Children are invoked after the completion of their parent.
/// </summary>
public static ParallelLoopResult ParallelTraverseHierarchical<TNode>(
TNode root,
ParallelOptions parallelOptions,
Action<TNode, ParallelLoopState> body,
Func<TNode, IEnumerable<TNode>> childrenSelector)
{
/// <summary>
/// Represents the result of an asynchronous operation that is invoked lazily on demand,
/// it is retried as many times as needed until it succeeds, while enforcing a non-overlapping execution policy.
/// </summary>
public class AsyncLazyRetryOnFailure<TResult>
{
private volatile State _state;
private TResult _result; // The _result is assigned only once, along with the _state becoming null.
private record class State(Func<Task<TResult>> TaskFactory, Task<TResult> Task);
/// <summary>
/// Returns an existing task from the concurrent dictionary, or adds a new task
/// using the specified asynchronous factory method. Concurrent invocations for
/// the same key are prevented, unless the task is removed before the completion
/// of the delegate. Failed tasks are evicted from the concurrent dictionary.
/// </summary>
public static ValueTask<TValue> GetOrAddAsync<TKey, TValue>(
this ConcurrentDictionary<TKey, ValueTask<TValue>> source, TKey key,
Func<TKey, Task<TValue>> valueFactory)
{