Last active
August 29, 2015 14:25
-
-
Save rofr/55d0eebad4f2b5203eae to your computer and use it in GitHub Desktop.
Message broker built with OrigoDB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
namespace OrigoDB.Core.Models | |
{ | |
/// <summary> | |
/// Message broker supporting any number of queues (competing consumers) | |
/// or topics (multiple subscribers) | |
/// </summary> | |
[Serializable] | |
public class MessageBroker : Model | |
{ | |
[Serializable] | |
class MessageQueue : Queue<Message> | |
{ | |
} | |
/// <summary> | |
/// A topic is simply a set of subscribers, each with it's own message queue | |
/// </summary> | |
[Serializable] | |
class Topic : Dictionary<Guid, MessageQueue> | |
{ | |
} | |
private readonly Dictionary<String, MessageQueue> _queues | |
= new Dictionary<string, MessageQueue>(StringComparer.OrdinalIgnoreCase); | |
private readonly Dictionary<String, Topic> _topics | |
= new Dictionary<string, Topic>(StringComparer.OrdinalIgnoreCase); | |
public void Subscribe(Guid subscriber, String topicName) | |
{ | |
var topic = GetTopic(topicName); | |
if (topic.ContainsKey(subscriber)) throw new CommandAbortedException("Already subscribed"); | |
topic[subscriber] = new MessageQueue(); | |
} | |
/// <summary> | |
/// remove subscriber from topic and return any remaining messages | |
/// </summary> | |
public Message[] Unsubscribe(Guid subscriber, String topicName) | |
{ | |
var topic = GetTopic(topicName); | |
MessageQueue q; | |
if (!topic.TryGetValue(subscriber, out q)) | |
{ | |
throw new CommandAbortedException("No such subscriber"); | |
} | |
topic.Remove(subscriber); | |
return q.ToArray(); | |
} | |
public void Enqueue(string queue, Message message) | |
{ | |
var q = GetQueue(queue); | |
q.Enqueue(message); | |
} | |
/// <summary> | |
/// Get the next message in a given queue | |
/// </summary> | |
/// <param name="queue"></param> | |
/// <returns>the next message or null if the queue is empty</returns> | |
[Command(MapTo = typeof(DequeueCommand))] | |
public Message Dequeue(string queue) | |
{ | |
var q = GetQueue(queue); | |
return q.Count > 0 ? q.Dequeue() : null; | |
} | |
/// <summary> | |
/// Leave a message to every subscriber of a given topic | |
/// </summary> | |
/// <param name="topicName"></param> | |
/// <param name="message"></param> | |
public void Publish(string topicName, Message message) | |
{ | |
var topic = GetTopic(topicName); | |
foreach (var queue in topic.Values) | |
{ | |
queue.Enqueue(message); | |
} | |
} | |
/// <summary> | |
/// Grab messages for a given subscription | |
/// </summary> | |
/// <param name="subscriber">id of the subscriber</param> | |
/// <param name="topicName">Case-insensitive name of the topic</param> | |
/// <param name="maxMessages">maximum number of messages, default is 10</param> | |
/// <returns>an array of messages, possibly empty</returns> | |
[Command] | |
public Message[] Take(Guid subscriber, string topicName, int maxMessages = 10) | |
{ | |
var topic = GetTopic(topicName); | |
MessageQueue queue; | |
if (!topic.TryGetValue(subscriber, out queue)) throw new CommandAbortedException("No such subscriber"); | |
int count = Math.Min(maxMessages, queue.Count); | |
var result = new Message[count]; | |
for (int i = 0; i < count; i++) | |
{ | |
result[i] = queue.Dequeue(); | |
} | |
return result; | |
} | |
public String[] GetQueueNames() | |
{ | |
return _queues.Keys.ToArray(); | |
} | |
public String[] GetTopicNames() | |
{ | |
return _topics.Keys.ToArray(); | |
} | |
public Guid[] GetSubscribers(string topicName) | |
{ | |
var topic = GetTopic(topicName); | |
return topic.Keys.ToArray(); | |
} | |
public void CreateQueue(string name) | |
{ | |
if (_queues.ContainsKey(name)) throw new CommandAbortedException("Queue already exists"); | |
_queues[name] = new MessageQueue(); | |
} | |
public void CreateTopic(string name) | |
{ | |
if (_topics.ContainsKey(name)) throw new CommandAbortedException("Topic already exists"); | |
_topics[name] = new Topic(); | |
} | |
public void DeleteQueue(string queueName) | |
{ | |
if (!_queues.Remove(queueName)) throw new CommandAbortedException("No such queue"); | |
} | |
public void DeleteTopic(string topicName) | |
{ | |
if (!_topics.Remove(topicName)) throw new CommandAbortedException("No such topic"); | |
} | |
private Topic GetTopic(string name, bool mustExist = true) | |
{ | |
Topic topic; | |
_topics.TryGetValue(name, out topic); | |
if (topic == null && mustExist) throw new CommandAbortedException("No such topic: " + name); | |
return topic; | |
} | |
private MessageQueue GetQueue(string name, bool mustExist = true) | |
{ | |
MessageQueue queue; | |
_queues.TryGetValue(name, out queue); | |
if (queue == null && mustExist) throw new CommandAbortedException("No such queue: " + name); | |
return queue; | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using NUnit.Framework; | |
using OrigoDB.Core.Models; | |
namespace OrigoDB.Core.Test | |
{ | |
[TestFixture] | |
public class MessageBrokerTests | |
{ | |
[Test] | |
public void SmokeTest() | |
{ | |
const string aQueue = "myqueue"; | |
const string aTopic = "mytopic"; | |
const string aGreeting = "Hello world!"; | |
var aMessage = new TextMessage(aGreeting); | |
var config = new EngineConfiguration().ForIsolatedTest(); | |
var broker = Db.For<MessageBroker>(config); | |
//create/write/read queue | |
broker.CreateQueue(aQueue); | |
broker.Enqueue(aQueue, aMessage); | |
var message = (TextMessage) broker.Dequeue(aQueue); | |
Assert.IsNotNull(message); | |
Assert.AreEqual(aGreeting, message.Body); | |
//if queue is empty null is returned | |
message = (TextMessage) broker.Dequeue(aQueue); | |
Assert.IsNull(message); | |
broker.CreateTopic(aTopic); | |
//no op, no subscribers | |
broker.Publish(aTopic, aMessage); | |
var aSubscriber = Guid.NewGuid(); | |
broker.Subscribe(aSubscriber, aTopic); | |
broker.Publish(aTopic, aMessage); | |
broker.Publish(aTopic, aMessage); | |
var messages = broker.Poll(aSubscriber, aTopic); | |
Assert.AreEqual(messages.Length, 2); | |
//Messages are immutable so we should get same instances back! | |
Assert.AreSame(messages[0], aMessage); | |
Guid[] subscribers = broker.GetSubscribers(aTopic); | |
Assert.AreEqual(subscribers.Length, 1); | |
Assert.AreEqual(aSubscriber, subscribers[0]); | |
broker.Unsubscribe(aSubscriber, aTopic); | |
Assert.AreEqual(broker.GetSubscribers(aTopic).Length, 0); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment