Skip to content

Instantly share code, notes, and snippets.

@haf
Created February 20, 2012 15:19
Show Gist options
  • Save haf/1869638 to your computer and use it in GitHub Desktop.
Save haf/1869638 to your computer and use it in GitHub Desktop.
Asynchronous Retries MassTransit
using System;
using System.Collections.Generic;
using System.Numerics;
using System.Threading;
using System.Threading.Tasks;
using System.Linq;
using Magnum.Extensions;
using Magnum.Policies;
using MassTransit.Context;
using NUnit.Framework;
namespace MassTransit.Transports.Tests
{
interface Cmd
{
}
class NewId
{
internal static int Generate()
{
return 42;
}
}
class MTConfig
{
internal static Uri Endpoint
{
get { return new Uri("rabbitmq://ha-cluster-4.corp.local/ep42"); }
}
}
class Management
{
readonly Redis _r;
readonly Uri _endpoint;
public Management(Redis r, Uri endpoint)
{
_r = r;
_endpoint = endpoint;
}
public Management(Uri endpoint)
{
_endpoint = endpoint;
}
internal void Bind()
{
// IModel.BindExchange(_endpoint.Name, _endpoint.Name, etc);
}
public void SendUnacked(Bus b)
{
foreach (var msg in _r.SyncPeekSet(_endpoint))
{
b.PublishContext((ISendContext<Cmd>) msg);
}
}
}
class Redis
{
Dictionary<Uri, HashSet<object>> lookup = new Dictionary<Uri, HashSet<object>>();
HashSet<object> Get(Uri sourceEndpointUri)
{
// + locking etc
HashSet<object> sett;
if (!lookup.ContainsKey(sourceEndpointUri))
sett = lookup[sourceEndpointUri] = new HashSet<object>();
else
sett = lookup[sourceEndpointUri];
return sett;
}
internal void SyncAppendSet<T>(
Uri sourceEndpointUri,
ISendContext<T> message) where T : class
{
Get(sourceEndpointUri).Add(message);
}
internal void SyncRemoveSet<T>(
Uri sourceEndpointUri,
ISendContext<T> message) where T : class
{
Get(sourceEndpointUri).Remove(message);
}
public IEnumerable<ISendContext> SyncPeekSet(Uri sourceEndpointUri)
{
return Get(sourceEndpointUri).Cast<ISendContext>();
}
}
/// <summary>simple unit () type of the ACK</summary>
class ACK
{
}
/// <summary>Option type like discriminated unions are</summary>
class Cell<TEither, TOr>
{
volatile bool set;
public Cell()
{
}
public Cell(TEither either)
{
// lock etc
if (set) return;
}
public Cell(TOr or)
{
if (set) return;
}
public void Set(TEither either)
{
}
public void Set(TOr or)
{
}
}
static class AckBuilder
{
// TPoll has the same semantics as Poll would with ZeroMQ - a poll is either
// for the implicit (non-reply) callback of the send of TMsg
// because of the lack of a failure from the transport
// OR the actual message we got back
internal static Action<Cell<ReceiveContext, ISendContext<TMsg>>> DefaultFor<TMsg, TPoll>(
// curry away the bus, and create a new AckBuilder with it curried away
Bus b,
// user-passed callback
Action<Cell<TPoll, ISendContext<TMsg>>> callback = null,
// a selector that e.g. checks the message id; similar to the CorrelateBy(T -> bool) in sagas
Func<TPoll, bool> selector = null)
where TMsg : class
{
selector = selector ?? (tp => true /* always ACK on the type per default */);
var cell = new Cell<TPoll, ISendContext<TMsg>>();
// must make sure either of these lambdas are called; not both, but that threading concern is outside my aim of
// this discussion-oriented code snippet
MassTransit.UnsubscribeAction unsub = null;
unsub = b.SubscribeHandler(selector, tpoll =>
{
cell.Set(tpoll);
callback(cell);
unsub();
});
return c =>
{
// cell.Set(c.Value)
callback(cell);
unsub();
};
}
}
/// <summary>This message is the event that the domain service publishes if the command passed</summary>
interface CmdEventActingAsACK
{
}
class GuiBus // simply showing UI updates
{
internal void Publish<T>()
{
// etc...
}
}
interface DisplayErrorDialog
{
}
class Usage
{
GuiBus guiEventing = new GuiBus();
Bus b;
void simple_publish_async()
{
b.Publish<Cmd>(new
{
Value = 3.4,
Other = "Rat"
},
// pass a policy which can be curried away
// but specifies 1-2 times to just re-send in case msg got lost
PolicyBuilder.For<Exception /* delivery failed, MT specific ex */>(
ex => /*ex.Retries == 5*/ true)
.CircuitBreak(5.Seconds(), 2 /* times */),
// send the interaction pieces into the bus; this is the 'application logic'
// on ack, nack and failure logic
AckBuilder.DefaultFor<Cmd, CmdEventActingAsACK>(b),
// here we say that we're sending a command, so we're going to keep retrying until
// we succeed
(error, sendContext) =>
{
guiEventing.Publish<DisplayErrorDialog>();
return TransportUnreachableProgrammerDecision.ContinueTryingTillAvailable;
});
}
}
/// <summary>
/// in the case of RMQ we don't get ACKs for successes, but we are sure to get
/// an asynchronous exception if something goes wrong, within 10 seconds
/// </summary>
class Scheduler
{
Timer t;
public Scheduler()
{
t = new Timer(Tick);
}
// crude scheduler
// actually, I want a min-heap here, sorted by date time ascending!
// so lets
SortedList<DateTime, Action> callbacks = new SortedList<DateTime, Action>();
void Tick(object state)
{
Check();
}
// perform all callbacks
void Check()
{
// loop that calls back the Actions
KeyValuePair<DateTime, Action> keyValuePair;
while ((keyValuePair = callbacks.First()).Key < DateTime.UtcNow)
{
callbacks.RemoveAt(0);
keyValuePair.Value();
}
}
internal void Schedule(Action callback, TimeSpan dueTime)
{
callbacks.Add(DateTime.UtcNow + dueTime, callback);
}
}
enum TransportUnreachableProgrammerDecision
{
/// <summary>
/// Meaning: I can't actually do anything; this is a command that I need to send
/// and that command is not temporally tied in any way that I care about, so just
/// continue at it, and send that darn command when you get back up. This is the
/// 'occasionally connected' answer to the decision on whether to publish on publish
/// failed. This means we're storing the send context locally and then publishing it.
/// </summary>
ContinueTryingTillAvailable
}
class Bus
{
Redis redis = new Redis();
OutTransport t = new OutTransport(null);
Scheduler s = new Scheduler();
internal void Publish<T>(object vals,
// this exception policy is only a decider on when to notify the error callback (4th param)
// but most often it shouldn't do very much, because connectivity errors and the like
// should be handled by the transports and their connection handlers
ExceptionPolicy exceptionPolicy,
Action<Cell<ReceiveContext, ISendContext<T>>> ackCallback,
Func<Exception, ISendContext<T>, TransportUnreachableProgrammerDecision> errorCallback)
where T : class
{
var sendContext = new SendContext<T>((T) vals);
// need a message id, this is the thing we track in redis
sendContext.SetMessageId(NewId.Generate().ToString());
// save book keeping data about the send context in case the publish is never ACKed
redis.SyncAppendSet(MTConfig.Endpoint, sendContext);
// perform the actual publish which writes to network buffers, in-proc buffers and so on
t.Publish(sendContext, errorCallback);
// make sure that we remove the send context from the list of outstanding
// contexts from redis when we can be sure we've sent it properly to the broker
// We closure-capture, because the incoming lambda wants just the 'semantic' ACK
// which is not necessarily the actual sending to be ACKed
// By doing it this way we may support ZMQ, event coming in over a SUB socket in the
// case of an event, or a ACK coming in over a PULL socket
ackCallback += _ => redis.SyncRemoveSet(MTConfig.Endpoint, sendContext);
// since we don't know about what RMQ does in the background, but
// we know it notifies us on failure in at least 10 seconds, schedule the removal
// of the "outstanding message" in ten seconds
s.Schedule(() => ackCallback(new Cell<ReceiveContext, ISendContext<T>>(sendContext)), 10.Seconds());
}
// called when we are starting up, no subscriptions at all, but here we may let the programmer
// register a callback on application startup that allows the programmer to handle the outstanding
// messages that were not sent; e.g. by enqueueing them together with a batch of fresh messages
// that was received from the domain service; and in this case we may need to do event merging.
public void PublishContext<T>(ISendContext<T> sendContext)
where T : class
{
// normal send here
t.Publish(sendContext, (e, ctx) => TransportUnreachableProgrammerDecision.ContinueTryingTillAvailable);
}
Management m;
void OnStartUp()
{
// when we start we do the normal stuff
m = new Management(MTConfig.Endpoint);
// such as binding our queue to an exchange
m.Bind();
// then we make sure we have an empty slate, by sending all
// messages that were never sent
m.SendUnacked(this);
}
public MassTransit.UnsubscribeAction SubscribeHandler<T>(Func<T, bool> selector, Action<T> action)
{
// etc...
return () => true;
}
interface IModel
{
event EventHandler TransportError;
// really, it doesn't look like this, but it's similar
void AsyncPublish<T>(ISendContext<T> context) where T : class;
}
class OutTransport
{
readonly IModel _connectionHandler;
readonly List<Func<Exception, TransportUnreachableProgrammerDecision>> errorCallbacks
= new List<Func<Exception, TransportUnreachableProgrammerDecision>>();
public OutTransport(IModel connectionHandler)
{
_connectionHandler = connectionHandler;
_connectionHandler.TransportError += (object sender, EventArgs e) =>
{
lock (errorCallbacks)
{
errorCallbacks.ForEach(cb =>
{
// var error = e.Error;
string error = "";
var decision = cb(new Exception(error));
// enqueue the corresponding action on an actor
// that requeues the send potentially
// or just gives up
});
errorCallbacks.Clear();
}
// somehow correlate all of the
};
}
public void Publish<T>(ISendContext<T> sendContext,
Func<Exception, ISendContext<T>, TransportUnreachableProgrammerDecision> errorCallback)
where T : class
{
errorCallbacks.Add((err) => errorCallback(err, sendContext));
_connectionHandler.AsyncPublish(sendContext);
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment