Skip to content

Instantly share code, notes, and snippets.

@Fijo
Created July 16, 2022 16:02
Show Gist options
  • Save Fijo/419c83694a359cba8ddd9867d47f6ef7 to your computer and use it in GitHub Desktop.
Save Fijo/419c83694a359cba8ddd9867d47f6ef7 to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.ComponentModel;
namespace CosmicShores.Core.Rx {
public static class ObservableExtensions {
// PERF can be improved
// TODO propose integration this into System.Reactive itself. (https://github.com/dotnet/reactive/issues/1791#issuecomment-1186218797)
public static IObservable<IList<TSource>> CombineLatest<TSource>(this IEnumerable<IObservable<TSource>> sources, SourcelessBehaviour noSourceBehaviour) {
switch (noSourceBehaviour) {
case SourcelessBehaviour.SingleEmptyArray:
if (sources is null)
throw new ArgumentNullException(nameof(sources));
// wrapping within Observable.Create so the IEnumerable isn't executed earlier than it normally would be.
return Observable.Create<IList<TSource>>(SingleEmptyArrayBehaviourSubscribe);
case SourcelessBehaviour.Default:
return sources.CombineLatest();
default:
throw new InvalidEnumArgumentException(nameof(noSourceBehaviour), (int)noSourceBehaviour, typeof(SourcelessBehaviour));
}
IDisposable SingleEmptyArrayBehaviourSubscribe(IObserver<IList<TSource>> observer) {
var sourceArray = sources.ToArray();
var observable = sourceArray.Length == 0
? Observable.Return(Array.Empty<TSource>())
: sourceArray.CombineLatest();
return observable.Subscribe(observer);
}
}
// PERF can be improved
public static IObservable<TResult> CombineLatest<TSource, TResult>(this IEnumerable<IObservable<TSource>> sources, Func<IList<TSource>, TResult> resultSelector, SourcelessBehaviour noSourceBehaviour) {
if (resultSelector is null) throw new ArgumentNullException(nameof(resultSelector));
return sources.CombineLatest(noSourceBehaviour).Select(resultSelector);
}
}
}
namespace CosmicShores.Core.Rx {
/// <summary>
/// Behaviour for emiting when there are no sources.
/// </summary>
public enum SourcelessBehaviour : byte {
/// <summary>
/// Never emits (as of System.Reactive 5.0)
/// </summary>
Default = 0,
SingleEmptyArray = 1,
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment