Skip to content

Instantly share code, notes, and snippets.

@rofr
Last active August 29, 2015 14:25
Show Gist options
  • Save rofr/55d0eebad4f2b5203eae to your computer and use it in GitHub Desktop.
Save rofr/55d0eebad4f2b5203eae to your computer and use it in GitHub Desktop.
Message broker built with OrigoDB
using System;
using System.Collections.Generic;
using System.Linq;
namespace OrigoDB.Core.Models
{
/// <summary>
/// Message broker supporting any number of queues (competing consumers)
/// or topics (multiple subscribers)
/// </summary>
[Serializable]
public class MessageBroker : Model
{
[Serializable]
class MessageQueue : Queue<Message>
{
}
/// <summary>
/// A topic is simply a set of subscribers, each with it's own message queue
/// </summary>
[Serializable]
class Topic : Dictionary<Guid, MessageQueue>
{
}
private readonly Dictionary<String, MessageQueue> _queues
= new Dictionary<string, MessageQueue>(StringComparer.OrdinalIgnoreCase);
private readonly Dictionary<String, Topic> _topics
= new Dictionary<string, Topic>(StringComparer.OrdinalIgnoreCase);
public void Subscribe(Guid subscriber, String topicName)
{
var topic = GetTopic(topicName);
if (topic.ContainsKey(subscriber)) throw new CommandAbortedException("Already subscribed");
topic[subscriber] = new MessageQueue();
}
/// <summary>
/// remove subscriber from topic and return any remaining messages
/// </summary>
public Message[] Unsubscribe(Guid subscriber, String topicName)
{
var topic = GetTopic(topicName);
MessageQueue q;
if (!topic.TryGetValue(subscriber, out q))
{
throw new CommandAbortedException("No such subscriber");
}
topic.Remove(subscriber);
return q.ToArray();
}
public void Enqueue(string queue, Message message)
{
var q = GetQueue(queue);
q.Enqueue(message);
}
/// <summary>
/// Get the next message in a given queue
/// </summary>
/// <param name="queue"></param>
/// <returns>the next message or null if the queue is empty</returns>
[Command(MapTo = typeof(DequeueCommand))]
public Message Dequeue(string queue)
{
var q = GetQueue(queue);
return q.Count > 0 ? q.Dequeue() : null;
}
/// <summary>
/// Leave a message to every subscriber of a given topic
/// </summary>
/// <param name="topicName"></param>
/// <param name="message"></param>
public void Publish(string topicName, Message message)
{
var topic = GetTopic(topicName);
foreach (var queue in topic.Values)
{
queue.Enqueue(message);
}
}
/// <summary>
/// Grab messages for a given subscription
/// </summary>
/// <param name="subscriber">id of the subscriber</param>
/// <param name="topicName">Case-insensitive name of the topic</param>
/// <param name="maxMessages">maximum number of messages, default is 10</param>
/// <returns>an array of messages, possibly empty</returns>
[Command]
public Message[] Take(Guid subscriber, string topicName, int maxMessages = 10)
{
var topic = GetTopic(topicName);
MessageQueue queue;
if (!topic.TryGetValue(subscriber, out queue)) throw new CommandAbortedException("No such subscriber");
int count = Math.Min(maxMessages, queue.Count);
var result = new Message[count];
for (int i = 0; i < count; i++)
{
result[i] = queue.Dequeue();
}
return result;
}
public String[] GetQueueNames()
{
return _queues.Keys.ToArray();
}
public String[] GetTopicNames()
{
return _topics.Keys.ToArray();
}
public Guid[] GetSubscribers(string topicName)
{
var topic = GetTopic(topicName);
return topic.Keys.ToArray();
}
public void CreateQueue(string name)
{
if (_queues.ContainsKey(name)) throw new CommandAbortedException("Queue already exists");
_queues[name] = new MessageQueue();
}
public void CreateTopic(string name)
{
if (_topics.ContainsKey(name)) throw new CommandAbortedException("Topic already exists");
_topics[name] = new Topic();
}
public void DeleteQueue(string queueName)
{
if (!_queues.Remove(queueName)) throw new CommandAbortedException("No such queue");
}
public void DeleteTopic(string topicName)
{
if (!_topics.Remove(topicName)) throw new CommandAbortedException("No such topic");
}
private Topic GetTopic(string name, bool mustExist = true)
{
Topic topic;
_topics.TryGetValue(name, out topic);
if (topic == null && mustExist) throw new CommandAbortedException("No such topic: " + name);
return topic;
}
private MessageQueue GetQueue(string name, bool mustExist = true)
{
MessageQueue queue;
_queues.TryGetValue(name, out queue);
if (queue == null && mustExist) throw new CommandAbortedException("No such queue: " + name);
return queue;
}
}
}
using System;
using NUnit.Framework;
using OrigoDB.Core.Models;
namespace OrigoDB.Core.Test
{
[TestFixture]
public class MessageBrokerTests
{
[Test]
public void SmokeTest()
{
const string aQueue = "myqueue";
const string aTopic = "mytopic";
const string aGreeting = "Hello world!";
var aMessage = new TextMessage(aGreeting);
var config = new EngineConfiguration().ForIsolatedTest();
var broker = Db.For<MessageBroker>(config);
//create/write/read queue
broker.CreateQueue(aQueue);
broker.Enqueue(aQueue, aMessage);
var message = (TextMessage) broker.Dequeue(aQueue);
Assert.IsNotNull(message);
Assert.AreEqual(aGreeting, message.Body);
//if queue is empty null is returned
message = (TextMessage) broker.Dequeue(aQueue);
Assert.IsNull(message);
broker.CreateTopic(aTopic);
//no op, no subscribers
broker.Publish(aTopic, aMessage);
var aSubscriber = Guid.NewGuid();
broker.Subscribe(aSubscriber, aTopic);
broker.Publish(aTopic, aMessage);
broker.Publish(aTopic, aMessage);
var messages = broker.Poll(aSubscriber, aTopic);
Assert.AreEqual(messages.Length, 2);
//Messages are immutable so we should get same instances back!
Assert.AreSame(messages[0], aMessage);
Guid[] subscribers = broker.GetSubscribers(aTopic);
Assert.AreEqual(subscribers.Length, 1);
Assert.AreEqual(aSubscriber, subscribers[0]);
broker.Unsubscribe(aSubscriber, aTopic);
Assert.AreEqual(broker.GetSubscribers(aTopic).Length, 0);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment