Last active
December 3, 2015 18:33
-
-
Save yreynhout/e7787d28913d486e34ac to your computer and use it in GitHub Desktop.
Poor man's messaging
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Concurrent; | |
using System.Collections.Generic; | |
using System.Threading; | |
using System.Threading.Tasks; | |
namespace ChannelZero | |
{ | |
/// <summary>
/// Console demo: wires a request queue and a response queue together and sends one
/// message through the echo handler, printing the echoed payload.
/// </summary>
class Program
{
    /// <summary>
    /// Builds the channels, dispatchers and message loops, sends "Hello world!" and
    /// waits for a line on standard input before returning.
    /// </summary>
    static void Main()
    {
        var requestQueue = new BlockingCollection<object>();
        var responseQueue = new BlockingCollection<object>();

        // The registry is written by the sending thread (RequestResponseChannel.Send)
        // and read/removed on the response loop's task (CallbackResponseHandler.Handle),
        // so it has to tolerate concurrent access; a plain Dictionary does not.
        var callbackRegistry = new ConcurrentDictionary<Guid, Action<object>>();

        var requestChannel = new QueueChannel(requestQueue);
        var responseChannel = new QueueChannel(responseQueue);

        // Request side: every Request is echoed back onto the response channel.
        var requestHandlers = new Dictionary<Type, IMessageHandler<object>>
        {
            {
                typeof(Request),
                new MessageHandlerAdapter<Request>(
                    new EchoRequestHandler(responseChannel))
            }
        };
        var requestDispatcher = new MessageDispatcher(requestHandlers);
        var requestLoop = new QueueMessageLoop(requestQueue, requestDispatcher);
        requestLoop.Start();

        // Response side: every Response is routed to the callback registered for its
        // correlation id.
        var responseHandlers = new Dictionary<Type, IMessageHandler<object>>
        {
            {
                typeof(Response),
                new MessageHandlerAdapter<Response>(
                    new CallbackResponseHandler(callbackRegistry))
            }
        };
        var responseDispatcher = new MessageDispatcher(responseHandlers);
        var responseLoop = new QueueMessageLoop(responseQueue, responseDispatcher);
        responseLoop.Start();

        var requestResponseChannel = new RequestResponseChannel(requestChannel, callbackRegistry);
        requestResponseChannel.Send("Hello world!", Console.WriteLine);

        // The round trip completes on other tasks; keep Main alive until Enter is pressed.
        Console.ReadLine();
    }
}
/// <summary>
/// A one-way channel onto which messages can be sent.
/// </summary>
public interface IChannel
{
    /// <summary>
    /// Sends a message onto the channel.
    /// </summary>
    /// <param name="message">The message to send.</param>
    void Send(object message);
}
/// <summary>
/// An <see cref="IChannel"/> that forwards every sent message to a
/// <see cref="BlockingCollection{T}"/>.
/// </summary>
public class QueueChannel : IChannel
{
    private readonly BlockingCollection<object> _queue;

    /// <summary>
    /// Creates a channel on top of the given queue.
    /// </summary>
    /// <param name="queue">The collection that receives sent messages.</param>
    /// <exception cref="ArgumentNullException"><paramref name="queue"/> is null.</exception>
    public QueueChannel(BlockingCollection<object> queue)
    {
        if (queue == null)
        {
            throw new ArgumentNullException("queue");
        }

        this._queue = queue;
    }

    /// <summary>
    /// Adds <paramref name="message"/> to the underlying collection.
    /// </summary>
    /// <param name="message">The message to enqueue.</param>
    public void Send(object message)
    {
        this._queue.Add(message);
    }
}
/// <summary>
/// Consumes messages from a <see cref="BlockingCollection{T}"/> on a dedicated
/// long-running task and hands each one to an <see cref="IMessageDispatcher"/>.
/// </summary>
public class QueueMessageLoop
{
    private readonly BlockingCollection<object> _queue;
    private readonly IMessageDispatcher _dispatcher;
    private CancellationTokenSource _stopSource;
    private Task _task;

    /// <summary>
    /// Creates a loop that reads from <paramref name="queue"/> and dispatches through
    /// <paramref name="dispatcher"/>. The loop does not run until <see cref="Start"/> is called.
    /// </summary>
    /// <param name="queue">The collection to consume messages from.</param>
    /// <param name="dispatcher">The dispatcher that receives every consumed message.</param>
    /// <exception cref="ArgumentNullException">Either argument is null.</exception>
    public QueueMessageLoop(BlockingCollection<object> queue, IMessageDispatcher dispatcher)
    {
        if (queue == null) throw new ArgumentNullException("queue");
        if (dispatcher == null) throw new ArgumentNullException("dispatcher");
        _queue = queue;
        _dispatcher = dispatcher;
    }

    /// <summary>
    /// Starts consuming the queue on a long-running task.
    /// </summary>
    /// <exception cref="InvalidOperationException">The loop is already running.</exception>
    public void Start()
    {
        // A second Start would overwrite the fields and orphan the running task.
        if (_task != null && !_task.IsCompleted)
        {
            throw new InvalidOperationException("The message loop is already running.");
        }

        // A previous run may have ended on its own (queue completed) without Stop.
        if (_stopSource != null)
        {
            _stopSource.Dispose();
        }

        _stopSource = new CancellationTokenSource();
        var token = _stopSource.Token;

        // The token is deliberately NOT handed to StartNew: the loop observes it itself
        // and ends normally, so Stop() can wait on the task without the wait throwing
        // merely because the loop was cancelled.
        _task = Task.Factory.StartNew(
            () => Run(token),
            CancellationToken.None,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
    }

    /// <summary>
    /// Dispatches messages until <paramref name="token"/> is cancelled or the queue is
    /// marked complete and drained.
    /// </summary>
    /// <param name="token">Token signalled by <see cref="Stop"/>.</param>
    private void Run(CancellationToken token)
    {
        try
        {
            // No outer retry loop: once the enumeration ends the queue is complete and
            // empty, and re-enumerating would only spin without ever blocking.
            foreach (var message in _queue.GetConsumingEnumerable(token))
            {
                _dispatcher.Dispatch(message);
            }
        }
        catch (OperationCanceledException)
        {
            // Expected when Stop() cancels the blocking enumeration; anything else
            // (e.g. thrown by a handler) is a real failure and must surface.
            if (!token.IsCancellationRequested)
            {
                throw;
            }
        }
    }

    /// <summary>
    /// Requests the loop to stop and waits for it to finish. Does nothing when the loop
    /// has not been started.
    /// </summary>
    /// <exception cref="AggregateException">The loop's task failed, e.g. a dispatch threw.</exception>
    public void Stop()
    {
        var source = _stopSource;
        var task = _task;
        if (source == null || task == null)
        {
            return;
        }

        source.Cancel();
        try
        {
            task.Wait();
        }
        finally
        {
            source.Dispose();
            _stopSource = null;
            _task = null;
        }
    }
}
/// <summary>
/// Routes a message to whatever is responsible for handling it.
/// </summary>
public interface IMessageDispatcher
{
    /// <summary>
    /// Dispatches a single message.
    /// </summary>
    /// <param name="message">The message to dispatch.</param>
    void Dispatch(object message);
}
/// <summary>
/// Dispatches messages to handlers looked up by the message's runtime type.
/// </summary>
public class MessageDispatcher : IMessageDispatcher
{
    private readonly IDictionary<Type, IMessageHandler<object>> _handlers;

    /// <summary>
    /// Creates a dispatcher over the given type-to-handler map.
    /// </summary>
    /// <param name="handlers">Handlers keyed by the message type they accept.</param>
    /// <exception cref="ArgumentNullException"><paramref name="handlers"/> is null.</exception>
    public MessageDispatcher(IDictionary<Type, IMessageHandler<object>> handlers)
    {
        if (handlers == null)
        {
            throw new ArgumentNullException("handlers");
        }

        this._handlers = handlers;
    }

    /// <summary>
    /// Passes <paramref name="message"/> to the handler registered for its exact runtime
    /// type. A message with no registered handler is ignored.
    /// </summary>
    /// <param name="message">The message to dispatch.</param>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
    public void Dispatch(object message)
    {
        if (message == null)
        {
            throw new ArgumentNullException("message");
        }

        var messageType = message.GetType();
        IMessageHandler<object> target;
        var found = this._handlers.TryGetValue(messageType, out target);
        if (!found)
        {
            return;
        }

        target.Handle(message);
    }
}
/// <summary>
/// Handles messages of type <typeparamref name="TMessage"/>.
/// </summary>
/// <typeparam name="TMessage">The message type accepted by the handler (contravariant).</typeparam>
public interface IMessageHandler<in TMessage>
{
    /// <summary>
    /// Handles a single message.
    /// </summary>
    /// <param name="message">The message to handle.</param>
    void Handle(TMessage message);
}
/// <summary>
/// Exposes a typed <see cref="IMessageHandler{TMessage}"/> as a handler of
/// <see cref="object"/> by casting each incoming message to <typeparamref name="TMessage"/>.
/// </summary>
/// <typeparam name="TMessage">The message type the wrapped handler accepts.</typeparam>
public class MessageHandlerAdapter<TMessage> : IMessageHandler<object>
{
    private readonly IMessageHandler<TMessage> _handler;

    /// <summary>
    /// Wraps the given typed handler.
    /// </summary>
    /// <param name="handler">The handler that receives the cast messages.</param>
    /// <exception cref="ArgumentNullException"><paramref name="handler"/> is null.</exception>
    public MessageHandlerAdapter(IMessageHandler<TMessage> handler)
    {
        if (handler == null)
        {
            throw new ArgumentNullException("handler");
        }

        this._handler = handler;
    }

    /// <summary>
    /// Casts <paramref name="message"/> to <typeparamref name="TMessage"/> and forwards it
    /// to the wrapped handler.
    /// </summary>
    /// <param name="message">The untyped message.</param>
    public void Handle(object message)
    {
        var typed = (TMessage)message;
        this._handler.Handle(typed);
    }
}
/// <summary>
/// Envelope for a request: a payload plus the id used to match it with its response.
/// </summary>
public class Request
{
    /// <summary>Identifier that correlates this request with its response.</summary>
    public Guid CorrelationId { get; set; }

    /// <summary>The request payload.</summary>
    public object Message { get; set; }
}
/// <summary>
/// Envelope for a response: a payload plus the id of the request it answers.
/// </summary>
public class Response
{
    /// <summary>Identifier of the request this response belongs to.</summary>
    public Guid CorrelationId { get; set; }

    /// <summary>The response payload.</summary>
    public object Message { get; set; }
}
/// <summary>
/// Answers every <see cref="Request"/> by sending a <see cref="Response"/> that carries
/// the same correlation id and the same payload.
/// </summary>
public class EchoRequestHandler : IMessageHandler<Request>
{
    private readonly IChannel _responseChannel;

    /// <summary>
    /// Creates a handler that replies on <paramref name="responseChannel"/>.
    /// </summary>
    /// <param name="responseChannel">The channel responses are sent to.</param>
    /// <exception cref="ArgumentNullException"><paramref name="responseChannel"/> is null.</exception>
    public EchoRequestHandler(IChannel responseChannel)
    {
        if (responseChannel == null) throw new ArgumentNullException("responseChannel");
        _responseChannel = responseChannel;
    }

    /// <summary>
    /// Sends back a response echoing the request's correlation id and payload.
    /// </summary>
    /// <param name="message">The request to echo.</param>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
    public void Handle(Request message)
    {
        // Fail with a descriptive exception instead of a NullReferenceException,
        // matching the guard used by the other handlers in this file.
        if (message == null) throw new ArgumentNullException("message");

        _responseChannel.Send(new Response
        {
            CorrelationId = message.CorrelationId,
            Message = message.Message
        });
    }
}
/// <summary>
/// Delivers each <see cref="Response"/> to the callback registered under its
/// correlation id and unregisters that callback.
/// </summary>
public class CallbackResponseHandler : IMessageHandler<Response>
{
    private readonly IDictionary<Guid, Action<object>> _callbackRegistry;

    /// <summary>
    /// Creates a handler that looks callbacks up in <paramref name="callbackRegistry"/>.
    /// </summary>
    /// <param name="callbackRegistry">Pending callbacks keyed by correlation id.</param>
    /// <exception cref="ArgumentNullException"><paramref name="callbackRegistry"/> is null.</exception>
    public CallbackResponseHandler(IDictionary<Guid, Action<object>> callbackRegistry)
    {
        if (callbackRegistry == null) throw new ArgumentNullException("callbackRegistry");
        _callbackRegistry = callbackRegistry;
    }

    /// <summary>
    /// Invokes the callback registered for the response's correlation id with the
    /// response payload. Responses with no registered callback are ignored.
    /// </summary>
    /// <param name="message">The response to deliver.</param>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
    public void Handle(Response message)
    {
        if (message == null) throw new ArgumentNullException("message");

        Action<object> callback;
        if (!_callbackRegistry.TryGetValue(message.CorrelationId, out callback))
        {
            return;
        }

        // Unregister BEFORE invoking: if the callback throws, the entry must not stay
        // behind forever. Invoking only when this call actually removed the entry also
        // prevents delivering the same response twice.
        if (_callbackRegistry.Remove(message.CorrelationId))
        {
            callback(message.Message);
        }
    }
}
/// <summary>
/// A channel on which a message is sent together with a callback for its reply.
/// </summary>
public interface IRequestResponseChannel
{
    /// <summary>
    /// Sends a message and registers the callback to receive the response.
    /// </summary>
    /// <param name="message">The message to send.</param>
    /// <param name="callback">The callback associated with the message.</param>
    void Send(object message, Action<object> callback);
}
/// <summary>
/// Sends messages wrapped in a <see cref="Request"/> and records, per correlation id,
/// the callback that should receive the matching response.
/// </summary>
public class RequestResponseChannel : IRequestResponseChannel
{
    private readonly IChannel _requestChannel;
    private readonly IDictionary<Guid, Action<object>> _callbackRegistry;

    /// <summary>
    /// Creates a channel that sends on <paramref name="requestChannel"/> and stores
    /// callbacks in <paramref name="callbackRegistry"/>.
    /// </summary>
    /// <param name="requestChannel">The channel requests are sent to.</param>
    /// <param name="callbackRegistry">Registry of pending callbacks keyed by correlation id.</param>
    /// <exception cref="ArgumentNullException">Either argument is null.</exception>
    public RequestResponseChannel(IChannel requestChannel, IDictionary<Guid, Action<object>> callbackRegistry)
    {
        if (requestChannel == null) throw new ArgumentNullException("requestChannel");
        if (callbackRegistry == null) throw new ArgumentNullException("callbackRegistry");
        _requestChannel = requestChannel;
        _callbackRegistry = callbackRegistry;
    }

    /// <summary>
    /// Registers <paramref name="callback"/> under a new correlation id and sends
    /// <paramref name="message"/> as a <see cref="Request"/> carrying that id.
    /// </summary>
    /// <param name="message">The payload to send.</param>
    /// <param name="callback">The callback to register for the response.</param>
    /// <exception cref="ArgumentNullException">Either argument is null.</exception>
    public void Send(object message, Action<object> callback)
    {
        if (message == null) throw new ArgumentNullException("message");
        if (callback == null) throw new ArgumentNullException("callback");

        var correlationId = Guid.NewGuid();

        // Register first so the callback is already in place when the request is sent.
        _callbackRegistry.Add(correlationId, callback);
        try
        {
            _requestChannel.Send(new Request { CorrelationId = correlationId, Message = message });
        }
        catch
        {
            // The request never left, so no response can ever claim this entry;
            // roll the registration back instead of leaking it.
            _callbackRegistry.Remove(correlationId);
            throw;
        }
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment