@heltonbiker
Created January 12, 2017 19:15
Method that aims to distribute a sequence amongst N other sequences, cyclically, like dealing cards.
using System;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace UnzipRx
{
    class Program
    {
        static void Main(string[] args)
        {
            var source = new Subject<int>();
            int count = 3;

            source.Select((t, i) => Tuple.Create(t, i))
                  .GroupBy(t => t.Item2 % count)
                  .Select(g => g.Select(t => t.Item1))
                  .Do(s => s.Subscribe(Console.WriteLine));

            Observable.Range(0, 20).Subscribe(source);

            Console.ReadKey();
        }
    }
}
@heltonbiker

heltonbiker commented Jan 13, 2017

Hi, Lee, thanks for your support! I am almost done reading your book, it's excellent reading!

Regarding your suggested modification: it worked, but I only got "1 2 3 4 5 .... 20" printed to the console, one per line. What I need is a way to isolate the individual sequences, so that I can subscribe to each of them separately. Just as an example (I don't know how silly it is, but it captures the intended semantics):

IObserver<int>[] observers = GetThreeObservers();

IObservable<int>[] sources = Distribute(Observable.Range(0, 20), 3);
var secondSource = sources[1];
var secondObserver = observers[1];
secondSource.Subscribe(secondObserver);

Output would be:

1
4
7
10
13
16
19

How could I achieve a similar effect, that is, be able to watch individual, unmerged sequences?
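One way the intended `Distribute` semantics could be sketched is below. This is only an illustration under assumptions, not an established Rx operator: the class name `ObservableDistributeExtensions` is hypothetical, and each returned sequence subscribes to the source independently, numbering elements from its own subscription. That is fine for a cold source such as `Observable.Range`, but with a hot source the branches would only line up if they were all subscribed before the first element arrives.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;

public static class ObservableDistributeExtensions
{
    // Hypothetical helper: returns `count` sequences, where sequence k
    // carries the source elements whose position satisfies position % count == k.
    public static IObservable<T>[] Distribute<T>(this IObservable<T> source, int count)
    {
        return Enumerable.Range(0, count)
            .Select(k => source
                // Pair each element with its zero-based position (counted per subscription).
                .Select((value, index) => new { value, index })
                // Keep only the elements "dealt" to branch k.
                .Where(x => x.index % count == k)
                .Select(x => x.value))
            .ToArray();
    }
}
```

With this sketch, `Observable.Range(0, 20).Distribute(3)[1].Subscribe(Console.WriteLine);` would print 1, 4, 7, 10, 13, 16, 19, one per line.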

I tried this (clumsy, I know), but without success:

class Program
{
    static void Main(string[] args)
    {
        var source = new Subject<int>();

        int count = 3;

        List<IObservable<int>> sources = new List<IObservable<int>>();

        source.Select((t, i) => Tuple.Create(t, i))
              .GroupBy(t => t.Item2 % count)
              .Select(g => g.Select(t => t.Item1))
              .Do(sources.Add)
              .Subscribe();

        Observable.Range(0, 20).Subscribe(source); // First

        sources[0].Subscribe(Console.WriteLine);  // Second

        // If I exchange the First and Second lines above, I get an error,
        // because sources[0] tries to fetch an element from an empty list.

        Console.ReadKey();
    }
}
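A likely reason the attempt above prints nothing: the groups produced by `GroupBy` are hot, so by the time `sources[0]` is subscribed, `Observable.Range` has already pushed all 20 values through the subject and they are gone. A minimal sketch of one workaround is to subscribe to each group at the moment it is emitted. The names `GroupSubscribeSketch`, `Collect`, and `wantedKey` are hypothetical and exist only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class GroupSubscribeSketch
{
    // Collects the values dealt to one branch (wantedKey) out of `count` branches.
    public static List<int> Collect(int count, int wantedKey)
    {
        var received = new List<int>();
        var source = new Subject<int>();

        source.Select((value, index) => new { value, index })
              // Key = position modulo count, element = the original value.
              .GroupBy(x => x.index % count, x => x.value)
              .Where(group => group.Key == wantedKey)
              // Subscribe to the group as soon as it appears,
              // before its first element is delivered.
              .Subscribe(group => group.Subscribe(v => received.Add(v)));

        // Only now push values through the subject.
        Observable.Range(0, 20).Subscribe(source);
        return received;
    }
}
```

Calling `GroupSubscribeSketch.Collect(3, 1)` should yield 1, 4, 7, 10, 13, 16, 19. The inner subscriptions are never disposed here, which a real implementation would need to handle.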

@heltonbiker

Also, regarding legitimate versus pointless use of Subject: I am currently using one in the following class, following this advice:

public abstract class Sensor
{
    public virtual IObservable<double> WhenNewSamples
    {
        get { return _new_samples.AsObservable(); }
    }
    protected Subject<double> _new_samples = new Subject<double>();

    internal void AddSamplesInternal(IEnumerable<double> samples)
    {
        foreach (var sample in samples)
            _new_samples.OnNext(sample);
    }
}

Its only purpose is to "enter the monad", that is, to go from IEnumerable<double>, via sequential AddSamplesInternal() calls, to IObservable<double> via the subscribable WhenNewSamples property.

What led me to think this use is legitimate is the following part of the advice:

Why should I use a subject in that case?

Because you've got no choice!

So I wonder if I should worry about this Subject, or not.
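For reference, the pattern described above can be exercised as in the sketch below. It repeats the `Sensor` class so the example is self-contained, with one assumed tightening that is not in the original: the subject is `private readonly` rather than `protected`, so only the base class can push into it. `DemoSensor` and `SensorDemo` are hypothetical names used only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public abstract class Sensor
{
    // Assumption: kept private so that subclasses and callers cannot call OnNext directly.
    private readonly Subject<double> _newSamples = new Subject<double>();

    // AsObservable hides the subject, so consumers cannot cast back to it.
    public virtual IObservable<double> WhenNewSamples
    {
        get { return _newSamples.AsObservable(); }
    }

    internal void AddSamplesInternal(IEnumerable<double> samples)
    {
        foreach (var sample in samples)
            _newSamples.OnNext(sample);
    }
}

// Hypothetical concrete sensor, only for the demo.
public sealed class DemoSensor : Sensor { }

public static class SensorDemo
{
    public static List<double> Run()
    {
        var received = new List<double>();
        var sensor = new DemoSensor();

        using (sensor.WhenNewSamples.Subscribe(x => received.Add(x)))
        {
            sensor.AddSamplesInternal(new[] { 0.5, 1.5 });
            sensor.AddSamplesInternal(new[] { 2.5 });
        }

        // The subscription has been disposed, so this sample is not observed.
        sensor.AddSamplesInternal(new[] { 3.5 });
        return received;
    }
}
```

`SensorDemo.Run()` should return 0.5, 1.5, 2.5: samples pushed in batches arrive as one continuous sequence while the subscription is active.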
