Skip to content

Instantly share code, notes, and snippets.

@aalmada
Last active July 8, 2016 16:37
Show Gist options
  • Save aalmada/a3aee3c7a1dc9f4853652eae287efee6 to your computer and use it in GitHub Desktop.
Save aalmada/a3aee3c7a1dc9f4853652eae287efee6 to your computer and use it in GitHub Desktop.
A Gate operator that outputs the latest input value when triggered by another observable. This example shows two consumers (faster and slower than producer) trigger the gate when done with the previous value.
// This example is based on the Gate operator from https://bitbucket.org/horizongir/bonsai
using System;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
namespace ConsoleApplication1
{
class Program
{
static void Main(string[] args)
{
var source = Observable.Interval(TimeSpan.FromSeconds(1))
.Timestamp()
.Publish() // make observable hot
.RefCount();
// a fast consumer
GatedDummyConsumer(source, TimeSpan.FromSeconds(.5), "- fast").Subscribe(); // faster than producer
// a slow consumer
GatedDummyConsumer(source, TimeSpan.FromSeconds(4), "* slow").Subscribe(); // slower than producer
Console.Read();
}
static IObservable<TSource> GatedDummyConsumer<TSource>(IObservable<TSource> source, TimeSpan span, string id)
{
var trigger = new Subject<Unit>(); // used to allow a loop
return source
.Gate(trigger.StartWith(Unit.Default)) // use a gate and open it for the first value
.DummyConsumer(span, id) // consume the value
.Do(_ => trigger.OnNext(Unit.Default)); // value consumed, open the gate for a new value
}
}
static partial class ObservableExtensions
{
/// <summary>
/// Outputs the latest input value when triggered by another observable.
/// </summary>
/// <typeparam name="TSource">The value type for the source observable.</typeparam>
/// <typeparam name="TTrigger">The value type for the trigger observable.</typeparam>
/// <param name="source">The source observable.</param>
/// <param name="trigger">The trigger observable.</param>
/// <returns>An observable that output the latest input value when the trigger output a value.</returns>
public static IObservable<TSource> Gate<TSource, TTrigger>(this IObservable<TSource> source, IObservable<TTrigger> trigger)
{
return source
.Window(trigger)
.SelectMany(window => window.Take(1));
}
public static IObservable<TSource> DummyConsumer<TSource>(this IObservable<TSource> source, TimeSpan span, string id)
{
return source
.Do(value => Console.WriteLine($"{id}: {value}")) // output to console
.Delay(span); // simulate heavy computing
}
}
}
@aalmada
Copy link
Author

aalmada commented Jul 8, 2016

The output for this example:

- fast: 0@7/8/2016 4:36:11 PM +00:00
* slow: 0@7/8/2016 4:36:11 PM +00:00
- fast: 1@7/8/2016 4:36:12 PM +00:00
- fast: 2@7/8/2016 4:36:13 PM +00:00
- fast: 3@7/8/2016 4:36:14 PM +00:00
- fast: 4@7/8/2016 4:36:15 PM +00:00
* slow: 4@7/8/2016 4:36:15 PM +00:00
- fast: 5@7/8/2016 4:36:16 PM +00:00
- fast: 6@7/8/2016 4:36:17 PM +00:00
- fast: 7@7/8/2016 4:36:18 PM +00:00
- fast: 8@7/8/2016 4:36:19 PM +00:00
* slow: 8@7/8/2016 4:36:19 PM +00:00
- fast: 9@7/8/2016 4:36:20 PM +00:00
- fast: 10@7/8/2016 4:36:21 PM +00:00
- fast: 11@7/8/2016 4:36:22 PM +00:00
- fast: 12@7/8/2016 4:36:23 PM +00:00
* slow: 12@7/8/2016 4:36:23 PM +00:00
- fast: 13@7/8/2016 4:36:24 PM +00:00
- fast: 14@7/8/2016 4:36:25 PM +00:00
- fast: 15@7/8/2016 4:36:26 PM +00:00
- fast: 16@7/8/2016 4:36:27 PM +00:00
* slow: 16@7/8/2016 4:36:27 PM +00:00
- fast: 17@7/8/2016 4:36:28 PM +00:00
- fast: 18@7/8/2016 4:36:29 PM +00:00
- fast: 19@7/8/2016 4:36:30 PM +00:00
- fast: 20@7/8/2016 4:36:31 PM +00:00
* slow: 20@7/8/2016 4:36:31 PM +00:00
- fast: 21@7/8/2016 4:36:32 PM +00:00

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment