Created
April 28, 2015 10:11
-
-
Save liammclennan/c8c23313da3da1ab9e4e to your computer and use it in GitHub Desktop.
Observable Stream Processing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// LINQPad entry point: runs the same list of pipeline steps twice, first
// against the in-memory delta and then against the Redis-backed delta.
void Main()
{
    Action<IDelta>[] pipeline =
    {
        Steps.CountUp,
        Steps.CountDown,
        Steps.Mod7,
        Steps.Product,
        Steps.SlidingAverage,
        Steps.Printer
    };

    Topology.Run(new RxDelta(), pipeline);
    // The Redis delta is only constructed after the in-memory run has been wired.
    Topology.Run(new RedisDelta("192.168.85.128"), pipeline);
}
/// <summary>
/// Pipeline stages. Each stage takes an <see cref="IDelta"/>, optionally reads
/// named streams from it, and registers a new named stream (or, for
/// <see cref="Printer"/>, subscribes to one).
/// </summary>
public static class Steps
{
    /// <summary>Registers "counter": an interval stream with a 2 ms period.</summary>
    public static void CountUp(IDelta delta)
    {
        delta.Register("counter", Observable.Interval(TimeSpan.FromMilliseconds(2)));
    }

    /// <summary>Registers "count-down": 10000 minus the tick value of a 200 ms interval stream.</summary>
    public static void CountDown(IDelta delta)
    {
        delta.Register("count-down",
            Observable.Interval(TimeSpan.FromMilliseconds(200))
                .Select(i => 10000 - i));
    }

    /// <summary>Registers "mod7": each "counter" value modulo 7.</summary>
    public static void Mod7(IDelta source)
    {
        source.Register("mod7",
            source.Get<long>("counter").Select(c => c % 7));
    }

    /// <summary>Registers "product": "mod7" zipped with "count-down", multiplying each pair.</summary>
    public static void Product(IDelta delta)
    {
        var mod7 = delta.Get<long>("mod7");
        var countDown = delta.Get<long>("count-down");
        // NOTE(review): "mod7" is driven by a 2 ms source and "count-down" by a
        // 200 ms one; Zip pairs elements positionally, so the faster side is
        // presumably queued until the slower side catches up - confirm this is intended.
        delta.Register("product", mod7.Zip(countDown, (s, cd) => s * cd));
    }

    /// <summary>
    /// Registers "sliding-average": the mean of the "product" values collected
    /// in each one-second buffer. Buffers that collected no values are skipped.
    /// </summary>
    public static void SlidingAverage(IDelta source)
    {
        source.Register("sliding-average",
            source.Get<long>("product")
                .Buffer(TimeSpan.FromSeconds(1))
                // Average() throws on an empty sequence, which would terminate
                // the stream with an error; an empty window has no average to report.
                .Where(xs => xs.Count > 0)
                .Select(xs => xs.Average()));
    }

    /// <summary>Subscribes to "sliding-average" and dumps every value it produces.</summary>
    public static void Printer(IDelta delta)
    {
        var sums = delta.Get<double>("sliding-average");
        sums.Subscribe(i => i.Dump());
    }
}
/// <summary>Wires a pipeline together by applying a list of steps to one delta.</summary>
public static class Topology
{
    /// <summary>
    /// Invokes every step in the order given, passing <paramref name="delta"/> to each.
    /// </summary>
    /// <param name="delta">The stream registry handed to each step.</param>
    /// <param name="steps">The steps to invoke, first to last.</param>
    public static void Run(IDelta delta, params Action<IDelta>[] steps)
    {
        for (var index = 0; index < steps.Length; index++)
        {
            steps[index](delta);
        }
    }
}
/// <summary>
/// A registry of named observable streams: producers publish a stream under a
/// name and consumers look it up by that name.
/// </summary>
public interface IDelta
{
    /// <summary>Returns the stream identified by <paramref name="stream"/>, typed as <typeparamref name="T"/>.</summary>
    /// <param name="stream">Name of the stream to look up.</param>
    IObservable<T> Get<T>(string stream);

    /// <summary>Makes <paramref name="source"/> available under the name <paramref name="stream"/>.</summary>
    /// <param name="stream">Name to publish the stream under.</param>
    /// <param name="source">The observable to publish.</param>
    void Register<T>(string stream, IObservable<T> source);
}
/// <summary>
/// In-memory <see cref="IDelta"/>: keeps every registered observable in a
/// dictionary keyed by stream name.
/// </summary>
public class RxDelta : IDelta
{
    // Stored as object because each stream may have a different element type.
    private readonly Dictionary<string, object> _streams = new Dictionary<string, object>();

    /// <summary>
    /// Stores <paramref name="source"/> under <paramref name="stream"/>,
    /// replacing any stream already stored under that name.
    /// </summary>
    public void Register<T>(string stream, IObservable<T> source)
    {
        _streams[stream] = source;
    }

    /// <summary>
    /// Looks up the stream stored under <paramref name="stream"/> and casts it
    /// to <c>IObservable&lt;T&gt;</c>. The dictionary lookup throws when the name
    /// is unknown; the cast throws when the stored stream is not of that type.
    /// </summary>
    public IObservable<T> Get<T>(string stream)
    {
        object registered = _streams[stream];
        return (IObservable<T>)registered;
    }
}
/// <summary>
/// Redis-backed <see cref="IDelta"/>: streams are carried over Redis pub/sub
/// channels named after the stream, with each value serialized as JSON.
/// </summary>
public class RedisDelta : IDelta
{
    readonly ISubscriber sub;

    /// <summary>Connects to Redis and obtains the pub/sub subscriber used by this delta.</summary>
    /// <param name="redisAddress">Connection string passed to <c>ConnectionMultiplexer.Connect</c>.</param>
    public RedisDelta(string redisAddress)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect(redisAddress);
        sub = redis.GetSubscriber();
    }

    /// <summary>
    /// Subscribes to <paramref name="source"/> and publishes each value, JSON-serialized,
    /// to the channel named <paramref name="stream"/>.
    /// </summary>
    public void Register<T>(string stream, IObservable<T> source)
    {
        source.Subscribe(o => sub.Publish(stream, JsonConvert.SerializeObject(o)));
    }

    /// <summary>
    /// Returns an observable that, on subscription, listens to the channel named
    /// <paramref name="stream"/> and pushes each message, deserialized from JSON
    /// as <typeparamref name="T"/>, to the observer. Disposing the subscription
    /// removes the channel handler.
    /// </summary>
    public IObservable<T> Get<T>(string stream)
    {
        return Observable.Create((IObserver<T> o) =>
        {
            // Keep a reference to the handler so exactly this handler can be
            // removed again when the observer goes away.
            Action<RedisChannel, RedisValue> handler = (channel, message) =>
            {
                o.OnNext(JsonConvert.DeserializeObject<T>((string)message));
            };
            sub.Subscribe(stream, handler);

            return Disposable.Create(() =>
            {
                // Without this the handler stays attached to the channel and
                // keeps calling OnNext on an observer that has been disposed.
                sub.Unsubscribe(stream, handler);
                Console.WriteLine("Observer has unsubscribed");
            });
        });
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Output (note that the in-memory and Redis outputs mostly match):
14996.5
14996.5
31983
31983
39954
39954
19966
19966
27943.4
27943.4
35907.2
35907.2
29905
29905
23914.4
23914.4
31871
31871
39814
39814
19896
19896
27845.4
27845.4
35781.2
35781.2
29800
29800
23830.4
23830.4
31759
31759
39674
39674
19826
19826
27747.4
27747.4
35655.2
35655.2
29695
29695
23746.4
23746.4