Created
January 21, 2013 19:58
-
-
Save PeteGoo/4588794 to your computer and use it in GitHub Desktop.
Demonstrates using Rx to turn an incoming event stream into a grouped set of event streams based on the content. This time we use Throttle to have those groups clean themselves up (Complete) when there is a period of inactivity. Renewed activity on a cleaned up group will create a new group.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Linq; | |
using System.Reactive.Linq; | |
namespace RxGroupByUntil { | |
class Program { | |
static void Main(string[] args) { | |
PrintSequences(); | |
Console.ReadLine(); | |
} | |
public static void PrintSequences() { | |
var obsX = GetSequence("X", 0.1); | |
var obsY = GetSequence("Y", 0.5); | |
obsX.Merge(obsY) | |
.GroupByUntil(i => i, g => g.Throttle(TimeSpan.FromSeconds(3)).Take(1)) // The throttle will complete our group on inactivity | |
.Do(g=> g.Subscribe(Console.WriteLine, () => Console.WriteLine("Completed a group"))) // This is just for logging each value and when a group is completed | |
.SelectMany(group => group.Buffer(TimeSpan.FromSeconds(6), 500)) // Buffer the results | |
.Where(group => group.Any()) | |
.Subscribe(buffer => Console.WriteLine("{0} {1}'s", buffer.Count(), buffer.First())); // Write out the summary of each buffer | |
} | |
/// <summary> | |
/// Creates a sequence of event such that, if content is "X" the event stream will be | |
/// two sequences of X's every second separated by a 5 second pause i.e. | |
/// X (1 sec) X (1 sec) X (1 sec) X (1 sec) X (5 sec) X (1 sec) X (1 sec) X (1 sec) X (1 sec) X | |
/// </summary> | |
public static IObservable<string> GetSequence(string content, double leadingPause) { | |
return Observable.Interval(TimeSpan.FromSeconds(leadingPause)).Select(x => content).Take(1).IgnoreElements().Concat( | |
Observable.Interval(TimeSpan.FromSeconds(1)).Take(5).Select(x => content).Concat( | |
Observable.Interval(TimeSpan.FromSeconds(5)).Select(x => content).Take(1).IgnoreElements()).Concat( | |
Observable.Interval(TimeSpan.FromSeconds(1)).Take(5).Select(x => content))); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
X | |
Y | |
X | |
Y | |
X | |
Y | |
X | |
Y | |
X | |
Y | |
5 X's | |
5 Y's | |
Completed a group | |
Completed a group | |
X | |
Y | |
X | |
Y | |
X | |
Y | |
X | |
Y | |
X | |
Y | |
Completed a group | |
5 Y's | |
Completed a group | |
5 X's |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment