Skip to content

Instantly share code, notes, and snippets.

@mharju
Created January 13, 2012 10:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mharju/1605373 to your computer and use it in GitHub Desktop.
Save mharju/1605373 to your computer and use it in GitHub Desktop.
Tokenize input and subscribe. Instant profit when combined with asynchronous JSON object responses.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Subjects;
namespace ReactiveTest
{
/// <summary>
/// Extension methods for observable sequences of integer-keyed string values.
/// </summary>
static class ReactiveExtension
{
    /// <summary>
    /// Subscribes <paramref name="subscribe"/> to the values of those pairs in
    /// <paramref name="source"/> whose key equals <paramref name="key"/>.
    /// </summary>
    /// <param name="source">Sequence of key/value pairs to filter.</param>
    /// <param name="key">Key a pair must carry for its value to be delivered.</param>
    /// <param name="subscribe">Callback invoked with the value of every matching pair.</param>
    /// <returns>The subscription handle produced by the underlying <c>Subscribe</c> call.</returns>
    public static IDisposable SubscribeOnKey(this IObservable<KeyValuePair<int, string>> source, int key, Action<string> subscribe)
    {
        // Query form of: filter on the pair key, then project to the pair value.
        var matchingValues =
            from pair in source
            where pair.Key == key
            select pair.Value;

        return matchingValues.Subscribe(subscribe);
    }
}
/// <summary>
/// Console demo: reads lines from standard input, groups them by length and
/// prints the lines of length 5, 7 and 9 through per-key subscriptions.
/// </summary>
class Program
{
    /// <summary>
    /// Wires the per-key console printers to a subject, then feeds that subject
    /// with (length, line) pairs built from the lines read on a separate thread.
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    static void Main(string[] args)
    {
        var streamsByKey = new Subject<KeyValuePair<int, string>>();

        // Attach the consumers BEFORE the source is started: a Subject does not
        // replay, so anything pushed before these subscriptions exist is lost.
        streamsByKey.SubscribeOnKey(5, x => Console.WriteLine("Profit with " + x));
        streamsByKey.SubscribeOnKey(7, x => Console.WriteLine("Also we has " + x));
        streamsByKey.SubscribeOnKey(9, x => Console.WriteLine("Als... shit: " + x));

        var src = GetInput().ToObservable(Scheduler.NewThread);
        var res = src.GroupBy(line => line.Length);

        // Flatten every length group back into (length, line) pairs and hand the
        // subject itself to Subscribe, so completion and errors are forwarded to
        // the subject's observers together with the values.
        res.SelectMany(byLength => byLength.Select(line => new KeyValuePair<int, string>(byLength.Key, line)))
           .Subscribe(streamsByKey);
    }

    /// <summary>
    /// Lazily yields lines read from the console until the input is exhausted.
    /// </summary>
    /// <returns>One string per line read; the sequence ends at end-of-input.</returns>
    static IEnumerable<string> GetInput()
    {
        string line;

        // Console.ReadLine returns null once no more input is available; stop
        // there instead of yielding null forever (which would also break the
        // Length-based grouping in Main).
        while ((line = Console.ReadLine()) != null)
        {
            yield return line;
        }
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment