Last active
July 8, 2016 16:37
-
-
Save aalmada/a3aee3c7a1dc9f4853652eae287efee6 to your computer and use it in GitHub Desktop.
A Gate operator that lets one input value through each time it is triggered by another observable. This example shows two consumers (one faster and one slower than the producer) that each trigger the gate when they are done with the previous value.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// This example is based on the Gate operator from https://bitbucket.org/horizongir/bonsai | |
using System; | |
using System.Reactive; | |
using System.Reactive.Concurrency; | |
using System.Reactive.Linq; | |
using System.Reactive.Subjects; | |
namespace ConsoleApplication1 | |
{ | |
/// <summary>
/// Demonstrates the Gate operator with two consumers of the same hot source:
/// one that finishes faster than the producer emits and one that finishes slower.
/// </summary>
class Program
{
    static void Main(string[] args)
    {
        var source = Observable.Interval(TimeSpan.FromSeconds(1))
            .Timestamp()
            .Publish() // make observable hot
            .RefCount();

        // Keep both subscriptions alive until a key is read, then dispose them so the
        // pipelines (and, through RefCount, the shared source connection) are released
        // deterministically instead of being abandoned at process exit.
        using (GatedDummyConsumer(source, TimeSpan.FromSeconds(.5), "- fast").Subscribe()) // faster than producer
        using (GatedDummyConsumer(source, TimeSpan.FromSeconds(4), "* slow").Subscribe()) // slower than producer
        {
            Console.Read();
        }
    }

    /// <summary>
    /// Builds a consumer pipeline that takes one value from <paramref name="source"/>,
    /// "processes" it for <paramref name="span"/>, and only then asks for the next value.
    /// </summary>
    /// <typeparam name="TSource">The value type for the source observable.</typeparam>
    /// <param name="source">The observable to consume from.</param>
    /// <param name="span">The simulated processing time per value.</param>
    /// <param name="id">A label written in front of each consumed value.</param>
    /// <returns>An observable of the consumed values, emitted once their processing time has elapsed.</returns>
    static IObservable<TSource> GatedDummyConsumer<TSource>(IObservable<TSource> source, TimeSpan span, string id)
    {
        // Defer so that every subscription gets its own trigger subject. Creating the
        // subject outside the deferred factory would share one trigger between all
        // subscribers of the returned observable, letting one subscriber open the
        // gate of another.
        return Observable.Defer(() =>
        {
            var trigger = new Subject<Unit>(); // feedback channel that closes the loop

            return source
                .Gate(trigger.StartWith(Unit.Default)) // use a gate and open it for the first value
                .DummyConsumer(span, id) // consume the value
                .Do(_ => trigger.OnNext(Unit.Default)); // value consumed, open the gate for a new value
        });
    }
}
/// <summary>
/// Reactive extension methods used by the gate example.
/// </summary>
static partial class ObservableExtensions
{
    /// <summary>
    /// Lets a single element of the source sequence through each time the trigger fires.
    /// </summary>
    /// <remarks>
    /// The source is split into consecutive windows delimited by the trigger's
    /// notifications, and only the first element of each window is emitted. Any
    /// further elements that arrive in the same window are dropped, not buffered.
    /// </remarks>
    /// <typeparam name="TSource">The value type for the source observable.</typeparam>
    /// <typeparam name="TTrigger">The value type for the trigger observable.</typeparam>
    /// <param name="source">The source observable.</param>
    /// <param name="trigger">The trigger observable; each of its values opens the gate for one source element.</param>
    /// <returns>An observable that outputs the first source element arriving after each trigger value.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="trigger"/> is null.</exception>
    public static IObservable<TSource> Gate<TSource, TTrigger>(this IObservable<TSource> source, IObservable<TTrigger> trigger)
    {
        // Validate here so a null argument is reported under this method's own
        // parameter names rather than those of the operators it delegates to.
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }

        if (trigger == null)
        {
            throw new ArgumentNullException(nameof(trigger));
        }

        return source
            .Window(trigger)
            .SelectMany(window => window.Take(1));
    }

    /// <summary>
    /// Simulates a consumer: writes each element to the console, prefixed with
    /// <paramref name="id"/>, and forwards it after a delay of <paramref name="span"/>.
    /// </summary>
    /// <typeparam name="TSource">The value type for the source observable.</typeparam>
    /// <param name="source">The observable whose elements are consumed.</param>
    /// <param name="span">The delay applied to each element, standing in for processing time.</param>
    /// <param name="id">The label written in front of each element.</param>
    /// <returns>An observable of the same elements, each delayed by <paramref name="span"/>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    public static IObservable<TSource> DummyConsumer<TSource>(this IObservable<TSource> source, TimeSpan span, string id)
    {
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }

        return source
            .Do(value => Console.WriteLine($"{id}: {value}")) // output to console
            .Delay(span); // simulate heavy computing
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The output for this example: