@PeteGoo
Created January 21, 2013 19:58
Demonstrates using Rx to turn an incoming event stream into a grouped set of event streams based on the content. This time we use Throttle to have those groups clean themselves up (Complete) when there is a period of inactivity. Renewed activity on a cleaned up group will create a new group.
using System;
using System.Linq;
using System.Reactive.Linq;

namespace RxGroupByUntil {
    class Program {
        static void Main(string[] args) {
            PrintSequences();
            Console.ReadLine(); // Keep the process alive while the timers run
        }

        public static void PrintSequences() {
            var obsX = GetSequence("X", 0.1);
            var obsY = GetSequence("Y", 0.5);

            obsX.Merge(obsY)
                .GroupByUntil(i => i, g => g.Throttle(TimeSpan.FromSeconds(3)).Take(1)) // The throttle will complete our group on inactivity
                .Do(g => g.Subscribe(Console.WriteLine, () => Console.WriteLine("Completed a group"))) // This is just for logging each value and when a group is completed
                .SelectMany(group => group.Buffer(TimeSpan.FromSeconds(6), 500)) // Buffer the results (flush every 6 seconds or 500 items, whichever comes first)
                .Where(buffer => buffer.Any()) // Drop empty buffers
                .Subscribe(buffer => Console.WriteLine("{0} {1}'s", buffer.Count(), buffer.First())); // Write out the summary of each buffer
        }

        /// <summary>
        /// Creates a sequence of events: after a silent leading pause, five values arrive one
        /// second apart, then there is a silent 5 second pause, then five more values arrive
        /// one second apart. For content "X" the stream is
        /// X (1 sec) X (1 sec) X (1 sec) X (1 sec) X (5 sec pause + 1 sec) X (1 sec) X (1 sec) X (1 sec) X (1 sec) X
        /// </summary>
        public static IObservable<string> GetSequence(string content, double leadingPause) {
            return Observable.Interval(TimeSpan.FromSeconds(leadingPause)).Select(x => content).Take(1).IgnoreElements().Concat( // Silent leading pause
                Observable.Interval(TimeSpan.FromSeconds(1)).Take(5).Select(x => content).Concat( // First burst of five
                    Observable.Interval(TimeSpan.FromSeconds(5)).Select(x => content).Take(1).IgnoreElements()).Concat( // Silent 5 second pause
                    Observable.Interval(TimeSpan.FromSeconds(1)).Take(5).Select(x => content))); // Second burst of five
        }
    }
}
X
Y
X
Y
X
Y
X
Y
X
Y
5 X's
5 Y's
Completed a group
Completed a group
X
Y
X
Y
X
Y
X
Y
X
Y
Completed a group
5 Y's
Completed a group
5 X's
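The output above is easier to follow with the emission times written down. The sketch below is not part of the gist: `Timeline`, `EmissionTimes` and `LongestGap` are hypothetical helpers that use no Rx at all. They only redo the arithmetic of GetSequence, assuming Interval(p).Take(n) fires at p, 2p, ..., np and that Concat subscribes to the next segment as soon as the previous one completes.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper, not part of the gist: models when GetSequence emits,
// in seconds from subscription, using plain arithmetic instead of Rx.
public static class Timeline
{
    public static List<decimal> EmissionTimes(decimal leadingPause)
    {
        var times = new List<decimal>();
        var clock = leadingPause;          // silent leading pause

        for (var i = 0; i < 5; i++)        // first burst: one value per second
        {
            clock += 1m;
            times.Add(clock);
        }

        clock += 5m;                       // silent 5 second pause

        for (var i = 0; i < 5; i++)        // second burst: one value per second
        {
            clock += 1m;
            times.Add(clock);
        }

        return times;
    }

    // Largest gap between two consecutive emissions.
    public static decimal LongestGap(IEnumerable<decimal> times)
    {
        var list = times.ToList();
        return list.Zip(list.Skip(1), (earlier, later) => later - earlier).Max();
    }
}
```

Under those assumptions, Timeline.EmissionTimes(0.1m) puts the X values at 1.1 to 5.1 seconds and again at 11.1 to 15.1 seconds, so the longest gap is 6 seconds. That exceeds the 3 second Throttle, which is why each group completes between the two bursts and the second burst starts a fresh group.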