Skip to content

Instantly share code, notes, and snippets.

@AliveDevil
Created August 15, 2022 16:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save AliveDevil/fdbfcd07a740b85ef3e96f248eb1657f to your computer and use it in GitHub Desktop.
Save AliveDevil/fdbfcd07a740b85ef3e96f248eb1657f to your computer and use it in GitHub Desktop.
//
// Copyright(c) 2022 Jöran "AliveDevil" Malek
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
//
using System.Reactive.Disposables;
namespace System.Reactive.Linq;
public static class Observables
{
/// <summary>
/// Switch and DefaultIfEmpty, on steroids
/// </summary>
public static IObservable<T> SwitchWithDefault<T>(this IObservable<IObservable<T>> observable, T value = default)
=> new SwitchWithDefaultObservable<T>(observable, value);
private class SwitchWithDefaultObservable<TSource> : ObservableBase<TSource>
{
private readonly TSource defaultValue;
private readonly IObservable<IObservable<TSource>> sources;
public SwitchWithDefaultObservable(IObservable<IObservable<TSource>> sources, TSource defaultValue)
{
this.sources = sources;
this.defaultValue = defaultValue;
}
protected override IDisposable SubscribeCore(IObserver<TSource> observer)
{
var _ = new _(defaultValue, observer);
_.Run(sources);
return _;
}
private class _ : ObserverBase<IObservable<TSource>>
{
private readonly TSource defaultValue;
private readonly SerialDisposable innerSerialDisposable = new();
private readonly IObserver<TSource> observer;
private readonly SingleAssignmentDisposable upstream = new();
public _(TSource defaultValue, IObserver<TSource> observer)
{
this.defaultValue = defaultValue;
this.observer = observer;
}
public void ForwardOnCompleted()
{
observer.OnCompleted();
Dispose();
}
public void ForwardOnError(Exception error)
{
observer.OnError(error);
Dispose();
}
public void ForwardOnNext(TSource value) => observer.OnNext(value);
public void Run(IObservable<IObservable<TSource>> sources)
{
upstream.Disposable = sources.Subscribe(this);
}
protected override void Dispose(bool disposing)
{
if (disposing)
{
innerSerialDisposable?.Dispose();
}
base.Dispose(disposing);
}
protected override void OnCompletedCore()
{
observer.OnCompleted();
}
protected override void OnErrorCore(Exception error) => ForwardOnError(error);
protected override void OnNextCore(IObservable<TSource> value)
{
var innerObserver = new InnerObserver(this, defaultValue);
innerSerialDisposable.Disposable = innerObserver;
innerObserver.Subscribe(value);
}
private class InnerObserver : ObserverBase<TSource>
{
private readonly TSource defaultValue;
private readonly _ parent;
private readonly SingleAssignmentDisposable upstream = new();
public InnerObserver(_ parent, TSource defaultValue)
{
this.parent = parent;
this.defaultValue = defaultValue;
Found = false;
}
public bool Found { get; set; }
public void Subscribe(IObservable<TSource> upstream)
{
this.upstream.Disposable = upstream.SubscribeSafe(this);
if (!Found)
{
OnNext(defaultValue);
}
}
protected override void Dispose(bool disposing)
{
if (disposing)
{
upstream.Dispose();
}
base.Dispose(disposing);
}
protected override void OnCompletedCore()
{
Dispose();
if (!Found)
{
OnNextCore(defaultValue);
parent.ForwardOnCompleted();
}
}
protected override void OnErrorCore(Exception error)
{
Dispose();
if (!Found)
{
OnNextCore(defaultValue);
parent.ForwardOnError(error);
}
}
protected override void OnNextCore(TSource value)
{
Found = true;
parent.ForwardOnNext(value);
}
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment