Skip to content

Instantly share code, notes, and snippets.

@RxDave
Last active April 19, 2017 22:14
Show Gist options
  • Save RxDave/6c18831a0014ab04806efacace022aea to your computer and use it in GitHub Desktop.
Save RxDave/6c18831a0014ab04806efacace022aea to your computer and use it in GitHub Desktop.
Conflating IObserver with IDisposable
using System;
using System.Linq;
namespace Lab
{
// THIS PROGRAM DOES NOT REQUIRE A REFERENCE TO RX
/// <summary>
/// Console demo: builds a Filter/Map pipeline over a <c>Subject&lt;int&gt;</c>, pushes
/// values through it, disposes the subscription, and then pushes one more value.
/// </summary>
class Program
{
    /// <summary>
    /// Entry point. Feeds the integers 1 through 10 into the subject, disposes the
    /// subscription, pushes 12345 afterwards, and waits for a line of input before exiting.
    /// </summary>
    static void Main()
    {
        var numbers = new Subject<int>();

        // Keep the even values, then multiply each of them by five.
        var evens = numbers.Filter(x => x % 2 == 0);
        var pipeline = evens.Map(x => x * 5);

        var subscription = pipeline.Subscribe(new ConsoleObserver<int>("i % 2 * 5"));

        const int first = 1;
        const int last = 10;
        for (var value = first; value <= last; value++)
        {
            numbers.OnNext(value);
        }

        Console.WriteLine();
        Console.WriteLine("Disposing subscription.");
        subscription.Dispose();

        Console.WriteLine();
        Console.WriteLine("Pushing another value.");
        numbers.OnNext(12345);

        Console.WriteLine("Done.");
        Console.ReadLine();
    }
}
}
i % 2 * 5 Next: 10
i % 2 * 5 Next: 20
i % 2 * 5 Next: 30
i % 2 * 5 Next: 40
i % 2 * 5 Next: 50
Disposing subscription.
Disposed: Lab.Filter`1[System.Int32]
Disposed: Lab.Map`2[System.Int32,System.Int32]
Pushing another value.
Done.
using System;
using System.Collections.Generic;
namespace Lab
{
/// <summary>
/// Extension-method operators that build observables by wrapping each subscriber
/// in a sink before subscribing that sink to the source.
/// </summary>
public static class ObservableExperiment
{
    /// <summary>
    /// Creates an observable that, on each subscription, wraps the subscriber in a
    /// <c>Map</c> sink built from <paramref name="selector"/> and subscribes that sink
    /// to <paramref name="source"/>.
    /// </summary>
    /// <param name="source">The observable the sink is subscribed to.</param>
    /// <param name="selector">The function handed to the <c>Map</c> sink.</param>
    /// <returns>The value returned by <c>source.Subscribe</c> is what each subscription yields.</returns>
    public static IObservable<U> Map<T, U>(this IObservable<T> source, Func<T, U> selector)
    {
        return new AnonymousObservable<U>(downstream =>
        {
            var sink = new Map<T, U>(selector, downstream);
            return source.Subscribe(sink);
        });
    }

    /// <summary>
    /// Creates an observable that, on each subscription, wraps the subscriber in a
    /// <c>Filter</c> sink built from <paramref name="predicate"/> and subscribes that sink
    /// to <paramref name="source"/>.
    /// </summary>
    /// <param name="source">The observable the sink is subscribed to.</param>
    /// <param name="predicate">The function handed to the <c>Filter</c> sink.</param>
    /// <returns>The value returned by <c>source.Subscribe</c> is what each subscription yields.</returns>
    public static IObservable<T> Filter<T>(this IObservable<T> source, Func<T, bool> predicate)
    {
        return new AnonymousObservable<T>(downstream =>
        {
            var sink = new Filter<T>(predicate, downstream);
            return source.Subscribe(sink);
        });
    }
}
/// <summary>
/// Sink that applies a projection to every incoming value and forwards the result
/// to a downstream observer. Errors and completion are forwarded unchanged.
/// </summary>
internal sealed class Map<T, U> : Sink<T>
{
    private readonly Func<T, U> project;
    private readonly IObserver<U> downstream;

    /// <summary>
    /// Creates the sink. The downstream observer is also handed to the base class as
    /// an <see cref="IDisposable"/>; the cast yields null when it does not implement it.
    /// </summary>
    /// <param name="selector">Projection applied to each incoming value.</param>
    /// <param name="observer">Observer that receives the projected values.</param>
    public Map(Func<T, U> selector, IObserver<U> observer)
        : base(observer as IDisposable)
    {
        project = selector;
        downstream = observer;
    }

    /// <summary>Projects <paramref name="value"/> and pushes the result downstream.</summary>
    protected override void OnNextCore(T value)
    {
        var projected = project(value);
        downstream.OnNext(projected);
    }

    /// <summary>Forwards the error downstream.</summary>
    protected override void OnErrorCore(Exception error)
    {
        downstream.OnError(error);
    }

    /// <summary>Forwards completion downstream.</summary>
    protected override void OnCompletedCore()
    {
        downstream.OnCompleted();
    }
}
/// <summary>
/// Sink that forwards an incoming value to a downstream observer only when a
/// predicate returns true for it. Errors and completion are forwarded unchanged.
/// </summary>
internal sealed class Filter<T> : Sink<T>
{
    private readonly Func<T, bool> accept;
    private readonly IObserver<T> downstream;

    /// <summary>
    /// Creates the sink. The downstream observer is also handed to the base class as
    /// an <see cref="IDisposable"/>; the cast yields null when it does not implement it.
    /// </summary>
    /// <param name="predicate">Test applied to each incoming value.</param>
    /// <param name="observer">Observer that receives the values that pass the test.</param>
    public Filter(Func<T, bool> predicate, IObserver<T> observer)
        : base(observer as IDisposable)
    {
        accept = predicate;
        downstream = observer;
    }

    /// <summary>Pushes <paramref name="value"/> downstream when the predicate accepts it.</summary>
    protected override void OnNextCore(T value)
    {
        if (!accept(value))
        {
            return;
        }

        downstream.OnNext(value);
    }

    /// <summary>Forwards the error downstream.</summary>
    protected override void OnErrorCore(Exception error)
    {
        downstream.OnError(error);
    }

    /// <summary>Forwards completion downstream.</summary>
    protected override void OnCompletedCore()
    {
        downstream.OnCompleted();
    }
}
/// <summary>
/// Base class for observers that are also disposable. Notifications are passed to
/// the <c>*Core</c> methods until <see cref="Dispose"/> has been called; after that
/// they are silently dropped.
/// </summary>
/// <typeparam name="T">Type of the values received.</typeparam>
public abstract class Sink<T> : IObserver<T>, IDisposable
{
    private readonly IDisposable disposable;
    private volatile bool terminated;

    /// <summary>Creates the sink.</summary>
    /// <param name="disposable">
    /// Object disposed together with this sink; may be null, in which case nothing
    /// further is disposed.
    /// </param>
    public Sink(IDisposable disposable) => this.disposable = disposable;

    /// <summary>Passes the value to <see cref="OnNextCore"/> unless the sink has been disposed.</summary>
    public void OnNext(T value) { if (!terminated) OnNextCore(value); }

    /// <summary>Passes the error to <see cref="OnErrorCore"/> unless the sink has been disposed.</summary>
    public void OnError(Exception error) { if (!terminated) OnErrorCore(error); }

    /// <summary>Calls <see cref="OnCompletedCore"/> unless the sink has been disposed.</summary>
    public void OnCompleted() { if (!terminated) OnCompletedCore(); }

    /// <summary>Handles a value that arrived before disposal.</summary>
    protected abstract void OnNextCore(T value);

    /// <summary>Handles an error that arrived before disposal.</summary>
    protected abstract void OnErrorCore(Exception error);

    /// <summary>Handles completion that arrived before disposal.</summary>
    protected abstract void OnCompletedCore();

    /// <summary>
    /// Stops further notifications, writes a "Disposed: &lt;type&gt;" line to the console,
    /// and disposes the object supplied to the constructor, if any. Calls after the
    /// first one do nothing.
    /// </summary>
    public virtual void Dispose()
    {
        // Dispose must tolerate repeated calls: without this guard a second call would
        // log again and dispose the downstream object a second time.
        // NOTE: this check-then-set is not atomic; concurrent first calls may both proceed.
        if (terminated)
        {
            return;
        }

        terminated = true;
        Console.WriteLine("Disposed: " + GetType());
        disposable?.Dispose();
    }
}
/// <summary>
/// Observer that writes every notification it receives to the console, prefixed
/// with an optional label.
/// </summary>
/// <typeparam name="T">Type of the values received.</typeparam>
public sealed class ConsoleObserver<T> : IObserver<T>
{
    private readonly string prefix;

    /// <summary>Creates the observer.</summary>
    /// <param name="label">Text written in front of every line; null writes no prefix.</param>
    public ConsoleObserver(string label = null)
    {
        prefix = label;
    }

    /// <summary>Writes "&lt;label&gt; Next: &lt;value&gt;".</summary>
    public void OnNext(T value)
    {
        Report(" Next: " + value);
    }

    /// <summary>Writes "&lt;label&gt; Error: &lt;error&gt;".</summary>
    public void OnError(Exception error)
    {
        Report(" Error: " + error);
    }

    /// <summary>Writes "&lt;label&gt; Completed".</summary>
    public void OnCompleted()
    {
        Report(" Completed");
    }

    // Writes one console line consisting of the label followed by the given text.
    private void Report(string text)
    {
        Console.WriteLine(prefix + text);
    }
}
/// <summary>
/// Observable whose subscription behaviour is supplied as a delegate.
/// </summary>
/// <typeparam name="T">Type of the values produced.</typeparam>
internal sealed class AnonymousObservable<T> : IObservable<T>
{
    private readonly Func<IObserver<T>, IDisposable> onSubscribe;

    /// <summary>Creates the observable.</summary>
    /// <param name="subscribe">Delegate invoked once per call to <see cref="Subscribe"/>.</param>
    public AnonymousObservable(Func<IObserver<T>, IDisposable> subscribe)
    {
        onSubscribe = subscribe;
    }

    /// <summary>
    /// Invokes the stored delegate with <paramref name="observer"/> and returns its result.
    /// </summary>
    public IDisposable Subscribe(IObserver<T> observer)
    {
        return onSubscribe(observer);
    }
}
/// <summary>
/// Minimal subject: an observer that re-broadcasts every notification it receives
/// to all of its current subscribers.
/// </summary>
/// <typeparam name="T">Type of the values relayed.</typeparam>
public sealed class Subject<T> : IObservable<T>, IObserver<T>
{
    private readonly List<Subscription> observers = new List<Subscription>();

    /// <summary>
    /// Registers <paramref name="observer"/> to receive subsequent notifications.
    /// </summary>
    /// <param name="observer">The observer to notify.</param>
    /// <returns>
    /// A handle whose disposal stops delivery to the observer, removes the registration
    /// from this subject, and disposes the observer itself if it implements
    /// <see cref="IDisposable"/>.
    /// </returns>
    public IDisposable Subscribe(IObserver<T> observer)
    {
        var s = new Subscription(this, observer);
        observers.Add(s);
        return s;
    }

    /// <summary>Relays a value to every current subscriber.</summary>
    public void OnNext(T value)
    {
        // Iterate over a snapshot: a subscriber may subscribe or dispose while it is
        // being notified, and mutating a List<T> during enumeration throws.
        foreach (var observer in observers.ToArray())
        {
            observer.OnNext(value);
        }
    }

    /// <summary>Relays an error to every current subscriber.</summary>
    public void OnError(Exception error)
    {
        foreach (var observer in observers.ToArray())
        {
            observer.OnError(error);
        }
    }

    /// <summary>Relays completion to every current subscriber.</summary>
    public void OnCompleted()
    {
        foreach (var observer in observers.ToArray())
        {
            observer.OnCompleted();
        }
    }

    /// <summary>
    /// One registration of an observer with the subject. Forwards notifications until
    /// disposed; after disposal they are ignored.
    /// </summary>
    private sealed class Subscription : IObserver<T>, IDisposable
    {
        private readonly Subject<T> parent;
        private IObserver<T> observer;

        public Subscription(Subject<T> parent, IObserver<T> observer)
        {
            this.parent = parent;
            this.observer = observer;
        }

        public void OnNext(T value) => observer?.OnNext(value);

        public void OnError(Exception error) => observer?.OnError(error);

        public void OnCompleted() => observer?.OnCompleted();

        /// <summary>
        /// Detaches from the subject and disposes the wrapped observer when it is
        /// disposable. Safe to call more than once.
        /// </summary>
        public void Dispose()
        {
            var o = observer;
            observer = null;

            // Drop the registration so disposed subscriptions do not accumulate in the
            // subject and get walked on every notification. Remove is a no-op when the
            // registration is already gone.
            parent.observers.Remove(this);

            (o as IDisposable)?.Dispose();
        }
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment