Skip to content

Instantly share code, notes, and snippets.

@yreynhout
Last active December 3, 2015 18:33
Show Gist options
  • Save yreynhout/e7787d28913d486e34ac to your computer and use it in GitHub Desktop.
Save yreynhout/e7787d28913d486e34ac to your computer and use it in GitHub Desktop.
Poor man's messaging
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace ChannelZero
{
/// <summary>
/// Demo entry point: wires a request queue and a response queue together so that a
/// request sent from the main thread is echoed back and delivered to a callback.
/// </summary>
class Program
{
    /// <summary>
    /// Builds the request/response pipeline, sends a single "Hello world!" request whose
    /// echoed payload is written to the console, then waits for the user to press Enter.
    /// </summary>
    static void Main()
    {
        var requestQueue = new BlockingCollection<object>();
        var responseQueue = new BlockingCollection<object>();

        // The registry is written on the sending thread (RequestResponseChannel.Send) and
        // read/removed on the response loop's thread (CallbackResponseHandler.Handle).
        // A plain Dictionary is not safe for concurrent readers and writers, so a
        // ConcurrentDictionary is used; it is still consumed through IDictionary.
        IDictionary<Guid, Action<object>> callbackRegistry =
            new ConcurrentDictionary<Guid, Action<object>>();

        var requestChannel = new QueueChannel(requestQueue);
        var responseChannel = new QueueChannel(responseQueue);

        // Request side: every Request taken from the request queue is echoed onto the response channel.
        var requestDispatcher = new MessageDispatcher(
            new Dictionary<Type, IMessageHandler<object>>
            {
                {
                    typeof(Request),
                    new MessageHandlerAdapter<Request>(
                        new EchoRequestHandler(responseChannel))
                }
            });
        var requestLoop = new QueueMessageLoop(requestQueue, requestDispatcher);
        requestLoop.Start();

        // Response side: every Response taken from the response queue is routed to its registered callback.
        var responseDispatcher = new MessageDispatcher(
            new Dictionary<Type, IMessageHandler<object>>
            {
                {
                    typeof(Response),
                    new MessageHandlerAdapter<Response>(
                        new CallbackResponseHandler(callbackRegistry))
                }
            });
        var responseLoop = new QueueMessageLoop(responseQueue, responseDispatcher);
        responseLoop.Start();

        var requestResponseChannel = new RequestResponseChannel(requestChannel, callbackRegistry);
        requestResponseChannel.Send("Hello world!", Console.WriteLine);

        // Keep the process alive long enough for the loops to deliver the response.
        Console.ReadLine();
    }
}
/// <summary>
/// A one-way outlet that accepts messages of any type.
/// </summary>
public interface IChannel
{
/// <summary>
/// Sends <paramref name="message"/> through the channel.
/// </summary>
/// <param name="message">The message to send.</param>
void Send(object message);
}
/// <summary>
/// An <see cref="IChannel"/> that delivers messages by adding them to a blocking collection.
/// </summary>
public class QueueChannel : IChannel
{
    private readonly BlockingCollection<object> _queue;

    /// <summary>
    /// Creates a channel that writes to <paramref name="queue"/>.
    /// </summary>
    /// <param name="queue">The collection that receives every sent message.</param>
    /// <exception cref="ArgumentNullException"><paramref name="queue"/> is null.</exception>
    public QueueChannel(BlockingCollection<object> queue)
    {
        if (queue == null) throw new ArgumentNullException("queue");
        _queue = queue;
    }

    /// <summary>
    /// Adds <paramref name="message"/> to the underlying queue.
    /// </summary>
    /// <param name="message">The message to enqueue; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
    public void Send(object message)
    {
        // Reject null here, on the sender's thread. A null that reaches the queue is only
        // detected later by MessageDispatcher.Dispatch, which throws on the consuming thread.
        if (message == null) throw new ArgumentNullException("message");
        _queue.Add(message);
    }
}
/// <summary>
/// Consumes messages from a blocking collection on a background task and hands each one
/// to an <see cref="IMessageDispatcher"/>.
/// </summary>
/// <remarks>
/// <see cref="Start"/> and <see cref="Stop"/> are not synchronized with each other; call
/// them from a single controlling thread.
/// </remarks>
public class QueueMessageLoop
{
    private readonly BlockingCollection<object> _queue;
    private readonly IMessageDispatcher _dispatcher;
    private CancellationTokenSource _stopSource;
    private Task _task;

    /// <summary>
    /// Creates a loop that reads from <paramref name="queue"/> and dispatches through
    /// <paramref name="dispatcher"/>.
    /// </summary>
    /// <param name="queue">The collection to consume messages from.</param>
    /// <param name="dispatcher">The dispatcher invoked once per consumed message.</param>
    /// <exception cref="ArgumentNullException">Either argument is null.</exception>
    public QueueMessageLoop(BlockingCollection<object> queue, IMessageDispatcher dispatcher)
    {
        if (queue == null) throw new ArgumentNullException("queue");
        if (dispatcher == null) throw new ArgumentNullException("dispatcher");
        _queue = queue;
        _dispatcher = dispatcher;
    }

    /// <summary>
    /// Starts consuming on a task created with the LongRunning option on the default scheduler.
    /// Calling it again before <see cref="Stop"/> is a no-op.
    /// </summary>
    public void Start()
    {
        // A second Start used to overwrite the fields, leaking the first task and its
        // cancellation source while leaving two consumers on the same queue.
        if (_task != null)
        {
            return;
        }

        _stopSource = new CancellationTokenSource();

        // Capture the token so the running loop never has to read the mutable field.
        var stopToken = _stopSource.Token;
        _task = Task.Factory.StartNew(
            () => Run(stopToken),
            stopToken,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
    }

    /// <summary>
    /// Dispatches every message taken from the queue until the enumeration ends or
    /// <paramref name="stopToken"/> is cancelled.
    /// </summary>
    private void Run(CancellationToken stopToken)
    {
        try
        {
            // A single enumeration is enough: it blocks while the queue is empty. Wrapping it
            // in an outer loop would re-enter an already finished enumeration over and over
            // (a busy spin) once the collection has been completed for adding.
            foreach (var message in _queue.GetConsumingEnumerable(stopToken))
            {
                _dispatcher.Dispatch(message);
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation requested through Stop is the normal way out; anything else is a real error.
            if (!stopToken.IsCancellationRequested)
            {
                throw;
            }
        }
    }

    /// <summary>
    /// Requests cancellation and blocks until the consuming task has finished.
    /// Does nothing if the loop was never started or has already been stopped.
    /// </summary>
    /// <exception cref="AggregateException">
    /// The consuming task failed with something other than cancellation (for example the
    /// dispatcher threw).
    /// </exception>
    public void Stop()
    {
        if (_task == null)
        {
            return;
        }

        _stopSource.Cancel();
        try
        {
            _task.Wait();
        }
        catch (AggregateException failure)
        {
            // If the token is cancelled before the task body starts, the task ends in the
            // Canceled state and Wait reports that as an AggregateException. That outcome is
            // expected here; any other inner exception is rethrown by Handle.
            failure.Handle(inner => inner is OperationCanceledException);
        }
        finally
        {
            _stopSource.Dispose();
            _stopSource = null;
            _task = null;
        }
    }
}
/// <summary>
/// Routes a message of any type to whatever should process it.
/// </summary>
public interface IMessageDispatcher
{
/// <summary>
/// Dispatches <paramref name="message"/>.
/// </summary>
/// <param name="message">The message to dispatch.</param>
void Dispatch(object message);
}
/// <summary>
/// Dispatches a message to the handler registered for the message's exact runtime type.
/// </summary>
public class MessageDispatcher : IMessageDispatcher
{
    private readonly IDictionary<Type, IMessageHandler<object>> _handlers;

    /// <summary>
    /// Creates a dispatcher backed by <paramref name="handlers"/>. The dictionary is used
    /// as-is (it is not copied).
    /// </summary>
    /// <param name="handlers">Handlers keyed by the message type they process.</param>
    /// <exception cref="ArgumentNullException"><paramref name="handlers"/> is null.</exception>
    public MessageDispatcher(IDictionary<Type, IMessageHandler<object>> handlers)
    {
        if (handlers == null)
        {
            throw new ArgumentNullException("handlers");
        }

        _handlers = handlers;
    }

    /// <summary>
    /// Looks up the handler keyed by <c>message.GetType()</c> and invokes it. Messages with
    /// no registered handler are ignored.
    /// </summary>
    /// <param name="message">The message to route.</param>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
    public void Dispatch(object message)
    {
        if (message == null)
        {
            throw new ArgumentNullException("message");
        }

        var messageType = message.GetType();
        IMessageHandler<object> registered;
        if (!_handlers.TryGetValue(messageType, out registered))
        {
            // No handler for this type: nothing to do.
            return;
        }

        registered.Handle(message);
    }
}
/// <summary>
/// Processes messages of type <typeparamref name="TMessage"/>.
/// </summary>
/// <typeparam name="TMessage">
/// The message type accepted by the handler; declared contravariant (<c>in</c>).
/// </typeparam>
public interface IMessageHandler<in TMessage>
{
/// <summary>
/// Handles <paramref name="message"/>.
/// </summary>
/// <param name="message">The message to process.</param>
void Handle(TMessage message);
}
/// <summary>
/// Exposes a strongly typed handler as an <c>IMessageHandler&lt;object&gt;</c> by casting
/// each incoming message to <typeparamref name="TMessage"/>.
/// </summary>
/// <typeparam name="TMessage">The message type the wrapped handler accepts.</typeparam>
public class MessageHandlerAdapter<TMessage> : IMessageHandler<object>
{
    private readonly IMessageHandler<TMessage> _handler;

    /// <summary>
    /// Wraps <paramref name="handler"/>.
    /// </summary>
    /// <param name="handler">The typed handler that receives the cast messages.</param>
    /// <exception cref="ArgumentNullException"><paramref name="handler"/> is null.</exception>
    public MessageHandlerAdapter(IMessageHandler<TMessage> handler)
    {
        if (handler == null)
        {
            throw new ArgumentNullException("handler");
        }

        _handler = handler;
    }

    /// <summary>
    /// Casts <paramref name="message"/> to <typeparamref name="TMessage"/> and forwards it
    /// to the wrapped handler.
    /// </summary>
    /// <param name="message">The untyped message.</param>
    public void Handle(object message)
    {
        // Direct cast: an incompatible message type fails here rather than in the handler.
        var typedMessage = (TMessage)message;
        _handler.Handle(typedMessage);
    }
}
/// <summary>
/// Envelope for an outgoing message, tagged with an id used to match it to its response.
/// </summary>
public class Request
{
    /// <summary>
    /// Identifier copied onto the corresponding <c>Response</c>.
    /// </summary>
    public Guid CorrelationId
    {
        get;
        set;
    }

    /// <summary>
    /// The payload carried by the request.
    /// </summary>
    public object Message
    {
        get;
        set;
    }
}
/// <summary>
/// Envelope for a reply, tagged with the id of the request it answers.
/// </summary>
public class Response
{
    /// <summary>
    /// Identifier of the request this response belongs to.
    /// </summary>
    public Guid CorrelationId
    {
        get;
        set;
    }

    /// <summary>
    /// The payload carried by the response.
    /// </summary>
    public object Message
    {
        get;
        set;
    }
}
/// <summary>
/// Answers every <see cref="Request"/> by sending back a <see cref="Response"/> that carries
/// the same correlation id and the same payload.
/// </summary>
public class EchoRequestHandler : IMessageHandler<Request>
{
    private readonly IChannel _responseChannel;

    /// <summary>
    /// Creates a handler that replies on <paramref name="responseChannel"/>.
    /// </summary>
    /// <param name="responseChannel">The channel the echoed responses are sent to.</param>
    /// <exception cref="ArgumentNullException"><paramref name="responseChannel"/> is null.</exception>
    public EchoRequestHandler(IChannel responseChannel)
    {
        if (responseChannel == null) throw new ArgumentNullException("responseChannel");
        _responseChannel = responseChannel;
    }

    /// <summary>
    /// Sends a response echoing <paramref name="message"/>.
    /// </summary>
    /// <param name="message">The request to echo.</param>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
    public void Handle(Request message)
    {
        // Same guard as CallbackResponseHandler.Handle: fail with a named argument instead
        // of a NullReferenceException when dereferencing the request below.
        if (message == null) throw new ArgumentNullException("message");

        _responseChannel.Send(
            new Response
            {
                CorrelationId = message.CorrelationId,
                Message = message.Message
            });
    }
}
/// <summary>
/// Delivers the payload of a <see cref="Response"/> to the callback registered under the
/// response's correlation id, removing the registration in the process.
/// </summary>
/// <remarks>
/// This class does no locking of its own. If the registry is also modified from other
/// threads, the supplied dictionary must itself be safe for concurrent use.
/// </remarks>
public class CallbackResponseHandler : IMessageHandler<Response>
{
    private readonly IDictionary<Guid, Action<object>> _callbackRegistry;

    /// <summary>
    /// Creates a handler that resolves callbacks from <paramref name="callbackRegistry"/>.
    /// </summary>
    /// <param name="callbackRegistry">Pending callbacks keyed by correlation id.</param>
    /// <exception cref="ArgumentNullException"><paramref name="callbackRegistry"/> is null.</exception>
    public CallbackResponseHandler(IDictionary<Guid, Action<object>> callbackRegistry)
    {
        if (callbackRegistry == null) throw new ArgumentNullException("callbackRegistry");
        _callbackRegistry = callbackRegistry;
    }

    /// <summary>
    /// Invokes and unregisters the callback matching <paramref name="message"/>. A response
    /// whose correlation id is not registered is ignored.
    /// </summary>
    /// <param name="message">The response to deliver.</param>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
    public void Handle(Response message)
    {
        if (message == null) throw new ArgumentNullException("message");

        Action<object> callback;
        if (!_callbackRegistry.TryGetValue(message.CorrelationId, out callback))
        {
            return;
        }

        // Unregister BEFORE invoking: if the callback throws, the entry must not stay in the
        // registry forever. Only the caller whose Remove succeeds runs the callback, so a
        // given registration is invoked at most once.
        if (_callbackRegistry.Remove(message.CorrelationId))
        {
            callback(message.Message);
        }
    }
}
/// <summary>
/// A channel on which a message is sent together with a callback.
/// </summary>
public interface IRequestResponseChannel
{
/// <summary>
/// Sends <paramref name="message"/> and associates <paramref name="callback"/> with it.
/// </summary>
/// <param name="message">The message to send.</param>
/// <param name="callback">
/// The action tied to this send. In this file's implementation it is registered under a
/// fresh correlation id, and the response handler invokes it with the payload of the
/// matching response.
/// </param>
void Send(object message, Action<object> callback);
}
/// <summary>
/// Sends messages wrapped in a <see cref="Request"/> and records a callback for each one
/// under a freshly generated correlation id.
/// </summary>
/// <remarks>
/// This class does no locking of its own. If the registry is also modified from other
/// threads, the supplied dictionary must itself be safe for concurrent use.
/// </remarks>
public class RequestResponseChannel : IRequestResponseChannel
{
    private readonly IChannel _requestChannel;
    private readonly IDictionary<Guid, Action<object>> _callbackRegistry;

    /// <summary>
    /// Creates a channel that sends requests on <paramref name="requestChannel"/> and stores
    /// pending callbacks in <paramref name="callbackRegistry"/>.
    /// </summary>
    /// <param name="requestChannel">The channel the wrapped requests are sent to.</param>
    /// <param name="callbackRegistry">Where pending callbacks are kept, keyed by correlation id.</param>
    /// <exception cref="ArgumentNullException">Either argument is null.</exception>
    public RequestResponseChannel(IChannel requestChannel, IDictionary<Guid, Action<object>> callbackRegistry)
    {
        if (requestChannel == null) throw new ArgumentNullException("requestChannel");
        if (callbackRegistry == null) throw new ArgumentNullException("callbackRegistry");
        _requestChannel = requestChannel;
        _callbackRegistry = callbackRegistry;
    }

    /// <summary>
    /// Registers <paramref name="callback"/> under a new correlation id and sends
    /// <paramref name="message"/> wrapped in a <see cref="Request"/> carrying that id.
    /// </summary>
    /// <param name="message">The payload to send; must not be null.</param>
    /// <param name="callback">The action to register for the response; must not be null.</param>
    /// <exception cref="ArgumentNullException">Either argument is null.</exception>
    public void Send(object message, Action<object> callback)
    {
        if (message == null) throw new ArgumentNullException("message");
        if (callback == null) throw new ArgumentNullException("callback");

        var correlationId = Guid.NewGuid();

        // Register first so the callback is already in place if the response arrives
        // immediately after the request is sent.
        _callbackRegistry.Add(correlationId, callback);
        try
        {
            _requestChannel.Send(new Request { CorrelationId = correlationId, Message = message });
        }
        catch
        {
            // The request never went out, so no response will ever claim this entry.
            // Roll the registration back instead of leaking it, then let the error surface.
            _callbackRegistry.Remove(correlationId);
            throw;
        }
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment