Skip to content

Instantly share code, notes, and snippets.

@RhysC
Created October 21, 2014 12:02
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save RhysC/e5e29ea256edf49d2085 to your computer and use it in GitHub Desktop.
Save RhysC/e5e29ea256edf49d2085 to your computer and use it in GitHub Desktop.
LinqPad sample of Redis Pub sub using StackExchange.Redis and RX
<Query Kind="Program">
<NuGetReference>Newtonsoft.Json</NuGetReference>
<NuGetReference>Rx-Main</NuGetReference>
<NuGetReference>StackExchange.Redis</NuGetReference>
<Namespace>System.Reactive.Linq</Namespace>
<Namespace>System.Reactive.Disposables</Namespace>
<Namespace>StackExchange.Redis</Namespace>
</Query>
void Main()
{
//You will need a redis server running locally.
//Use chocolatey to install the windows port \> choco install redis-64
//run up the server \> redis-server
var redis = ConnectionMultiplexer.Connect("localhost");
var observable = redis.GetSubscriptionChannel<SampleDto>();
var subscription = observable.Subscribe(x=>x.Dump());
Observable.Interval(TimeSpan.FromSeconds(2)).Subscribe(_ => redis.PublishToChannel(SampleDto.Create()));
Console.ReadLine();
subscription.Dispose();
"Subscription disposed".Dump();
Console.ReadLine();
}
public static class RedisRx
{
public static IObservable<T> GetSubscriptionChannel<T>(this ConnectionMultiplexer redis)
{
return Observable.Create<T>(obs=>{
var redisSubscriber =redis.GetSubscriber();
var channelName = typeof(T).Name;
Action<RedisChannel,RedisValue> handler = (c,v)=> {
try{
var deserialisedPayload = Newtonsoft.Json.JsonConvert.DeserializeObject<T>(v);
obs.OnNext(deserialisedPayload);
}
catch(Exception e)
{
obs.OnError(e);
}
};
redisSubscriber.Subscribe(channelName, handler);
return Disposable.Create(()=> { redisSubscriber.Unsubscribe(channelName, handler);});
});
}
public static void PublishToChannel<T>(this ConnectionMultiplexer redis, T payload)
{
var redisSubscriber = redis.GetSubscriber();
var channelName = typeof(T).Name;
var json = Newtonsoft.Json.JsonConvert.SerializeObject(payload);
redisSubscriber.Publish(channelName, json);
}
}
// Define other methods and classes here
public class SampleDto
{
public static SampleDto Create()
{
return new SampleDto{Id = Guid.NewGuid(), Timestamp = DateTime.Now };
}
public Guid Id { get; set; }
public DateTimeOffset Timestamp { get; set; }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment