Skip to content

Instantly share code, notes, and snippets.

@cwharris
Last active December 16, 2015 22:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cwharris/5509797 to your computer and use it in GitHub Desktop.
Save cwharris/5509797 to your computer and use it in GitHub Desktop.
Reactive Primitives
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
namespace ReactivePrimitives
{
/// <summary>
/// Console demo: adds a timer-driven reactive integer to a constant reactive
/// integer and prints every value of the resulting sum.
/// </summary>
class Program
{
    /// <summary>
    /// Builds the reactive sum, subscribes <see cref="Console.WriteLine(int)"/> to it,
    /// and then blocks on <see cref="Console.ReadLine"/> so the process stays alive
    /// while values are printed.
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    static void Main(string[] args)
    {
        // Timer with a zero due time and a 0.25 second period; its long ticks
        // are converted to an RxInteger by the IObservable<long> extension.
        TimeSpan period = TimeSpan.FromSeconds(0.25);
        RxInteger ticks = Observable.Timer(TimeSpan.Zero, period).ToRxInteger();

        // A constant operand wrapped as a reactive integer.
        RxInteger offset = 10.ToRxInteger();

        // The overloaded + combines the latest values of both operands.
        RxInteger sum = ticks + offset;
        sum.Subscribe(Console.WriteLine);

        Console.ReadLine();
    }
}
/// <summary>
/// A reactive boolean: an <see cref="IObservable{T}"/> of <c>bool</c> that simply
/// forwards subscriptions to the observable it wraps.
/// </summary>
public class RxBoolean : IObservable<bool>
{
    // The sequence every subscription is forwarded to.
    private readonly IObservable<bool> _inner;

    /// <summary>
    /// Creates a reactive boolean backed by a <c>BehaviorSubject&lt;bool&gt;</c>
    /// seeded with <paramref name="value"/>.
    /// </summary>
    /// <param name="value">The seed value for the backing subject.</param>
    public RxBoolean(bool value)
        : this(new BehaviorSubject<bool>(value))
    {
    }

    /// <summary>
    /// Creates a reactive boolean that wraps an existing observable sequence.
    /// </summary>
    /// <param name="source">The sequence to wrap; stored as-is.</param>
    public RxBoolean(IObservable<bool> source)
    {
        _inner = source;
    }

    /// <summary>
    /// Subscribes <paramref name="observer"/> to the wrapped sequence.
    /// </summary>
    /// <param name="observer">The observer to attach.</param>
    /// <returns>The subscription handle returned by the wrapped sequence.</returns>
    public IDisposable Subscribe(IObserver<bool> observer)
    {
        IDisposable subscription = _inner.Subscribe(observer);
        return subscription;
    }
}
/// <summary>
/// A reactive integer: an <see cref="IObservable{T}"/> of <c>int</c> whose arithmetic
/// and comparison operators produce new reactive values computed from the latest
/// values of both operands.
/// </summary>
/// <remarks>
/// The <c>==</c> and <c>!=</c> operators are element-wise and return an
/// <see cref="RxBoolean"/>, not a <c>bool</c>. <see cref="Equals(object)"/> and
/// <see cref="GetHashCode"/> deliberately keep the inherited object semantics.
/// </remarks>
public class RxInteger : IObservable<int>
{
    // The sequence every subscription is forwarded to.
    private readonly IObservable<int> source;

    /// <summary>
    /// Creates a reactive integer backed by a <c>BehaviorSubject&lt;int&gt;</c>
    /// seeded with <paramref name="value"/>.
    /// </summary>
    /// <param name="value">The seed value for the backing subject.</param>
    public RxInteger(int value)
    {
        this.source = new BehaviorSubject<int>(value);
    }

    /// <summary>
    /// Creates a reactive integer that wraps an existing observable sequence.
    /// </summary>
    /// <param name="source">The sequence to wrap; stored as-is.</param>
    public RxInteger(IObservable<int> source)
    {
        this.source = source;
    }

    /// <summary>
    /// Combines the latest values of two reactive integers with
    /// <paramref name="selector"/>. Shared by every operator below so that the
    /// operand order (left = index 0, right = index 1) is defined in one place.
    /// </summary>
    /// <typeparam name="TResult">The element type produced by the selector.</typeparam>
    /// <param name="a">The left operand.</param>
    /// <param name="b">The right operand.</param>
    /// <param name="selector">Maps the latest (left, right) pair to a result.</param>
    /// <returns>The sequence of selector results.</returns>
    private static IObservable<TResult> Combine<TResult>(
        RxInteger a,
        RxInteger b,
        Func<int, int, TResult> selector)
    {
        return Observable.CombineLatest<int>(a, b)
            .Select(latest => selector(latest[0], latest[1]));
    }

    /// <summary>Element-wise sum of the latest values of both operands.</summary>
    public static RxInteger operator +(RxInteger a, RxInteger b)
    {
        return new RxInteger(Combine(a, b, (x, y) => x + y));
    }

    /// <summary>Element-wise difference (left minus right) of the latest values.</summary>
    public static RxInteger operator -(RxInteger a, RxInteger b)
    {
        // Fixed: the original used "x[0] = x[1]", an assignment that yielded the
        // right operand instead of the difference.
        return new RxInteger(Combine(a, b, (x, y) => x - y));
    }

    /// <summary>Element-wise equality of the latest values of both operands.</summary>
    public static RxBoolean operator ==(RxInteger a, RxInteger b)
    {
        return new RxBoolean(Combine(a, b, (x, y) => x == y));
    }

    /// <summary>Element-wise inequality of the latest values of both operands.</summary>
    public static RxBoolean operator !=(RxInteger a, RxInteger b)
    {
        return new RxBoolean(Combine(a, b, (x, y) => x != y));
    }

    /// <summary>Element-wise "left greater than right" over the latest values.</summary>
    public static RxBoolean operator >(RxInteger a, RxInteger b)
    {
        return new RxBoolean(Combine(a, b, (x, y) => x > y));
    }

    /// <summary>Element-wise "left less than right" over the latest values.</summary>
    public static RxBoolean operator <(RxInteger a, RxInteger b)
    {
        return new RxBoolean(Combine(a, b, (x, y) => x < y));
    }

    /// <summary>Element-wise "left greater than or equal to right" over the latest values.</summary>
    public static RxBoolean operator >=(RxInteger a, RxInteger b)
    {
        return new RxBoolean(Combine(a, b, (x, y) => x >= y));
    }

    /// <summary>Element-wise "left less than or equal to right" over the latest values.</summary>
    public static RxBoolean operator <=(RxInteger a, RxInteger b)
    {
        return new RxBoolean(Combine(a, b, (x, y) => x <= y));
    }

    /// <summary>
    /// Keeps the inherited object equality. Declared explicitly because the
    /// type overloads <c>==</c>/<c>!=</c> with a non-boolean result, and the
    /// compiler expects <c>Equals</c> to be overridden alongside them.
    /// </summary>
    public override bool Equals(object obj)
    {
        return base.Equals(obj);
    }

    /// <summary>
    /// Keeps the inherited hash code, consistent with <see cref="Equals(object)"/>.
    /// </summary>
    public override int GetHashCode()
    {
        return base.GetHashCode();
    }

    /// <summary>
    /// Subscribes <paramref name="observer"/> to the wrapped sequence.
    /// </summary>
    /// <param name="observer">The observer to attach.</param>
    /// <returns>The subscription handle returned by the wrapped sequence.</returns>
    public IDisposable Subscribe(IObserver<int> observer)
    {
        return this.source.Subscribe(observer);
    }
}
/// <summary>
/// Extension methods that lift plain values and observable sequences into the
/// reactive primitive wrappers <see cref="RxInteger"/> and <see cref="RxBoolean"/>.
/// </summary>
public static class RxPrimitiveHelpers
{
    /// <summary>Wraps a constant <c>int</c> as an <see cref="RxInteger"/>.</summary>
    /// <param name="value">The value to wrap.</param>
    public static RxInteger ToRxInteger(this int value)
    {
        return new RxInteger(value);
    }

    /// <summary>Wraps an <c>int</c> sequence as an <see cref="RxInteger"/>.</summary>
    /// <param name="source">The sequence to wrap.</param>
    public static RxInteger ToRxInteger(this IObservable<int> source)
    {
        return new RxInteger(source);
    }

    /// <summary>
    /// Wraps a constant <c>long</c> as an <see cref="RxInteger"/> after narrowing it
    /// to <c>int</c>.
    /// </summary>
    /// <param name="value">The value to narrow and wrap.</param>
    public static RxInteger ToRxInteger(this long value)
    {
        // Plain narrowing cast: no range check is performed here, so a value
        // outside the int range is truncated unless the project is compiled
        // with overflow checking enabled.
        return new RxInteger((int)value);
    }

    /// <summary>
    /// Wraps a <c>long</c> sequence as an <see cref="RxInteger"/>, narrowing each
    /// element to <c>int</c>.
    /// </summary>
    /// <param name="source">The sequence whose elements are narrowed and wrapped.</param>
    public static RxInteger ToRxInteger(this IObservable<long> source)
    {
        // Same plain narrowing cast as the scalar overload, applied per element.
        return new RxInteger(source.Select(x => (int)x));
    }

    /// <summary>Wraps a constant <c>bool</c> as an <see cref="RxBoolean"/>.</summary>
    /// <param name="value">The value to wrap.</param>
    public static RxBoolean ToRxBoolean(this bool value)
    {
        return new RxBoolean(value);
    }

    /// <summary>Wraps a <c>bool</c> sequence as an <see cref="RxBoolean"/>.</summary>
    /// <param name="source">The sequence to wrap.</param>
    public static RxBoolean ToRxBoolean(this IObservable<bool> source)
    {
        return new RxBoolean(source);
    }

    /// <summary>
    /// Misnamed legacy overload: returns an <see cref="RxBoolean"/>, not an
    /// <see cref="RxInteger"/>. Kept so existing callers still compile; prefer
    /// <see cref="ToRxBoolean(bool)"/>.
    /// </summary>
    /// <param name="value">The value to wrap.</param>
    public static RxBoolean ToRxInteger(this bool value)
    {
        return value.ToRxBoolean();
    }

    /// <summary>
    /// Misnamed legacy overload: returns an <see cref="RxBoolean"/>, not an
    /// <see cref="RxInteger"/>. Kept so existing callers still compile; prefer
    /// <see cref="ToRxBoolean(IObservable{bool})"/>.
    /// </summary>
    /// <param name="source">The sequence to wrap.</param>
    public static RxBoolean ToRxInteger(this IObservable<bool> source)
    {
        return source.ToRxBoolean();
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment