Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
UnboundedStablePriorityMailbox For Akka.Net
/// <summary>
/// Stable priority queue for <see cref="Envelope"/>s, implemented as a binary min-heap stored in a list.
/// The priority of an envelope is computed from <c>envelope.Message</c> by a caller-supplied function;
/// envelopes with a lower priority value are dequeued first. Envelopes whose priority values are equal
/// are dequeued in the order they were enqueued, using a per-queue sequence number as tie-breaker.
/// See http://visualstudiomagazine.com/articles/2012/11/01/priority-queues-with-c.aspx for original implementation
/// </summary>
public sealed class StableListPriorityQueue
{
    /// <summary>
    /// Heap entry: the envelope plus the two keys it is ordered by.
    /// </summary>
    private struct WrappedEnvelope
    {
        public WrappedEnvelope(Envelope envelope, int priority, long seqnum)
        {
            Envelope = envelope;
            Priority = priority;
            SequenceNumber = seqnum;
        }

        public Envelope Envelope { get; }

        /// <summary>Priority value computed once, when the envelope was enqueued.</summary>
        public int Priority { get; }

        /// <summary>Insertion order; breaks ties between equal priorities.</summary>
        public long SequenceNumber { get; }
    }

    private readonly List<WrappedEnvelope> _data;
    private readonly Func<object, int> _priorityCalculator;

    /// <summary>
    /// The default priority generator. Assigns the same priority to every message,
    /// which makes the queue behave as a plain FIFO queue.
    /// </summary>
    internal static readonly Func<object, int> DefaultPriorityCalculator = message => 1;

    // 64-bit so that the counter cannot realistically wrap around; a wrapped 32-bit counter
    // would make newer envelopes sort ahead of older ones that share their priority.
    private long _sequenceNumber;

    /// <summary>
    /// Creates a new priority queue.
    /// </summary>
    /// <param name="initialCapacity">The initial capacity of the queue.</param>
    /// <param name="priorityCalculator">
    /// The calculator function for assigning message priorities.
    /// When <c>null</c>, <see cref="DefaultPriorityCalculator"/> is used.
    /// </param>
    public StableListPriorityQueue(int initialCapacity, Func<object, int> priorityCalculator)
    {
        _data = new List<WrappedEnvelope>(initialCapacity);
        _priorityCalculator = priorityCalculator ?? DefaultPriorityCalculator;
    }

    /// <summary>
    /// Orders two heap entries by priority value first and by sequence number second.
    /// </summary>
    private static int Compare(WrappedEnvelope x, WrappedEnvelope y)
    {
        var byPriority = x.Priority.CompareTo(y.Priority);
        return byPriority != 0 ? byPriority : x.SequenceNumber.CompareTo(y.SequenceNumber);
    }

    /// <summary>
    /// Exchanges the heap entries stored at the two given indexes.
    /// </summary>
    private void Swap(int first, int second)
    {
        var tmp = _data[first];
        _data[first] = _data[second];
        _data[second] = tmp;
    }

    /// <summary>
    /// Enqueues a message into the priority queue.
    /// The priority calculator is invoked exactly once per envelope, here.
    /// </summary>
    /// <param name="item">The item to enqueue.</param>
    public void Enqueue(Envelope item)
    {
        // Compute the priority before touching the list, so that a throwing
        // calculator cannot leave a half-inserted entry behind.
        var priority = _priorityCalculator(item.Message);
        var seq = Interlocked.Increment(ref _sequenceNumber);
        _data.Add(new WrappedEnvelope(item, priority, seq));

        // Sift the new entry up until its parent is not larger than it.
        var child = _data.Count - 1;
        while (child > 0)
        {
            var parent = (child - 1) / 2;
            if (Compare(_data[child], _data[parent]) >= 0)
            {
                break;
            }

            Swap(child, parent);
            child = parent;
        }
    }

    /// <summary>
    /// Dequeues the highest priority message at the front of the priority queue.
    /// </summary>
    /// <remarks>
    /// The queue must not be empty; the list indexer throws when it is.
    /// </remarks>
    /// <returns>The highest priority message <see cref="Envelope"/>.</returns>
    public Envelope Dequeue()
    {
        var last = _data.Count - 1; // last index (before removal)
        var frontItem = _data[0];

        // Move the last entry to the root, shrink the list, then sift the root down.
        _data[0] = _data[last];
        _data.RemoveAt(last);
        --last; // last index (after removal)

        var parent = 0;
        while (true)
        {
            var child = parent * 2 + 1; // left child
            if (child > last)
            {
                break; // no children
            }

            var right = child + 1;
            if (right <= last && Compare(_data[right], _data[child]) < 0)
            {
                child = right; // the right child exists and is the smaller one
            }

            if (Compare(_data[parent], _data[child]) <= 0)
            {
                break; // parent is not larger than its smallest child
            }

            Swap(parent, child);
            parent = child;
        }

        return frontItem.Envelope;
    }

    /// <summary>
    /// Peek at the message at the front of the priority queue.
    /// </summary>
    /// <remarks>
    /// The queue must not be empty; the list indexer throws when it is.
    /// </remarks>
    /// <returns>The highest priority message <see cref="Envelope"/>.</returns>
    public Envelope Peek()
    {
        return _data[0].Envelope;
    }

    /// <summary>
    /// Counts the number of items in the priority queue.
    /// </summary>
    /// <returns>The total number of items in the queue.</returns>
    public int Count()
    {
        return _data.Count;
    }

    /// <summary>
    /// Converts the queue to a string representation: every queued envelope in internal
    /// (heap) order, each followed by a space, and then the item count.
    /// </summary>
    /// <returns>A string representation of the queue.</returns>
    public override string ToString()
    {
        var builder = new System.Text.StringBuilder();
        for (var i = 0; i < _data.Count; ++i)
        {
            // Print the envelope itself; the wrapper struct has no useful string form.
            builder.Append(_data[i].Envelope.ToString()).Append(" ");
        }

        builder.Append("count = ").Append(_data.Count);
        return builder.ToString();
    }

    /// <summary>
    /// Verifies the heap invariant: no entry compares smaller than its parent.
    /// </summary>
    /// <returns><c>true</c> if the invariant holds for every entry (or the queue is empty), <c>false</c> otherwise.</returns>
    public bool IsConsistent()
    {
        // Every index except the root has exactly one parent at (index - 1) / 2.
        for (var child = 1; child < _data.Count; ++child)
        {
            var parent = (child - 1) / 2;
            if (Compare(_data[parent], _data[child]) > 0)
            {
                return false;
            }
        }

        return true;
    }
}
/// <summary>
/// Message queue that orders envelopes with a <see cref="StableListPriorityQueue"/> driven by a
/// priority generator, and additionally supports putting envelopes in front of everything else
/// through <see cref="EnqueueFirst"/>.
/// </summary>
public class UnboundedStablePriorityMessageQueue : BlockingMessageQueue, IUnboundedDequeBasedMessageQueueSemantics
{
    private readonly StableListPriorityQueue _prioQueue;
    // doesn't need to be threadsafe - only called from within actor
    private readonly Stack<Envelope> _prependBuffer = new Stack<Envelope>();

    /// <summary>
    /// Creates a new unbounded priority message queue.
    /// </summary>
    /// <param name="priorityGenerator">The calculator function for determining the priority of inbound messages.</param>
    /// <param name="initialCapacity">The initial capacity of the queue.</param>
    public UnboundedStablePriorityMessageQueue(Func<object, int> priorityGenerator, int initialCapacity)
    {
        _prioQueue = new StableListPriorityQueue(initialCapacity, priorityGenerator);
    }

    /// <summary>
    /// Unsafe method for computing the underlying message count.
    /// Counts both the prioritized envelopes and the envelopes waiting in the prepend buffer,
    /// because <see cref="LockedTryDequeue"/> serves both.
    /// </summary>
    /// <remarks>
    /// Called from within a synchronization mechanism.
    /// NOTE(review): presumably the base class derives its "has messages" answer from this value,
    /// in which case leaving the prepend buffer out would report an empty queue while prepended
    /// envelopes are still pending - confirm against BlockingMessageQueue.
    /// NOTE(review): <see cref="EnqueueFirst"/> is not synchronized, so the prepend buffer
    /// count read here may be slightly stale when read from another thread.
    /// </remarks>
    protected override int LockedCount
    {
        get { return _prependBuffer.Count + _prioQueue.Count(); }
    }

    /// <summary>
    /// Unsafe method for enqueuing a new message to the queue.
    /// </summary>
    /// <param name="envelope">The message to enqueue.</param>
    /// <remarks>
    /// Called from within a synchronization mechanism.
    /// </remarks>
    protected override void LockedEnqueue(Envelope envelope)
    {
        _prioQueue.Enqueue(envelope);
    }

    /// <summary>
    /// Unsafe method for attempting to dequeue a message.
    /// Envelopes in the prepend buffer are returned first (most recently prepended first);
    /// only when that buffer is empty is the priority queue consulted.
    /// </summary>
    /// <param name="envelope">The message that might be dequeued.</param>
    /// <returns><c>true</c> if a message was available to be dequeued, <c>false</c> otherwise.</returns>
    /// <remarks>
    /// Called from within a synchronization mechanism.
    /// </remarks>
    protected override bool LockedTryDequeue(out Envelope envelope)
    {
        if (_prependBuffer.Count > 0)
        {
            envelope = _prependBuffer.Pop();
            return true;
        }

        if (_prioQueue.Count() > 0)
        {
            envelope = _prioQueue.Dequeue();
            return true;
        }

        envelope = default(Envelope);
        return false;
    }

    /// <summary>
    /// Puts an envelope in front of all queued messages, bypassing prioritization.
    /// The envelope is pushed onto a stack, so the most recently prepended envelope
    /// is the next one to be dequeued.
    /// </summary>
    /// <param name="envelope">The message to deliver before any other queued message.</param>
    public void EnqueueFirst(Envelope envelope)
    {
        _prependBuffer.Push(envelope);
    }
}
/// <summary>
/// Stable priority mailbox - an unbounded mailbox whose contents are delivered by priority.
/// Derive from this class and implement <see cref="PriorityGenerator"/> to supply your own prioritization;
/// the value it returns decides where a message is placed in the mailbox.
/// Messages with lower values are delivered first, and messages that share a value keep their delivery order.
/// </summary>
public abstract class UnboundedStablePriorityMailbox : MailboxType, IProducesMessageQueue<UnboundedStablePriorityMessageQueue>
{
    /// <summary>
    /// Capacity that every unbounded priority mailbox starts out with by default.
    /// </summary>
    public const int DefaultCapacity = 11;

    /// <inheritdoc cref="MailboxType"/>
    protected UnboundedStablePriorityMailbox(Settings settings, Config config)
        : base(settings, config)
    {
        InitialCapacity = DefaultCapacity;
    }

    /// <summary>
    /// Initial capacity handed to each message queue this mailbox type creates.
    /// </summary>
    public int InitialCapacity { get; }

    /// <summary>
    /// Computes the priority value of a message; implementations may look at its type and content.
    /// </summary>
    /// <param name="message">The message to inspect.</param>
    /// <returns>An integer. The lower the value, the higher the priority.</returns>
    protected abstract int PriorityGenerator(object message);

    /// <inheritdoc cref="MailboxType"/>
    public sealed override IMessageQueue Create(IActorRef owner, ActorSystem system)
        => new UnboundedStablePriorityMessageQueue(PriorityGenerator, InitialCapacity);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.