Last active
July 5, 2018 14:28
-
-
Save AndreSteenbergen/a2260f23c4e7d2505364f2271440d30d to your computer and use it in GitHub Desktop.
UnboundedStablePriorityMailbox For Akka.Net
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary> | |
/// Priority queue implemented using a simple list with binary search for inserts. | |
/// This specific implementation is cheap in terms of memory but weak in terms of performance. | |
/// See http://visualstudiomagazine.com/articles/2012/11/01/priority-queues-with-c.aspx for original implementation | |
/// This specific version is adapted for Envelopes only and calculates a priority of envelope.Message | |
/// </summary> | |
public sealed class StableListPriorityQueue | |
{ | |
private struct WrappedEnvelope | |
{ | |
public WrappedEnvelope(Envelope envelope, int seqnum) | |
{ | |
Envelope = envelope; | |
SequenceNumber = seqnum; | |
} | |
public Envelope Envelope { get; } | |
public int SequenceNumber { get; } | |
} | |
private class WrappedEnvelopeComparator | |
{ | |
private readonly Func<object, int> priorityCalculator; | |
public WrappedEnvelopeComparator(Func<object, int> priorityCalculator) | |
{ | |
this.priorityCalculator = priorityCalculator; | |
} | |
public int Compare(WrappedEnvelope x, WrappedEnvelope y) | |
{ | |
var baseCompare = priorityCalculator(x.Envelope.Message).CompareTo(priorityCalculator(y.Envelope.Message)); | |
if (baseCompare != 0) return baseCompare; | |
return x.SequenceNumber.CompareTo(y.SequenceNumber); | |
} | |
} | |
private readonly List<WrappedEnvelope> _data; | |
private readonly WrappedEnvelopeComparator comparator; | |
/// <summary> | |
/// The default priority generator. | |
/// </summary> | |
internal static readonly Func<object, int> DefaultPriorityCalculator = message => 1; | |
private int sequenceNumber; | |
/// <summary> | |
/// Creates a new priority queue. | |
/// </summary> | |
/// <param name="initialCapacity">The initial capacity of the queue.</param> | |
/// <param name="priorityCalculator">The calculator function for assigning message priorities.</param> | |
public StableListPriorityQueue(int initialCapacity, Func<object, int> priorityCalculator) | |
{ | |
_data = new List<WrappedEnvelope>(initialCapacity); | |
comparator = new WrappedEnvelopeComparator(priorityCalculator); | |
} | |
/// <summary> | |
/// Enqueues a message into the priority queue. | |
/// </summary> | |
/// <param name="item">The item to enqueue.</param> | |
public void Enqueue(Envelope item) | |
{ | |
var seq = Interlocked.Increment(ref sequenceNumber); | |
var wrappedItem = new WrappedEnvelope(item, seq); | |
_data.Add(wrappedItem); | |
var ci = _data.Count - 1; // child index; start at end | |
while (ci > 0) | |
{ | |
var pi = (ci - 1) / 2; // parent index | |
if (comparator.Compare(_data[ci], _data[pi]) >= 0) break; // child item is larger than (or equal) parent so we're done | |
var tmp = _data[ci]; _data[ci] = _data[pi]; _data[pi] = tmp; | |
ci = pi; | |
} | |
} | |
/// <summary> | |
/// Dequeues the highest priority message at the front of the priority queue. | |
/// </summary> | |
/// <returns>The highest priority message <see cref="Envelope"/>.</returns> | |
public Envelope Dequeue() | |
{ | |
// assumes pq is not empty; up to calling code | |
var li = _data.Count - 1; // last index (before removal) | |
var frontItem = _data[0]; // fetch the front | |
_data[0] = _data[li]; | |
_data.RemoveAt(li); | |
--li; // last index (after removal) | |
var pi = 0; // parent index. start at front of pq | |
while (true) | |
{ | |
var ci = pi * 2 + 1; // left child index of parent | |
if (ci > li) break; // no children so done | |
var rc = ci + 1; // right child | |
if (rc <= li && comparator.Compare(_data[rc], _data[ci]) < 0) // if there is a rc (ci + 1), and it is smaller than left child, use the rc instead | |
ci = rc; | |
if (comparator.Compare(_data[pi], _data[ci]) <= 0) break; // parent is smaller than (or equal to) smallest child so done | |
var tmp = _data[pi]; _data[pi] = _data[ci]; _data[ci] = tmp; // swap parent and child | |
pi = ci; | |
} | |
return frontItem.Envelope; | |
} | |
/// <summary> | |
/// Peek at the message at the front of the priority queue. | |
/// </summary> | |
/// <returns>The highest priority message <see cref="Envelope"/>.</returns> | |
public Envelope Peek() | |
{ | |
return _data[0].Envelope; | |
} | |
/// <summary> | |
/// Counts the number of items in the priority queue. | |
/// </summary> | |
/// <returns>The total number of items in the queue.</returns> | |
public int Count() | |
{ | |
return _data.Count; | |
} | |
/// <summary> | |
/// Converts the queue to a string representation. | |
/// </summary> | |
/// <returns>A string representation of the queue.</returns> | |
public override string ToString() | |
{ | |
var s = ""; | |
for (var i = 0; i < _data.Count; ++i) | |
s += _data[i].ToString() + " "; | |
s += "count = " + _data.Count; | |
return s; | |
} | |
/// <summary> | |
/// TBD | |
/// </summary> | |
/// <returns>TBD</returns> | |
public bool IsConsistent() | |
{ | |
// is the heap property true for all data? | |
if (_data.Count == 0) return true; | |
var li = _data.Count - 1; // last index | |
for (var pi = 0; pi < _data.Count; ++pi) // each parent index | |
{ | |
var lci = 2 * pi + 1; // left child index | |
var rci = 2 * pi + 2; // right child index | |
if (lci <= li && comparator.Compare(_data[pi],_data[lci]) > 0) return false; // if lc exists and it's greater than parent then bad. | |
if (rci <= li && comparator.Compare(_data[pi],_data[rci]) > 0) return false; // check the right child too. | |
} | |
return true; // passed all checks | |
} | |
} | |
/// <summary> | |
/// Base class for a message queue that uses a priority generator for messages | |
/// </summary> | |
public class UnboundedStablePriorityMessageQueue : BlockingMessageQueue, IUnboundedDequeBasedMessageQueueSemantics | |
{ | |
private readonly StableListPriorityQueue _prioQueue; | |
// doesn't need to be threadsafe - only called from within actor | |
private readonly Stack<Envelope> _prependBuffer = new Stack<Envelope>(); | |
/// <summary> | |
/// Creates a new unbounded priority message queue. | |
/// </summary> | |
/// <param name="priorityGenerator">The calculator function for determining the priority of inbound messages.</param> | |
/// <param name="initialCapacity">The initial capacity of the queue.</param> | |
public UnboundedStablePriorityMessageQueue(Func<object, int> priorityGenerator, int initialCapacity) | |
{ | |
_prioQueue = new StableListPriorityQueue(initialCapacity, priorityGenerator); | |
} | |
/// <summary> | |
/// Unsafe method for computing the underlying message count. | |
/// </summary> | |
/// <remarks> | |
/// Called from within a synchronization mechanism. | |
/// </remarks> | |
protected override int LockedCount | |
{ | |
get { return _prioQueue.Count(); } | |
} | |
/// <summary> | |
/// Unsafe method for enqueuing a new message to the queue. | |
/// </summary> | |
/// <param name="envelope">The message to enqueue.</param> | |
/// <remarks> | |
/// Called from within a synchronization mechanism. | |
/// </remarks> | |
protected override void LockedEnqueue(Envelope envelope) | |
{ | |
_prioQueue.Enqueue(envelope); | |
} | |
/// <summary> | |
/// Unsafe method for attempting to dequeue a message. | |
/// </summary> | |
/// <param name="envelope">The message that might be dequeued.</param> | |
/// <returns><c>true</c> if a message was available to be dequeued, <c>false</c> otherwise.</returns> | |
/// <remarks> | |
/// Called from within a synchronization mechanism. | |
/// </remarks> | |
protected override bool LockedTryDequeue(out Envelope envelope) | |
{ | |
if (_prependBuffer.Count > 0) | |
{ | |
envelope = _prependBuffer.Pop(); | |
return true; | |
} | |
if (_prioQueue.Count() > 0) | |
{ | |
envelope = _prioQueue.Dequeue(); | |
return true; | |
} | |
envelope = default(Envelope); | |
return false; | |
} | |
public void EnqueueFirst(Envelope envelope) | |
{ | |
_prependBuffer.Push(envelope); | |
} | |
} | |
/// <summary> | |
/// Priority mailbox - an unbounded mailbox that allows for prioritization of its contents. | |
/// Extend this class and implement the <see cref="PriorityGenerator"/> method with your own prioritization. | |
/// The value returned by the <see cref="PriorityGenerator"/> method will be used to order the message in the mailbox. | |
/// Lower values will be delivered first. Messages ordered by the same number will remain in delivery order. | |
/// </summary> | |
public abstract class UnboundedStablePriorityMailbox : MailboxType, IProducesMessageQueue<UnboundedStablePriorityMessageQueue> | |
{ | |
/// <summary> | |
/// Function responsible for generating the priority value of a message based on its type and content. | |
/// </summary> | |
/// <param name="message">The message to inspect.</param> | |
/// <returns>An integer. The lower the value, the higher the priority.</returns> | |
protected abstract int PriorityGenerator(object message); | |
/// <summary> | |
/// The initial capacity of the unbounded mailbox. | |
/// </summary> | |
public int InitialCapacity { get; } | |
/// <summary> | |
/// The default capacity of an unbounded priority mailbox. | |
/// </summary> | |
public const int DefaultCapacity = 11; | |
/// <inheritdoc cref="MailboxType"/> | |
public sealed override IMessageQueue Create(IActorRef owner, ActorSystem system) | |
{ | |
return new UnboundedStablePriorityMessageQueue(PriorityGenerator, InitialCapacity); | |
} | |
/// <inheritdoc cref="MailboxType"/> | |
protected UnboundedStablePriorityMailbox(Settings settings, Config config) : base(settings, config) | |
{ | |
InitialCapacity = DefaultCapacity; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment