Skip to content

Instantly share code, notes, and snippets.

@oillio
Created March 1, 2011 00:33
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save oillio/848367 to your computer and use it in GitHub Desktop.
Save oillio/848367 to your computer and use it in GitHub Desktop.
public static IObservable<WState> WindowOpensAndCloses<T>(this IObservable<IObservable<T>> This, bool verify_exclusive = true) {
var state = WState.Close;
var ret = Observable.Merge(This.Select(_ => WState.Open),
This.SelectMany(x => x.Materialize())
.Where(x => x.Kind == NotificationKind.OnCompleted ||
x.Kind == NotificationKind.OnError)
.Select(_ => WState.Close));
if(verify_exclusive) return ret.Do(x =>
{
if (state == x) throw new Exception("A non-exclusive window used where it shouldn't be.");
state = x;
});
else return ret;
}
public enum WState {
Open,
Close
}
public static IObservable<IObservable<T>> Union<T>(this IObservable<IObservable<T>> This) {
return Observable.CreateWithDisposable<IObservable<T>>(observer =>
{
Subject<T> window = null;
IDisposable window_subscription = null;
return This.WindowOpensAndCloses(false)
.Select(x => (x == WState.Open) ? 1 : -1)
.Scan((acc, val) => acc + val)
.Subscribe(count =>
{
if (window == null) {
if (count == 1) {
window = new Subject<T>();
window_subscription = This.Merge().Subscribe(window);
observer.OnNext(window);
}
}
else {
if (count == 0) {
window.OnCompleted();
window_subscription.Dispose();
window = null;
}
}
});
});
}
public static IObservable<IObservable<T>> Not<T>(this IObservable<IObservable<T>> This) {
return Observable.CreateWithDisposable<IObservable<T>>(observer =>
{
Subject<T> window = new Subject<T>();
observer.OnNext(window);
return This.WindowOpensAndCloses()
.Subscribe(x =>
{
if (x == WState.Open) {
window.OnCompleted();
window = null;
}
else {
window = new Subject<T>();
observer.OnNext(window);
}
});
});
}
public static IObservable<IObservable<T>> And<T>(this IObservable<IObservable<T>> A, IObservable<IObservable<T>> B) {
return Observable.CreateWithDisposable<IObservable<T>>(observer =>
{
var openCount = 0;
Subject<T> window = null;
IDisposable window_subscription = null;
return Observable.Merge(A.WindowOpensAndCloses(),
B.WindowOpensAndCloses())
.Subscribe(x =>
{
if (x == WState.Open) {
openCount++;
if (openCount == 2) {
window = new Subject<T>();
window_subscription = A.Merge()
.Merge(B.Merge())
.Subscribe(window);
observer.OnNext(window);
}
}
else {
openCount--;
if (window != null) {
window.OnCompleted();
window_subscription.Dispose();
window = null;
}
}
});
});
}
public static IObservable<IObservable<T>> Or<T>(this IObservable<IObservable<T>> A, IObservable<IObservable<T>> B) {
return Observable.CreateWithDisposable<IObservable<T>>(observer =>
{
var openCount = 0;
Subject<T> window = null;
IDisposable window_subscription = null;
return Observable.Merge(A.WindowOpensAndCloses(),
B.WindowOpensAndCloses())
.Subscribe(x =>
{
if (x == WState.Open) {
openCount++;
if (window == null) {
window = new Subject<T>();
window_subscription = A.Merge()
.Merge(B.Merge())
.Subscribe(window);
observer.OnNext(window);
}
}
else {
openCount--;
if (openCount == 0) {
window.OnCompleted();
window_subscription.Dispose();
window = null;
}
}
});
});
}
public static IObservable<IObservable<T>> MapToWindow<T,T2>(this IObservable<T> source, IObservable<IObservable<T2>> windows) {
return Observable.CreateWithDisposable<IObservable<T>>(observer =>
{
var gate = new object();
Subject<T> outWindow = null;
var Buffer = new List<T>();
var ret = new CompositeDisposable();
ret.Add(source.Subscribe(x =>
{
lock (gate) {
if (outWindow != null) outWindow.OnNext(x);
else Buffer.Add(x);
}
}));
ret.Add(windows.WindowOpensAndCloses()
.Subscribe(x =>
{
if(x == WState.Open) {
lock (gate) {
outWindow = new Subject<T>();
observer.OnNext(Buffer.ToObservable().Concat(outWindow));
Buffer = null;
}
}
else {
if (outWindow != null) {
lock (gate) {
outWindow.OnCompleted();
outWindow = null;
Buffer = new List<T>();
}
}
}
}));
return ret;
});
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment