-
-
Save heltonbiker/2075c1ba089f695b9f9294c9bc551e66 to your computer and use it in GitHub Desktop.
using System; | |
using System.Linq; | |
using System.Reactive.Linq; | |
using System.Reactive.Subjects; | |
namespace UnzipRx | |
{ | |
class Program | |
{ | |
static void Main(string[] args) | |
{ | |
var source = new Subject<int>(); | |
int count = 3; | |
source.Select((t, i) => Tuple.Create(t, i)) | |
.GroupBy(t => t.Item2 % count) | |
.Select(g => g.Select(t => t.Item1)) | |
.Do(s => s.Subscribe(Console.WriteLine)); | |
Observable.Range(0, 20).Subscribe(source); | |
Console.ReadKey(); | |
} | |
} | |
} |
Hi, Lee, thanks for your support! I am almost done reading your book, it's excellent reading!
Respective to your suggested modification, it worked, but I only got "1 2 3 4 5 .... 20" printed to console, one per line. What I need is a way to isolate individual sequences, so that I can subscribe to them individually. Just as an example (I don't know how silly it could be, but it captures the intended semantics):
IObserver<double> observers = GetThreeObservers();
IObservable<int>[] sources = Distribute(Observable.Range(0,20), 3);
secondSource = sources[1];
secondObserver = observers[1];
secondSource.Subscribe(secondObserver);
Output would be:
1
4
7
10
13
16
19
How could I achieve a similar effect, that is, be able to watch individual, unmerged sequences?
I tried this (clumsy, I know), but without success:
class Program
{
static void Main(string[] args)
{
var source = new Subject<int>();
int count = 3;
List<IObservable<int>> sources = new List<IObservable<int>>();
source.Select((t, i) => Tuple.Create(t, i))
.GroupBy(t => t.Item2 % count)
.Select(g => g.Select(t => t.Item1))
.Do(sources.Add)
.Subscribe();
Observable.Range(0, 20).Subscribe(source); // First
sources[0].Subscribe(Console.WriteLine); // Second
// if I exchange First and Second commented lines above, I get an error because sources[0] tries to fetch an element from an empty list.
Console.ReadKey();
}
}
Also, regarding legitimate vs pointless use of subject, currently I am using it in the following class, according to this advice:
public abstract class Sensor
{
public virtual IObservable<double> WhenNewSamples
{
get { return _new_samples.AsObservable(); }
}
protected Subject<double> _new_samples = new Subject<double>();
internal void AddSamplesInternal(IEnumerable<double> samples)
{
foreach (var sample in samples)
_new_samples.OnNext(sample);
}
}
It's only purpose is to "enter the monad", that is, go from IEnumerable<double>
, via sequential AddSamplesInternal()
calls, to IObservable<double>
via the subscribable WhenNewSamples
property.
What led me to think this use is legitimate is the following part of the advice:
Why should I use a subject in that case?
Because you've got no choice!
So I wonder if I should worry about this Subject, or not.
Hey there, saw your post on http://stackoverflow.com/questions/41613027/distributing-a-sequence-among-other-sequences. This sample is showing a bit of a misunderstanding of Rx and Linq.
Just like
Enumerable.Range(1,10).Select(i=>i.ToString())
wouldn't be run if you never iterate the sequence (like with a for loop), your query above will never run as you never subscribe to it. You have just declared a queryand then not assigned to anything or subscribed to it. It is dead code.
Also, as the unofficial head of the "Just Say No to Subjects Society", I wanted to point out to you that your Subject usage is pointless, and most likely adding more confusion and friction to you learning Rx. Just stop using them.
What I think you were trying to do is