Skip to content

Instantly share code, notes, and snippets.

@liammclennan
Last active April 17, 2020 18:29
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save liammclennan/5adf41090507ad6b0b0c to your computer and use it in GitHub Desktop.
Save liammclennan/5adf41090507ad6b0b0c to your computer and use it in GitHub Desktop.
RedisMQ

RedisMQ

Many people use Redis for a simple message broker. RedisMQ is a trivial layer on Redis and StackExchange.Redis that probably does not work. It provides two simple messaging patterns:

Publish / subscribe

A publisher publishes messages. 0, 1 or more consumers subscribe to the messages.

var redis = ConnectionMultiplexer.Connect("192.168.85.128");
redis.BroadcastSubscribe<string>("this-is-the-channel", message => 
    // do something with the message);
redis.BroadcastPublish("this-is-the-channel", "42");

Competing consumer

A publisher publishes messages to a queue. Messages stay in the queue until someone reads them. Many subscribers may read from the same queue. Each message is processed by just one consumer.

var redis = ConnectionMultiplexer.Connect("192.168.85.128");
redis.CompetingConsumerSubscribe<int>("numbers", message => 
    // do something with the message);
redis.CompetingConsumerPublish("numbers", 1);

Speed

RedisMQ is not optimised for performance. It is optimized for being simple for me to write. However, the following gives a rough indication of performance.

Strategy # messages Time (s)
Broadcast to 2 consumers 100,000 21.3
Competing consumer with 1 subscriber 100,000 37.7
Competing consumer with 2 subscribers 100,000 23.7

Dependencies

PM> Install-Package StackExchange.Redis
PM> Install-Package Newtonsoft.Json 
namespace {
public static class RedisMQ {
private static string StreamToDataChannel(string stream) {
return stream+"-data";
}
private static string StreamToTriggerChannel(string stream) {
return stream+"-trigger";
}
private static string StreamToBroadcastChannel(string stream) {
return stream+"-broadcast";
}
public static void CompetingConsumerPublish<T>(this ConnectionMultiplexer redis, string stream, T data) {
redis.GetDatabase().ListLeftPush(StreamToDataChannel(stream), JsonConvert.SerializeObject(data), flags: CommandFlags.FireAndForget);
redis.GetSubscriber().Publish(StreamToTriggerChannel(stream), "trigger");
}
public static void CompetingConsumerSubscribe<T>(this ConnectionMultiplexer redis, string stream, Action<T> handler) {
redis.GetSubscriber().Subscribe(StreamToTriggerChannel(stream), (channel,_) => {
var message = (string)redis.GetDatabase().ListRightPop(StreamToDataChannel(stream));
if (message != null) handler(JsonConvert.DeserializeObject<T>(message));
});
}
public static void BroadcastPublish<T>(this ConnectionMultiplexer redis, string stream, T data) {
redis.GetSubscriber()
.Publish(StreamToBroadcastChannel(stream), JsonConvert.SerializeObject(data));
}
public static void BroadcastSubscribe<T>(this ConnectionMultiplexer redis, string stream, Action<T> handler) {
redis.GetSubscriber()
.Subscribe(StreamToBroadcastChannel(stream), (channel,message)=>
handler(JsonConvert.DeserializeObject<T>((string)message)));
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment