Skip to content

Instantly share code, notes, and snippets.

Created February 20, 2012 15:19
Show Gist options
  • Save haf/1869638 to your computer and use it in GitHub Desktop.
Save haf/1869638 to your computer and use it in GitHub Desktop.
Asynchronous Retries MassTransit
using System;
using System.Collections.Generic;
using System.Numerics;
using System.Threading;
using System.Threading.Tasks;
using System.Linq;
using Magnum.Extensions;
using Magnum.Policies;
using MassTransit.Context;
using NUnit.Framework;
namespace MassTransit.Transports.Tests
/// <summary>
/// Message type used as the command in this sketch: it is the TMsg argument in Usage and the
/// message type Management.SendUnacked casts stored send contexts to.
/// NOTE(review): no members are visible here — confirm whether the contract is really empty.
/// </summary>
interface Cmd
/// <summary>Stand-in id generator for this sketch; it always yields the same value.</summary>
class NewId
{
    /// <summary>Returns the fixed placeholder id.</summary>
    /// <returns>Always <c>42</c>.</returns>
    internal static int Generate()
    {
        const int placeholderId = 42;
        return placeholderId;
    }
}
/// <summary>Hard-coded bus configuration for this sketch.</summary>
class MTConfig
{
    const string EndpointAddress = "rabbitmq://ha-cluster-4.corp.local/ep42";

    /// <summary>The endpoint address the bus publishes from; a new Uri instance per access.</summary>
    internal static Uri Endpoint
    {
        get
        {
            var address = new Uri(EndpointAddress);
            return address;
        }
    }
}
/// <summary>
/// Start-up housekeeping for one endpoint: binds it and re-sends the send contexts
/// that are still recorded as outstanding in the store.
/// </summary>
class Management
{
    readonly Redis _r;
    readonly Uri _endpoint;

    /// <summary>Creates the helper on top of an existing store of outstanding send contexts.</summary>
    /// <param name="r">Store to read outstanding contexts from; a fresh, empty store is used when null.</param>
    /// <param name="endpoint">Endpoint whose outstanding contexts are managed.</param>
    public Management(Redis r, Uri endpoint)
    {
        // Never leave the store null: SendUnacked dereferences it unconditionally.
        _r = r ?? new Redis();
        _endpoint = endpoint;
    }

    /// <summary>
    /// Creates the helper with its own empty store, so SendUnacked has nothing to re-send
    /// instead of failing on a missing store.
    /// </summary>
    /// <param name="endpoint">Endpoint whose outstanding contexts are managed.</param>
    public Management(Uri endpoint)
        : this(new Redis(), endpoint)
    {
    }

    /// <summary>Placeholder for binding the endpoint's queue to its exchange.</summary>
    internal void Bind()
    {
        // IModel.BindExchange(_endpoint.Name, _endpoint.Name, etc);
    }

    /// <summary>Re-publishes every send context still recorded as outstanding for the endpoint.</summary>
    /// <param name="b">Bus to publish the outstanding contexts on.</param>
    public void SendUnacked(Bus b)
    {
        foreach (var msg in _r.SyncPeekSet(_endpoint))
        {
            // The sketch only deals with Cmd messages; a context of another message type fails this cast.
            b.PublishContext((ISendContext<Cmd>) msg);
        }
    }
}
/// <summary>
/// In-memory stand-in for the store that tracks outstanding (not yet acknowledged)
/// send contexts per source endpoint.
/// </summary>
class Redis
{
    // Appends come from publishing threads while removals are run by the scheduler's timer thread,
    // so every access to the lookup goes through this lock.
    readonly object gate = new object();

    readonly Dictionary<Uri, HashSet<object>> lookup = new Dictionary<Uri, HashSet<object>>();

    /// <summary>Returns the set for the endpoint, creating it on first use. Call with the gate held.</summary>
    HashSet<object> Get(Uri sourceEndpointUri)
    {
        HashSet<object> sett;
        if (!lookup.TryGetValue(sourceEndpointUri, out sett))
        {
            sett = new HashSet<object>();
            lookup[sourceEndpointUri] = sett;
        }

        return sett;
    }

    /// <summary>Records a send context as outstanding for the endpoint.</summary>
    /// <param name="sourceEndpointUri">Endpoint the message is sent from.</param>
    /// <param name="message">Send context to track.</param>
    internal void SyncAppendSet<T>(
        Uri sourceEndpointUri,
        ISendContext<T> message) where T : class
    {
        lock (gate)
        {
            Get(sourceEndpointUri).Add(message);
        }
    }

    /// <summary>Stops tracking a send context; a context that is not tracked is ignored.</summary>
    /// <param name="sourceEndpointUri">Endpoint the message was sent from.</param>
    /// <param name="message">Send context to forget.</param>
    internal void SyncRemoveSet<T>(
        Uri sourceEndpointUri,
        ISendContext<T> message) where T : class
    {
        lock (gate)
        {
            Get(sourceEndpointUri).Remove(message);
        }
    }

    /// <summary>Returns the send contexts currently outstanding for the endpoint.</summary>
    /// <param name="sourceEndpointUri">Endpoint to inspect.</param>
    /// <returns>A snapshot, safe to enumerate while the store keeps changing.</returns>
    public IEnumerable<ISendContext> SyncPeekSet(Uri sourceEndpointUri)
    {
        lock (gate)
        {
            return Get(sourceEndpointUri).Cast<ISendContext>().ToList();
        }
    }
}
/// <summary>
/// Simple unit "()" type standing for an acknowledgement; no members are declared
/// in the visible source, so it carries no data of its own.
/// </summary>
class ACK
/// <summary>
/// Option type in the spirit of a discriminated union: a cell is empty until it is given
/// either a <typeparamref name="TEither"/> or a <typeparamref name="TOr"/>.
/// The first value wins; later attempts to set the cell are ignored.
/// </summary>
class Cell<TEither, TOr>
{
    readonly object _gate = new object();
    volatile bool set;
    bool _holdsEither;
    TEither _either;
    TOr _or;

    /// <summary>Creates an empty cell.</summary>
    public Cell()
    {
    }

    /// <summary>Creates a cell already holding the first alternative.</summary>
    public Cell(TEither either)
    {
        Set(either);
    }

    /// <summary>Creates a cell already holding the second alternative.</summary>
    public Cell(TOr or)
    {
        Set(or);
    }

    /// <summary>True once either alternative has been stored.</summary>
    public bool IsSet
    {
        get { return this.set; }
    }

    /// <summary>Stores the first alternative unless the cell already holds a value.</summary>
    public void Set(TEither either)
    {
        lock (_gate)
        {
            if (this.set)
            {
                return;
            }

            _either = either;
            _holdsEither = true;
            this.set = true;
        }
    }

    /// <summary>Stores the second alternative unless the cell already holds a value.</summary>
    public void Set(TOr or)
    {
        lock (_gate)
        {
            if (this.set)
            {
                return;
            }

            _or = or;
            _holdsEither = false;
            this.set = true;
        }
    }

    /// <summary>Reads the first alternative.</summary>
    /// <param name="value">The stored value, or the type's default when this returns false.</param>
    /// <returns>True only when the cell holds the first alternative.</returns>
    public bool TryGetEither(out TEither value)
    {
        lock (_gate)
        {
            if (this.set && _holdsEither)
            {
                value = _either;
                return true;
            }
        }

        value = default(TEither);
        return false;
    }

    /// <summary>Reads the second alternative.</summary>
    /// <param name="value">The stored value, or the type's default when this returns false.</param>
    /// <returns>True only when the cell holds the second alternative.</returns>
    public bool TryGetOr(out TOr value)
    {
        lock (_gate)
        {
            if (this.set && !_holdsEither)
            {
                value = _or;
                return true;
            }
        }

        value = default(TOr);
        return false;
    }
}
/// <summary>
/// Builds acknowledgement callbacks; the return type of DefaultFor matches the ackCallback
/// parameter of Bus.Publish, which is how the Usage example consumes it.
/// </summary>
static class AckBuilder
// TPoll has the same semantics as a Poll would have with ZeroMQ - a poll is either
// for the implicit (non-reply) callback of the send of TMsg,
// because of the lack of a failure from the transport,
// OR for the actual message we got back
/// <summary>
/// Creates the default ACK callback for messages of type TMsg that are acknowledged by a TPoll.
/// NOTE(review): the bodies of the two lambdas below are not visible in this source, so how the
/// returned action, <c>cell</c>, <c>unsub</c> and <c>callback</c> interact cannot be confirmed here.
/// </summary>
internal static Action<Cell<ReceiveContext, ISendContext<TMsg>>> DefaultFor<TMsg, TPoll>(
// curry away the bus, and create a new AckBuilder with it curried away
Bus b,
// user-passed callback
// NOTE(review): not referenced in any of the visible lines of this method - confirm it is invoked
Action<Cell<TPoll, ISendContext<TMsg>>> callback = null,
// a selector that e.g. checks the message id; similar to the CorrelateBy(T -> bool) in sagas
Func<TPoll, bool> selector = null)
where TMsg : class
// without a selector, every TPoll that arrives is accepted
selector = selector ?? (tp => true /* always ACK on the type per default */);
// NOTE(review): this cell is Cell<TPoll, ...> while the returned action receives
// Cell<ReceiveContext, ...> - the two are different types
var cell = new Cell<TPoll, ISendContext<TMsg>>();
// must make sure that exactly one of these lambdas is called, never both; that threading concern is
// outside the aim of this discussion-oriented code snippet
// declared before the subscription so that the handler lambda can capture it
// (presumably to unsubscribe itself - TODO confirm, the handler body is not visible)
MassTransit.UnsubscribeAction unsub = null;
unsub = b.SubscribeHandler(selector, tpoll =>
return c =>
// cell.Set(c.Value)
/// <summary>
/// The event that the domain service publishes when the command passed; in the Usage example
/// it is the TPoll type argument, i.e. receiving it acts as the ACK of the command.
/// </summary>
interface CmdEventActingAsACK
/// <summary>Placeholder bus that simply illustrates publishing UI updates.</summary>
class GuiBus // simply showing UI updates
{
    /// <summary>Publishes a UI event of type <typeparamref name="T"/>; the body is elided in this sketch.</summary>
    internal void Publish<T>()
    {
        // etc...
    }
}
/// <summary>
/// UI message contract; presumably published on the GuiBus to ask for an error dialog.
/// NOTE(review): it is not referenced by any visible code - confirm the intended use.
/// </summary>
interface DisplayErrorDialog
/// <summary>Shows how application code publishes a command with retry, ACK and failure handling.</summary>
class Usage
{
    GuiBus guiEventing = new GuiBus();

    // NOTE(review): never assigned in this sketch; it must be set before simple_publish_async is called.
    Bus b;

    /// <summary>
    /// Publishes a Cmd and tells the bus to keep trying until the transport is available.
    /// </summary>
    void simple_publish_async()
    {
        b.Publish<Cmd>(
            new
            {
                Value = 3.4,
                Other = "Rat"
            },
            // pass a policy which can be curried away
            // but specifies 1-2 times to just re-send in case msg got lost
            PolicyBuilder.For<Exception /* delivery failed, MT specific ex */>(
                ex => /*ex.Retries == 5*/ true)
                .CircuitBreak(5.Seconds(), 2 /* times */),
            // send the interaction pieces into the bus; this is the 'application logic'
            // on ack, nack and failure logic
            AckBuilder.DefaultFor<Cmd, CmdEventActingAsACK>(b),
            // here we say that we're sending a command, so we're going to keep retrying until
            // we succeed
            (error, sendContext) =>
            {
                return TransportUnreachableProgrammerDecision.ContinueTryingTillAvailable;
            });
    }
}
/// <summary>
/// Crude in-process scheduler: runs each callback once, on a timer thread, after its due time has passed.
/// </summary>
/// <remarks>
/// In the case of RMQ we don't get ACKs for successes, but we are sure to get
/// an asynchronous exception if something goes wrong, within 10 seconds; the bus
/// schedules its follow-up work for when that window has elapsed.
/// </remarks>
class Scheduler
{
    // Schedule runs on caller threads and Tick on a timer thread; both touch callbacks.
    readonly object gate = new object();

    // crude scheduler
    // actually, I want a min-heap here, sorted by date time ascending!
    // Every due time maps to a list, so two callbacks with the same timestamp can coexist.
    readonly SortedList<DateTime, List<Action>> callbacks = new SortedList<DateTime, List<Action>>();

    readonly Timer t;

    /// <summary>Creates an idle scheduler; its timer is armed when something is scheduled.</summary>
    public Scheduler()
    {
        // This constructor overload creates the timer disarmed (infinite due time).
        t = new Timer(Tick);
    }

    /// <summary>Timer callback: performs all callbacks that are due.</summary>
    void Tick(object state)
    {
        Check();
    }

    /// <summary>Removes every due callback from the schedule, re-arms the timer and runs them.</summary>
    void Check()
    {
        var due = new List<Action>();

        lock (gate)
        {
            var now = DateTime.UtcNow;
            while (callbacks.Count > 0 && callbacks.Keys[0] <= now)
            {
                due.AddRange(callbacks.Values[0]);
                callbacks.RemoveAt(0);
            }

            ArmTimer();
        }

        // Run outside the lock so that a callback may schedule further work.
        foreach (var callback in due)
        {
            callback();
        }
    }

    /// <summary>Points the one-shot timer at the earliest due time. Call with the gate held.</summary>
    void ArmTimer()
    {
        if (callbacks.Count == 0)
        {
            t.Change(Timeout.Infinite, Timeout.Infinite);
            return;
        }

        var wait = callbacks.Keys[0] - DateTime.UtcNow;

        // Round up so the timer never fires just before the due time and has to spin.
        long millis = wait <= TimeSpan.Zero ? 0L : (long) Math.Ceiling(wait.TotalMilliseconds);

        // Very long waits are clamped; the tick then finds nothing due and simply re-arms.
        t.Change(Math.Min(millis, int.MaxValue), Timeout.Infinite);
    }

    /// <summary>Schedules a callback to run once after the given delay.</summary>
    /// <param name="callback">Action to run on a timer thread.</param>
    /// <param name="dueTime">Delay from now; zero or negative means as soon as possible.</param>
    /// <exception cref="ArgumentNullException"><paramref name="callback"/> is null.</exception>
    internal void Schedule(Action callback, TimeSpan dueTime)
    {
        if (callback == null)
        {
            throw new ArgumentNullException("callback");
        }

        lock (gate)
        {
            var when = DateTime.UtcNow + dueTime;

            List<Action> bucket;
            if (!callbacks.TryGetValue(when, out bucket))
            {
                bucket = new List<Action>();
                callbacks.Add(when, bucket);
            }

            bucket.Add(callback);
            ArmTimer();
        }
    }
}
/// <summary>
/// What the programmer wants the bus to do with a message when the transport could not deliver it.
/// </summary>
enum TransportUnreachableProgrammerDecision
{
    /// <summary>
    /// Meaning: I can't actually do anything; this is a command that I need to send
    /// and that command is not temporally tied in any way that I care about, so just
    /// continue at it, and send that darn command when you get back up. This is the
    /// 'occasionally connected' answer to the question of what to do when a publish
    /// failed. This means we're storing the send context locally and then publishing it.
    /// </summary>
    ContinueTryingTillAvailable
}
/// <summary>
/// Sketch of a bus that records every publish as outstanding until it is acknowledged, so that
/// messages which were never acknowledged can be sent again on the next start-up.
/// </summary>
class Bus
{
    readonly Redis redis = new Redis();
    readonly OutTransport t = new OutTransport(null);
    readonly Scheduler s = new Scheduler();
    Management m;

    /// <summary>Publishes a message and tracks it as outstanding until it is acknowledged.</summary>
    /// <param name="vals">The message; it is cast to <typeparamref name="T"/>.</param>
    /// <param name="exceptionPolicy">Decides when the error callback is notified; not used by this sketch yet.</param>
    /// <param name="ackCallback">Invoked with the acknowledgement cell; may be null.</param>
    /// <param name="errorCallback">Asked what to do when the transport reports a failure.</param>
    internal void Publish<T>(object vals,
        // this exception policy is only a decider on when to notify the error callback (4th param)
        // but most often it shouldn't do very much, because connectivity errors and the like
        // should be handled by the transports and their connection handlers
        ExceptionPolicy exceptionPolicy,
        Action<Cell<ReceiveContext, ISendContext<T>>> ackCallback,
        Func<Exception, ISendContext<T>, TransportUnreachableProgrammerDecision> errorCallback)
        where T : class
    {
        // NOTE(review): the Usage example passes an anonymous object, which this cast rejects;
        // presumably the real implementation maps vals onto T - confirm.
        var sendContext = new SendContext<T>((T) vals);

        // need a message id, this is the thing we track in redis
        // save book keeping data about the send context in case the publish is never ACKed
        redis.SyncAppendSet(MTConfig.Endpoint, sendContext);

        // perform the actual publish which writes to network buffers, in-proc buffers and so on
        t.Publish(sendContext, errorCallback);

        // make sure that we remove the send context from the list of outstanding
        // contexts from redis when we can be sure we've sent it properly to the broker
        // We closure-capture, because the incoming lambda wants just the 'semantic' ACK
        // which is not necessarily the actual sending to be ACKed
        // By doing it this way we may support ZMQ, event coming in over a SUB socket in the
        // case of an event, or a ACK coming in over a PULL socket
        // (+= also works when the caller passed no callback at all)
        ackCallback += _ => redis.SyncRemoveSet(MTConfig.Endpoint, sendContext);

        // since we don't know about what RMQ does in the background, but
        // we know it notifies us on failure in at least 10 seconds, schedule the removal
        // of the "outstanding message" in ten seconds
        s.Schedule(() => ackCallback(new Cell<ReceiveContext, ISendContext<T>>(sendContext)), 10.Seconds());
    }

    // called when we are starting up, no subscriptions at all, but here we may let the programmer
    // register a callback on application startup that allows the programmer to handle the outstanding
    // messages that were not sent; e.g. by enqueueing them together with a batch of fresh messages
    // that was received from the domain service; and in this case we may need to do event merging.
    /// <summary>Sends an already built send context again, retrying until the transport is available.</summary>
    /// <param name="sendContext">The outstanding send context to publish.</param>
    public void PublishContext<T>(ISendContext<T> sendContext)
        where T : class
    {
        // normal send here
        t.Publish(sendContext, (e, ctx) => TransportUnreachableProgrammerDecision.ContinueTryingTillAvailable);
    }

    /// <summary>Binds the endpoint and re-sends whatever is still recorded as outstanding.</summary>
    void OnStartUp()
    {
        // when we start we do the normal stuff; Management gets the bus's own store,
        // because that is where Publish records the outstanding send contexts
        m = new Management(redis, MTConfig.Endpoint);

        // such as binding our queue to an exchange
        m.Bind();

        // then we make sure we have an empty slate, by sending all
        // messages that were never sent
        m.SendUnacked(this);
    }

    /// <summary>Placeholder subscription: the handler is not registered anywhere in this sketch.</summary>
    /// <param name="selector">Filter deciding which messages reach the handler.</param>
    /// <param name="action">Handler for the selected messages.</param>
    /// <returns>An unsubscribe action that always reports success.</returns>
    public MassTransit.UnsubscribeAction SubscribeHandler<T>(Func<T, bool> selector, Action<T> action)
    {
        // etc...
        return () => true;
    }
}
/// <summary>
/// Minimal stand-in for the transport's connection handler, as consumed by OutTransport.
/// </summary>
interface IModel
{
    /// <summary>Raised when the transport reports an error; OutTransport subscribes to it.</summary>
    event EventHandler TransportError;

    // really, it doesn't look like this, but it's similar
    /// <summary>Hands a send context to the transport for publishing.</summary>
    /// <param name="context">The send context to publish.</param>
    void AsyncPublish<T>(ISendContext<T> context) where T : class;
}
/// <summary>
/// Outbound transport of the sketch: forwards publishes to the connection handler and fans
/// transport errors out to the error callbacks registered with each publish.
/// </summary>
class OutTransport
{
    readonly IModel _connectionHandler;

    // Doubles as the lock object for itself.
    readonly List<Func<Exception, TransportUnreachableProgrammerDecision>> errorCallbacks
        = new List<Func<Exception, TransportUnreachableProgrammerDecision>>();

    /// <summary>Creates the transport on top of a connection handler.</summary>
    /// <param name="connectionHandler">
    /// The connection to publish through. May be null (the Bus in this sketch passes null),
    /// in which case publishes only register their error callbacks.
    /// </param>
    public OutTransport(IModel connectionHandler)
    {
        _connectionHandler = connectionHandler;

        if (_connectionHandler == null)
        {
            return;
        }

        _connectionHandler.TransportError += OnTransportError;
    }

    /// <summary>Asks every registered error callback what to do about a transport error.</summary>
    void OnTransportError(object sender, EventArgs e)
    {
        Func<Exception, TransportUnreachableProgrammerDecision>[] snapshot;
        lock (errorCallbacks)
        {
            // Copy first: a callback may publish again, which would otherwise
            // modify the list while it is being enumerated.
            snapshot = errorCallbacks.ToArray();
        }

        foreach (var cb in snapshot)
        {
            // var error = e.Error;
            string error = "";
            var decision = cb(new Exception(error));
            // enqueue the corresponding action on an actor
            // that requeues the send potentially
            // or just gives up
        }
    }

    // somehow correlate all of the
    // NOTE(review): callbacks are never removed, so every past publish is notified of every error.
    /// <summary>Registers the error callback for the send context and hands the context to the connection.</summary>
    /// <param name="sendContext">The send context to publish.</param>
    /// <param name="errorCallback">Asked for a decision whenever the transport reports an error.</param>
    public void Publish<T>(ISendContext<T> sendContext,
        Func<Exception, ISendContext<T>, TransportUnreachableProgrammerDecision> errorCallback)
        where T : class
    {
        // Register before publishing, so an error raised by this very publish finds its callback.
        lock (errorCallbacks)
        {
            errorCallbacks.Add((err) => errorCallback(err, sendContext));
        }

        if (_connectionHandler != null)
        {
            _connectionHandler.AsyncPublish(sendContext);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment