Skip to content

Instantly share code, notes, and snippets.

@RxDave
Last active August 29, 2015 14:11
Show Gist options
  • Save RxDave/d9eba10e0d9e51a968a7 to your computer and use it in GitHub Desktop.
Save RxDave/d9eba10e0d9e51a968a7 to your computer and use it in GitHub Desktop.
CurrentThreadScheduler allows cancellation of recursive or iterative observables without concurrency.
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
namespace ConsoleApplication1
{
class Program
{
static void Main()
{
using (Observable.Range(0, 100, CurrentThreadScheduler.Instance)
.Do(x => Console.WriteLine("Generated: {0}", x))
.Take(10)
.Subscribe(x => Console.WriteLine("Observed: {0}", x)))
{
Console.ReadKey();
}
}
}
}
@RxDave
Copy link
Author

RxDave commented Dec 10, 2014

Output:

Generated: 0
Observed: 0
Generated: 1
Observed: 1
Generated: 2
Observed: 2
Generated: 3
Observed: 3
Generated: 4
Observed: 4
Generated: 5
Observed: 5
Generated: 6
Observed: 6
Generated: 7
Observed: 7
Generated: 8
Observed: 8
Generated: 9
Observed: 9

@Igorbek
Copy link

Igorbek commented Jan 8, 2015

Try this:

Observable.Range(0, 100, CurrentThreadScheduler.Instance)
.Do(x => Console.WriteLine("Generated: {0}", x))
.Take(10)
.Subscribe(x => Console.WriteLine("Observed: {0}", x))
.Dispose();

it still output 10 values, so there wasn't a chance to dispose it.
I found it works due internal Sink usage. I couldn't implement my own Take without it.
Can you implement Take() without Rx's internals?

@Igorbek
Copy link

Igorbek commented Jan 8, 2015

Sorry, my bad. I understand the reason. Forget my question =)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment