Skip to content

Instantly share code, notes, and snippets.

@kstrauss
Created June 19, 2018 14:38
Show Gist options
  • Save kstrauss/99362404f0afff6e461f2ca79525f972 to your computer and use it in GitHub Desktop.
Save kstrauss/99362404f0afff6e461f2ca79525f972 to your computer and use it in GitHub Desktop.
returns an observable that provides the first in a window and lets you know when the window closes and what was in the window
void Main()
{
var closer = new Subject<Tuple<WindowState, IList<long>>>();
closer.Select(c => {
if (c.Item1 == WindowState.Open)
{
return $"Should email, with an event of {c.Item2.Single()}";
}
else
{
return $"should do nothing but will write a checkpoint for {c.Item2.Max()}, because we had {String.Join(",", c.Item2.Select(i=>i.ToString()))}";
}
}).Timestamp().Dump();
var outer = Observer.Create<IObservable<long>>(o =>
{
Console.WriteLine("Got start of window");
var ws = new WindowInfo();
//closer.OnNext(Tuple.Create<WindowState, IList<long>>(WindowState.Open, new List<long>()));
var inner = Observer.Create<long>(ob => {
ws.Add(ob);
if (ws.GetInternal().Count() <=1) // only want to signal on the first one
closer.OnNext( Tuple.Create<WindowState, IList<long>>(WindowState.Open, new List<long>() { ob}));
}, e1 => Console.Error.WriteLine("Inner Exception"),
// on complete()
() => {
Console.WriteLine("Closed Window");
var closerValue = Tuple.Create<WindowState,IList<long>>(WindowState.Closed, ws.GetInternal());
closer.OnNext(closerValue);
});
var innerSub = o.Subscribe(inner);
}, e=>Console.Error.WriteLine("Got Exception"), ()=>Console.WriteLine("Done"));
var timer = Observable.Interval(TimeSpan.FromSeconds(1))
.Do(o => Console.WriteLine("new value in window")) // just for debugging
.Window(5)
.Subscribe(outer);
Console.ReadLine();
}
public enum WindowState{
Open,
Closed
}
public class WindowInfo{
protected List<long> internalValues = new List<long>();
public void Add(long l){
internalValues.Add(l);
}
public IList<long> GetInternal(){
return internalValues;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment