Skip to content

Instantly share code, notes, and snippets.

@liammclennan
Created April 28, 2015 10:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save liammclennan/c8c23313da3da1ab9e4e to your computer and use it in GitHub Desktop.
Save liammclennan/c8c23313da3da1ab9e4e to your computer and use it in GitHub Desktop.
Observable Stream Processing
void Main()
{
Topology.Run(new RxDelta(),
Steps.CountUp,
Steps.CountDown,
Steps.Mod7,
Steps.Product,
Steps.SlidingAverage,
Steps.Printer
);
Topology.Run(new RedisDelta("192.168.85.128"),
Steps.CountUp,
Steps.CountDown,
Steps.Mod7,
Steps.Product,
Steps.SlidingAverage,
Steps.Printer
);
}
public static class Steps {
public static void CountUp(IDelta delta) {
delta.Register("counter", Observable.Interval(TimeSpan.FromMilliseconds(2)));
}
public static void CountDown(IDelta delta) {
delta.Register("count-down",
Observable.Interval(TimeSpan.FromMilliseconds(200))
.Select(i => 10000 - i));
}
public static void Mod7(IDelta source) {
source.Register("mod7",
source.Get<long>("counter").Select(c => c % 7));
}
public static void Product(IDelta delta) {
var mod7 = delta.Get<long>("mod7");
var countDown = delta.Get<long>("count-down");
delta.Register("product", mod7.Zip(countDown, (s,cd) => s * cd));
}
public static void SlidingAverage(IDelta source) {
source.Register("sliding-average",
source.Get<long>("product")
.Buffer(TimeSpan.FromSeconds(1))
.Select(xs => xs.Average()));
}
public static void Printer(IDelta delta) {
var sums = delta.Get<double>("sliding-average");
sums.Subscribe(i=>i.Dump());
}
}
public static class Topology
{
public static void Run(IDelta delta, params Action<IDelta>[] steps) {
foreach (var step in steps) {
step(delta);
}
}
}
public interface IDelta {
void Register<T>(string stream, IObservable<T> source);
IObservable<T> Get<T>(string stream);
}
public class RxDelta : IDelta {
private Dictionary<string,object> rxs = new Dictionary<string,object>();
public void Register<T>(string stream, IObservable<T> source)
{
rxs[stream] = source;
}
public IObservable<T> Get<T>(string stream) {
return (IObservable<T>)rxs[stream];
}
}
public class RedisDelta : IDelta {
readonly ISubscriber sub;
public RedisDelta(string redisAddress) {
ConnectionMultiplexer redis = ConnectionMultiplexer.Connect(redisAddress);
IDatabase db = redis.GetDatabase();
sub = redis.GetSubscriber();
}
public void Register<T>(string stream, IObservable<T> source)
{
source.Subscribe(o => sub.Publish(stream, JsonConvert.SerializeObject(o)));
}
public IObservable<T> Get<T>(string stream) {
return Observable.Create((IObserver<T> o) => {
sub.Subscribe(stream, (channel,message)=> {
o.OnNext(JsonConvert.DeserializeObject<T>((string)message));
});
return Disposable.Create(() => Console.WriteLine("Observer has unsubscribed"));
});
}
}
@liammclennan
Copy link
Author

Output (note that in memory and redis output match (mostly)):

14996.5
14996.5
31983
31983
39954
39954
19966
19966
27943.4
27943.4
35907.2
35907.2
29905
29905
23914.4
23914.4
31871
31871
39814
39814
19896
19896
27845.4
27845.4
35781.2
35781.2
29800
29800
23830.4
23830.4
31759
31759
39674
39674
19826
19826
27747.4
27747.4
35655.2
35655.2
29695
29695
23746.4
23746.4

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment