Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
ConcurrentLinkedList<T>
#pragma warning disable 420
namespace System.Collections.Concurrent
{
using Azure.Framework;
using Diagnostics;
using Diagnostics.Contracts;
using Generic;
using Runtime.InteropServices;
using Runtime.Serialization;
using Security.Permissions;
using Threading;
/// <summary>
/// A thread-safe doubly-linked list
/// </summary>
/// <typeparam name = "T">Specifies the element type of the linked list.</typeparam>
[Serializable, DebuggerTypeProxy(typeof (IProducerConsumerCollection_DebugView<>)),
DebuggerDisplay("Count = {Count}"), ComVisible(false),
HostProtection(SecurityAction.LinkDemand, Synchronization = true, ExternalThreading = true)]
public class ConcurrentLinkedList<T> : IProducerConsumerCollection<T>, ICollection<T>, IEnumerable<T>, ICollection,
IEnumerable
{
#region Fields
// Entry node of the circular chain; null while the list is empty.
// Not serialized: the chain is rebuilt from _serializationArray after deserialization.
[NonSerialized]
private ConcurrentLinkedListNode<T> _head;
/// <summary>
/// <see cref = "ReaderWriterLockSlim" /> for <see cref = "_head" />
/// </summary>
// ReaderWriterLockSlim is not marked [Serializable], so the field must be excluded
// from serialization; Initialize re-creates it when OnDeserialized runs.
[NonSerialized]
private ReaderWriterLockSlim _headLock;
// Count computed by the most recent full traversal in the Count getter.
[NonSerialized]
private volatile int _lastCount;
// Snapshot of the items, filled by OnSerializing and consumed by OnDeserialized.
private T[] _serializationArray;
// Incremented by UpdateNode for Add/Remove operations and by Clear.
[NonSerialized]
private volatile int _version;
// Value of _version at the time _lastCount was computed.
[NonSerialized]
private volatile int _versionAtLastCount;
#endregion
#region Constructors
/// <summary>
/// Initializes a new instance of the <see cref = "ConcurrentLinkedList&lt;T&gt;" /> class.
/// </summary>
public ConcurrentLinkedList()
{
    // No seed items: Initialize only has to create the head lock.
    Initialize(collection: null);
}
/// <summary>
/// Initializes a new instance of the <see cref = "ConcurrentLinkedList&lt;T&gt;" /> class.
/// </summary>
/// <param name = "collection">The initial items with which to populate the list.</param>
public ConcurrentLinkedList(IEnumerable<T> collection)
{
// Null collections are rejected here; the parameterless constructor is the way to start empty.
Contract.Requires<ArgumentNullException>(collection != null, "collection");
// Initialize creates the head lock and appends each item with AddLast, in enumeration order.
Initialize(collection);
}
#endregion
#region Properties
/// <summary>
/// Gets a value that indicates whether the <see cref = "ConcurrentLinkedList{T}" /> is empty.
/// </summary>
/// <returns><see langword = "true" /> if the <see cref = "ConcurrentLinkedList{T}" /> is empty; otherwise <see langword = "false" />.</returns>
public bool IsEmpty
{
    get
    {
        // First reads the head under the head lock; a null head means there are no nodes.
        var first = First;
        return first == null;
    }
}
/// <summary>
/// Gets the first node of the <see cref = "ConcurrentLinkedList{T}" />.
/// </summary>
/// <returns>The first <see cref = "ConcurrentLinkedListNode{T}" /> of the <see cref = "ConcurrentLinkedList{T}" />.</returns>
public ConcurrentLinkedListNode<T> First
{
    get
    {
        var headLocked = false;
        ConcurrentLinkedListNode<T> first;
        try
        {
            // Capture the head while the head lock is requested for reading.
            _headLock.EnsureLockedForReading(ref headLocked);
            first = _head;
        }
        finally
        {
            // Only undo the lock if this call is the one that reported taking it.
            if (headLocked)
                _headLock.UnlockAfterReading();
        }
        return first;
    }
}
/// <summary>
/// Gets the last node of the <see cref = "ConcurrentLinkedList{T}" />.
/// </summary>
/// <returns>The last <see cref = "ConcurrentLinkedListNode{T}" /> of the <see cref = "ConcurrentLinkedList{T}" />.</returns>
public ConcurrentLinkedListNode<T> Last
{
    get
    {
        var headLockTaken = false;
        try
        {
            _headLock.EnsureLockedForReading(ref headLockTaken);
            var head = _head;
            if (head == null)
                return null;

            var headPositionLocked = false;
            try
            {
                head.PositionLock.EnsureLockedForReading(ref headPositionLocked);
                // The chain is circular, so the node before the head is the last node.
                return head.PreviousNode;
            }
            finally
            {
                if (headPositionLocked)
                    head.PositionLock.UnlockAfterReading();
            }
        }
        finally
        {
            if (headLockTaken)
            {
                _headLock.UnlockAfterReading();
            }
        }
    }
}
#endregion
#region Methods
/// <summary>
/// Walks the chain once from the head and requests every node's position lock and,
/// when <paramref name = "lockValues" /> is set, its value lock as well.
/// </summary>
/// <param name = "upgradeable">Passed through to the position-lock request of every node.</param>
/// <param name = "lockValues">Whether to also request each node's value lock for reading.</param>
private void AcquireAllLocks(bool upgradeable = false, bool lockValues = false)
{
    // FreezeList may have entered the head lock in upgradeable mode, which
    // ReaderWriterLockSlim reports through IsUpgradeableReadLockHeld rather than IsReadLockHeld.
    Contract.Assert(_headLock != null && (_headLock.IsReadLockHeld || _headLock.IsUpgradeableReadLockHeld),
                    "A reader-lock must be obtained before reading the head");
    if (_head == null) return;
    var node = _head;
    do
    {
        // Start every request with a clear flag; the result is not used here because
        // ReleaseAllLocks releases unconditionally.
        // NOTE(review): assumes the helper expects the flag to be false on entry — confirm.
        var lockTaken = false;
        node.PositionLock.EnsureLockedForReading(ref lockTaken, upgradeable);
        if (lockValues)
        {
            lockTaken = false;
            // Not upgradeable since we shouldn't change values.
            node.ValueLock.EnsureLockedForReading(ref lockTaken);
        }
        node = node.NextNode;
    } while (node != _head);
}
/// <summary>
/// Adds a new node containing the specified value after the specified existing node in the <see cref = "ConcurrentLinkedList{T}" />.
/// </summary>
/// <returns>The new <see cref = "ConcurrentLinkedListNode{T}" /> containing <paramref name = "value" />.</returns>
/// <param name = "node">The <see cref = "ConcurrentLinkedListNode{T}" /> after which to insert a new <see cref = "ConcurrentLinkedListNode{T}" /> containing <paramref name = "value" />.</param>
/// <param name = "value">The value to add to the <see cref = "ConcurrentLinkedList{T}" />.</param>
/// <exception cref = "T:System.ArgumentNullException">
/// <paramref name = "node" /> is null.</exception>
/// <exception cref = "T:System.InvalidOperationException">
/// <paramref name = "node" /> is not in the current <see cref = "ConcurrentLinkedList{T}" />.</exception>
public ConcurrentLinkedListNode<T> AddAfter(ConcurrentLinkedListNode<T> node, T value)
{
    Contract.Requires<ArgumentNullException>(node != null, "node");
    // Wrap the value in a fresh node and let the node-based overload do the linking.
    var inserted = new ConcurrentLinkedListNode<T>(value);
    AddAfter(node, newNode: inserted);
    return inserted;
}
/// <summary>
/// Adds the specified new node after the specified existing node in the <see cref = "ConcurrentLinkedList{T}" />.
/// </summary>
/// <param name = "node">The <see cref = "ConcurrentLinkedListNode{T}" /> after which to insert <paramref name = "newNode" />.</param>
/// <param name = "newNode">The new <see cref = "ConcurrentLinkedListNode{T}" /> to add to the <see cref = "ConcurrentLinkedList{T}" />.</param>
/// <exception cref = "T:System.ArgumentNullException">
/// <paramref name = "node" /> is null.-or-<paramref name = "newNode" /> is null.</exception>
/// <exception cref = "T:System.InvalidOperationException">
/// <paramref name = "node" /> is not in the current <see cref = "ConcurrentLinkedList{T}" />.-or-<paramref name = "newNode" /> belongs to another <see cref = "ConcurrentLinkedList{T}" />.</exception>
public void AddAfter(ConcurrentLinkedListNode<T> node, ConcurrentLinkedListNode<T> newNode)
{
    Contract.Requires<ArgumentNullException>(node != null, "node");
    Contract.Requires<ArgumentNullException>(newNode != null, "newNode");
    var positionLocked = false;
    try
    {
        // Hold the anchor's position lock so its successor cannot change underneath us.
        node.PositionLock.EnsureLockedForReading(ref positionLocked, upgradeable: true);
        // Inserting after 'node' means inserting before its successor. When 'node' is the
        // last node that successor is the head; the new node must then become the new last
        // node, so the head must never be replaced from this method.
        InsertNodeBefore(node.NextNode, newNode, replaceHead: false);
    }
    finally
    {
        if (positionLocked)
            node.PositionLock.UnlockAfterReading();
    }
}
/// <summary>
/// Adds a new node containing the specified value before the specified existing node in the <see cref = "ConcurrentLinkedList{T}" />.
/// </summary>
/// <param name = "node">The <see cref = "ConcurrentLinkedListNode{T}" /> before which to insert a new <see cref = "ConcurrentLinkedListNode{T}" /> containing <paramref name = "value" />.</param>
/// <param name = "value">The value to add to the <see cref = "ConcurrentLinkedList{T}" />.</param>
/// <returns>
/// The new <see cref = "ConcurrentLinkedListNode{T}" /> containing <paramref name = "value" />.
/// </returns>
/// <exception cref = "ArgumentNullException">
/// <paramref name = "node" /> is null.</exception>
/// <exception cref = "InvalidOperationException">
/// <paramref name = "node" /> is not in the current <see cref = "ConcurrentLinkedList{T}" />.
/// </exception>
public ConcurrentLinkedListNode<T> AddBefore(ConcurrentLinkedListNode<T> node, T value)
{
    Contract.Requires<ArgumentNullException>(node != null, "node");
    var inserted = new ConcurrentLinkedListNode<T>(value);
    // replaceHead: true is the helper's default; when 'node' is the head the new node takes over as head.
    InsertNodeBefore(node, inserted, replaceHead: true);
    return inserted;
}
/// <summary>
/// Links <paramref name = "newNode" /> into the <see cref = "ConcurrentLinkedList{T}" /> immediately before <paramref name = "node" />.
/// </summary>
/// <param name = "node">The existing <see cref = "ConcurrentLinkedListNode{T}" /> that will follow <paramref name = "newNode" />.</param>
/// <param name = "newNode">The <see cref = "ConcurrentLinkedListNode{T}" /> to insert.</param>
/// <exception cref = "ArgumentNullException">
/// <paramref name = "node" /> is null.-or-<paramref name = "newNode" /> is null.</exception>
/// <exception cref = "InvalidOperationException">
/// <paramref name = "node" /> is not in the current <see cref = "ConcurrentLinkedList{T}" />.-or-<paramref name = "newNode" /> belongs to another <see cref = "ConcurrentLinkedList{T}" />.
/// </exception>
public void AddBefore(ConcurrentLinkedListNode<T> node, ConcurrentLinkedListNode<T> newNode)
{
    Contract.Requires<ArgumentNullException>(node != null, "node");
    Contract.Requires<ArgumentNullException>(newNode != null, "newNode");
    // Explicitly request the helper's default: the new node replaces the head when 'node' is the head.
    InsertNodeBefore(node, newNode, replaceHead: true);
}
/// <summary>
/// Adds a new node containing the specified value at the start of the <see cref = "ConcurrentLinkedList{T}" />.
/// </summary>
/// <returns>The new <see cref = "ConcurrentLinkedListNode{T}" /> containing <paramref name = "value" />.</returns>
/// <param name = "value">The value to add at the start of the <see cref = "ConcurrentLinkedList{T}" />.</param>
public ConcurrentLinkedListNode<T> AddFirst(T value)
{
    // Wrap the value and reuse the node-based overload, then hand the node back to the caller.
    var inserted = new ConcurrentLinkedListNode<T>(value);
    AddFirst(node: inserted);
    return inserted;
}
/// <summary>
/// Adds the specified new node at the start of the <see cref = "ConcurrentLinkedList{T}" />.
/// </summary>
/// <param name = "node">The new <see cref = "ConcurrentLinkedListNode{T}" /> to add at the start of the <see cref = "ConcurrentLinkedList{T}" />.</param>
/// <exception cref = "T:System.ArgumentNullException">
/// <paramref name = "node" /> is null.</exception>
/// <exception cref = "T:System.InvalidOperationException">
/// <paramref name = "node" /> belongs to another <see cref = "ConcurrentLinkedList{T}" />.</exception>
public void AddFirst(ConcurrentLinkedListNode<T> node)
{
    Contract.Requires<ArgumentNullException>(node != null, "node");
    Contract.Requires<InvalidOperationException>(node.List == null,
                                                 "'node' already belongs to another ConcurrentLinkedList.");
    Contract.Ensures(First == node);
    var headLockTaken = false;
    try
    {
        // Upgradeable, because the helpers below may need to rewrite the head.
        _headLock.EnsureLockedForReading(ref headLockTaken, upgradeable: true);
        if (_head != null)
        {
            // Non-empty list: link in front of the head and let the new node take over as head.
            InsertNodeBefore(_head, node, replaceHead: true);
        }
        else
        {
            InsertNodeToEmptyList(node);
        }
    }
    finally
    {
        if (headLockTaken)
            _headLock.UnlockAfterReading();
    }
}
/// <summary>
/// Adds a new node containing the specified value at the end of the <see cref = "ConcurrentLinkedList{T}" />.
/// </summary>
/// <returns>
/// The new <see cref = "ConcurrentLinkedListNode{T}" /> containing <paramref name = "value" />.
/// </returns>
/// <param name = "value">
/// The value to add at the end of the <see cref = "ConcurrentLinkedList{T}" />.
/// </param>
public ConcurrentLinkedListNode<T> AddLast(T value)
{
    // Wrap the value and reuse the node-based overload, then hand the node back to the caller.
    var appended = new ConcurrentLinkedListNode<T>(value);
    AddLast(node: appended);
    return appended;
}
/// <summary>
/// Adds the specified new node at the end of the <see cref = "ConcurrentLinkedList{T}" />.
/// </summary>
/// <param name = "node">The new <see cref = "ConcurrentLinkedListNode{T}" /> to add at the end of the <see cref = "ConcurrentLinkedList{T}" />.</param>
/// <exception cref = "T:System.ArgumentNullException">
/// <paramref name = "node" /> is null.</exception>
/// <exception cref = "T:System.InvalidOperationException">
/// <paramref name = "node" /> belongs to another <see cref = "ConcurrentLinkedList{T}" />.</exception>
public void AddLast(ConcurrentLinkedListNode<T> node)
{
    Contract.Requires<ArgumentNullException>(node != null, "node");
    Contract.Requires<InvalidOperationException>(node.List == null,
                                                 "'node' already belongs to another ConcurrentLinkedList.");
    Contract.Ensures(Last == node);
    var headLockTaken = false;
    try
    {
        _headLock.EnsureLockedForReading(ref headLockTaken, upgradeable: true);
        if (_head != null)
        {
            // In the circular chain, "before the head" without replacing the head is the tail position.
            InsertNodeBefore(_head, node, replaceHead: false);
        }
        else
        {
            InsertNodeToEmptyList(node);
        }
    }
    finally
    {
        if (headLockTaken)
            _headLock.UnlockAfterReading();
    }
}
/// <summary>
/// Finds the first node that contains the specified value.
/// </summary>
/// <returns>The first <see cref = "ConcurrentLinkedListNode{T}" /> that contains the specified value, if found; otherwise, null.</returns>
/// <param name = "value">The value to locate in the <see cref = "ConcurrentLinkedList{T}" />.</param>
public ConcurrentLinkedListNode<T> Find(T value)
{
    var headLockTaken = false;
    try
    {
        // Freeze positions and values so the scan sees a stable chain.
        FreezeList(ref headLockTaken, lockValues: true);
        var match = FindNodeWithValue(value, searchFromFirstToLast: true);
        return match;
    }
    finally
    {
        UnfreezeList(headLockTaken);
    }
}
/// <summary>
/// Scans the chain once for a node whose value equals <paramref name = "value" /> according to
/// <see cref = "EqualityComparer{T}.Default" />. The caller is expected to have frozen the list.
/// </summary>
/// <param name = "value">The value to look for.</param>
/// <param name = "searchFromFirstToLast">
/// true to start at the head and follow next links; false to start at the node before the head and follow previous links.
/// </param>
/// <returns>The first matching node in scan order, or null if there is none or the list is empty.</returns>
private ConcurrentLinkedListNode<T> FindNodeWithValue(T value, bool searchFromFirstToLast = true)
{
    // Remove(T) freezes the list in upgradeable mode, which ReaderWriterLockSlim reports
    // through IsUpgradeableReadLockHeld rather than IsReadLockHeld, so accept both.
    Contract.Assert(_headLock.IsReadLockHeld || _headLock.IsUpgradeableReadLockHeld);
    var head = _head;
    if (head == null)
        return null;

    Contract.Assert(head.PositionLock.IsReadLockHeld || head.PositionLock.IsUpgradeableReadLockHeld);
    var startNode = searchFromFirstToLast ? head : head.PreviousNode;
    var equalityComparer = EqualityComparer<T>.Default;
    var node = startNode;
    do
    {
        if (equalityComparer.Equals(node.Value, value))
            return node;
        if (node != head)
            Contract.Assert(node.PositionLock.IsReadLockHeld || node.PositionLock.IsUpgradeableReadLockHeld);
        // Follow the raw links; the chain is circular, so the scan ends back at startNode.
        node = searchFromFirstToLast ? node.NextNode : node.PreviousNode;
    } while (node != startNode);
    return null;
}
/// <summary>
/// Finds the last node that contains the specified value.
/// </summary>
/// <returns>The last <see cref = "ConcurrentLinkedListNode{T}" /> that contains the specified value, if found; otherwise, null.</returns>
/// <param name = "value">The value to locate in the <see cref = "ConcurrentLinkedList{T}" />.</param>
public ConcurrentLinkedListNode<T> FindLast(T value)
{
    var headLockTaken = false;
    try
    {
        // Freeze positions and values so the backwards scan sees a stable chain.
        FreezeList(ref headLockTaken, lockValues: true);
        var match = FindNodeWithValue(value, searchFromFirstToLast: false);
        return match;
    }
    finally
    {
        UnfreezeList(headLockTaken);
    }
}
/// <summary>
/// Requests the head lock, then the locks of every node, then waits until no node reports an
/// operation in progress. Pair with <see cref = "UnfreezeList" />.
/// </summary>
/// <param name = "lockTaken">Receives the flag reported by the head-lock request; pass it to <see cref = "UnfreezeList" />.</param>
/// <param name = "upgradeable">Passed through to the head-lock and position-lock requests.</param>
/// <param name = "lockValues">Whether node value locks are requested as well.</param>
private void FreezeList(ref bool lockTaken, bool upgradeable = false, bool lockValues = false)
{
    // With upgradeable == true the head lock is held in upgradeable mode, which
    // ReaderWriterLockSlim reports through IsUpgradeableReadLockHeld, not IsReadLockHeld.
    Contract.Ensures(_headLock.IsReadLockHeld || _headLock.IsUpgradeableReadLockHeld);
    _headLock.EnsureLockedForReading(ref lockTaken, upgradeable);
    AcquireAllLocks(upgradeable, lockValues);
    WaitAllOperations();
}
/// <summary>
/// Creates the head lock and, when <paramref name = "collection" /> is not null, appends its items in
/// enumeration order. Called by the constructors and after deserialization.
/// </summary>
/// <param name = "collection">Initial items, or null for an empty list.</param>
private void Initialize(IEnumerable<T> collection)
{
    Contract.Ensures(_headLock != null);
    // Must run exactly once per instance: the lock has to be unset on entry.
    Contract.Assert(_headLock == null);
    _headLock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);
    if (collection == null)
        return;

    foreach (var item in collection)
    {
        AddLast(item);
    }
}
/// <summary>
/// Links <paramref name = "newNode" /> between <paramref name = "node" /> and the node that currently precedes it.
/// </summary>
/// <param name = "node">The existing node that will follow <paramref name = "newNode" />.</param>
/// <param name = "newNode">The node to insert.</param>
/// <param name = "replaceHead">
/// When true and <paramref name = "node" /> is the current head, <paramref name = "newNode" /> becomes the head.
/// </param>
internal void InsertNodeBefore(ConcurrentLinkedListNode<T> node,
                               ConcurrentLinkedListNode<T> newNode,
                               bool replaceHead = true)
{
    Contract.Requires<ArgumentNullException>(node != null, "node");
    Contract.Requires<ArgumentNullException>(newNode != null, "newNode");
    Contract.Ensures(newNode.NextNode == node);
    Contract.Ensures(newNode.PreviousNode == Contract.OldValue(node.PreviousNode));
    Contract.Ensures(node.PreviousNode == newNode);
    // The node that used to precede 'node' must now point at newNode; OldValue has to wrap
    // only the old predecessor, not its (old) NextNode.
    Contract.Ensures(Contract.OldValue(node.PreviousNode).NextNode == newNode);
    UpdateNode(node, NodeOperation.Move,
               succeedingNode =>
               {
                   var precedingNode = succeedingNode.PreviousNode;
                   // Set the precedingNode to point to newNode as next.
                   if (precedingNode == succeedingNode)
                   {
                       // Single-node list: the predecessor is the node this call is already
                       // operating on. A nested UpdateNode would wait for CurrentOperation
                       // to become None, which only this call can do, so write the link directly.
                       precedingNode.Next = newNode;
                   }
                   else
                   {
                       UpdateNode(precedingNode, NodeOperation.Move, x => x.Next = newNode);
                   }
                   UpdateNode(newNode, NodeOperation.Add, x =>
                                                              {
                                                                  var lockTaken = false;
                                                                  try
                                                                  {
                                                                      x.PositionLock.EnsureLockedForWriting(
                                                                          ref lockTaken);
                                                                      // Set the newNode to point to succeedingNode and precedingNode as next and previous respectively.
                                                                      x.NextNode = succeedingNode;
                                                                      x.PreviousNode = precedingNode;
                                                                  }
                                                                  finally
                                                                  {
                                                                      if (lockTaken)
                                                                      {
                                                                          x.PositionLock.UnlockAfterWriting();
                                                                      }
                                                                  }
                                                              });
                   // Finally set succeedingNode to point to newNode as previous
                   succeedingNode.Previous = newNode;
                   // If succeedingNode is the current head, replace it with newNode.
                   if (replaceHead)
                       SetHead(head => head == succeedingNode, newNode);
               });
}
/// <summary>
/// Installs <paramref name = "newNode" /> as the head of a list whose head is currently null.
/// </summary>
/// <param name = "newNode">The node to install; it ends up linked to itself in both directions.</param>
internal void InsertNodeToEmptyList(ConcurrentLinkedListNode<T> newNode)
{
Contract.Requires<ArgumentNullException>(newNode != null, "newNode");
// UpdateNode claims the node for an Add operation; SetHead then swaps the head in only while it is still null.
UpdateNode(newNode, NodeOperation.Add,
node => SetHead(head => head == null, node,
onSuccess: () =>
{
var nodeLocked = false;
try
{
node.PositionLock.EnsureLockedForWriting(
ref nodeLocked);
// Set the node to point to itself as previous and next since it is the only node in the list.
node.PreviousNode = node.NextNode = node;
}
finally
{
if (nodeLocked)
{
node.PositionLock.UnlockAfterWriting();
}
}
},
onFailure: () =>
{
#if DEBUG
// This shouldn't really occur since we only call this method when the head is locked and null.
if (Debugger.IsAttached)
Debugger.Break();
#endif
// We've been beaten to the chase. Insert it before the current head.
Contract.Assert(_head != null &&
_head.PositionLock.IsUpgradeableReadLockHeld);
// replaceHead: false keeps the existing head, so the node lands in the tail position.
InsertNodeBefore(_head, node, replaceHead: false);
//NOTE: Should we replace the head here?
}));
}
// Code Contracts invariant: the head lock exists for the whole lifetime of the instance.
[ContractInvariantMethod]
private void ObjectInvariant()
{
Contract.Invariant(_headLock != null);
}
// Serialization callback: rebuilds the list from the array captured by OnSerializing,
// then drops the array.
[OnDeserialized]
private void OnDeserialized(StreamingContext context)
{
Contract.Ensures(_headLock != null);
Contract.Ensures(_serializationArray == null);
// Initialize creates the head lock and re-adds the items; it accepts a null array.
Initialize(_serializationArray);
_serializationArray = null;
}
// Serialization callback: captures the current items into the only field that is serialized
// with the list contents.
[OnSerializing]
private void OnSerializing(StreamingContext context)
{
Contract.Ensures(_serializationArray != null);
_serializationArray = ToArray();
}
/// <summary>
/// Walks the chain once from the head and releases every node's position lock and value lock.
/// Does nothing when the head is null.
/// </summary>
private void ReleaseAllLocks()
{
    // The list may have been frozen in upgradeable mode (Clear, Remove(T)), which
    // ReaderWriterLockSlim reports through IsUpgradeableReadLockHeld rather than IsReadLockHeld.
    Contract.Assert(_headLock != null && (_headLock.IsReadLockHeld || _headLock.IsUpgradeableReadLockHeld));
    if (_head == null) return;
    var next = _head;
    do
    {
        var current = next;
        Contract.Assert(current.PositionLock.IsReadLockHeld ||
                        current.PositionLock.IsUpgradeableReadLockHeld ||
                        current.PositionLock.IsWriteLockHeld);
        // Read the successor before giving up the lock that protects the link.
        next = current.NextNode;
        current.PositionLock.UnlockAfterReading();
        // NOTE(review): the value lock is released even when the freeze did not request it;
        // presumably the helper tolerates a lock that is not held — confirm.
        current.ValueLock.UnlockAfterReading();
    } while (next != _head);
}
/// <summary>
/// Removes the specified node from the <see cref = "ConcurrentLinkedList{T}" />.
/// </summary>
/// <param name = "node">The <see cref = "ConcurrentLinkedListNode{T}" /> to remove from the <see cref = "ConcurrentLinkedList{T}" />.</param>
/// <exception cref = "T:System.ArgumentNullException">
/// <paramref name = "node" /> is null.</exception>
/// <exception cref = "T:System.InvalidOperationException">
/// <paramref name = "node" /> is not in the current <see cref = "ConcurrentLinkedList{T}" />.</exception>
public void Remove(ConcurrentLinkedListNode<T> node)
{
Contract.Requires<ArgumentNullException>(node != null, "node");
// Ownership is checked inside RemoveNode (through UpdateNode's Guard.Verify call).
RemoveNode(node);
}
/// <summary>
/// Removes the node at the start of the <see cref = "ConcurrentLinkedList{T}" />.
/// </summary>
/// <exception cref = "T:System.InvalidOperationException">The <see cref = "ConcurrentLinkedList{T}" /> is empty.</exception>
public void RemoveFirst()
{
    Contract.Requires<InvalidOperationException>(First != null, "The ConcurrentLinkedList is empty.");
    var headLockTaken = false;
    try
    {
        _headLock.EnsureLockedForReading(ref headLockTaken, upgradeable: true);
        var first = _head;
        // The list may have been emptied since the precondition was evaluated.
        if (first == null)
            return;
        RemoveNode(first);
    }
    finally
    {
        if (headLockTaken)
        {
            _headLock.UnlockAfterReading();
        }
    }
}
/// <summary>
/// Removes the node at the end of the <see cref = "ConcurrentLinkedList{T}" />.
/// </summary>
/// <exception cref = "T:System.InvalidOperationException">The <see cref = "ConcurrentLinkedList{T}" /> is empty.</exception>
public void RemoveLast()
{
    Contract.Requires<InvalidOperationException>(Last != null, "The ConcurrentLinkedList is empty.");
    var headLockTaken = false;
    try
    {
        _headLock.EnsureLockedForReading(ref headLockTaken, upgradeable: true);
        var head = _head;
        // The list may have been emptied since the precondition was evaluated.
        if (head == null)
            return;

        var headPositionLocked = false;
        try
        {
            head.PositionLock.EnsureLockedForReading(ref headPositionLocked, upgradeable: true);
            // Circular chain: the node before the head is the last node.
            RemoveNode(head.PreviousNode);
        }
        finally
        {
            if (headPositionLocked)
            {
                head.PositionLock.UnlockAfterReading();
            }
        }
    }
    finally
    {
        if (headLockTaken)
        {
            _headLock.UnlockAfterReading();
        }
    }
}
/// <summary>
/// Unlinks <paramref name = "node" /> from the chain and moves the head if the removed node was the head.
/// A null node is ignored.
/// </summary>
private void RemoveNode(ConcurrentLinkedListNode<T> node)
{
if (node == null) return;
UpdateNode(node, NodeOperation.Remove,
target =>
{
// Already detached: nothing to unlink.
if (target.Owner == null)
return;
// Capture the neighbours first; Invalidate clears the node's own links and owner.
var precedingNode = target.PreviousNode;
var succeedingNode = target.NextNode;
target.Invalidate();
if (succeedingNode == target || precedingNode == target)
{
// This is only true if node is the head and is the only node in the list.
// So removing it effectively sets _head to null.
SetHead(head => head == target, null);
}
else
{
// Update the preceding and succeeding nodes to point to each other as next and previous respectively.
UpdateNode(precedingNode, NodeOperation.Move, n => n.Next = succeedingNode);
UpdateNode(succeedingNode, NodeOperation.Move, n => n.Previous = precedingNode);
// If 'target' is the current head, switch the head to the next node
SetHead(head => head == target, succeedingNode);
}
});
}
/// <summary>
/// Replaces the head with <paramref name = "newHead" /> if <paramref name = "condition" /> accepts the current head.
/// </summary>
/// <param name = "condition">Predicate evaluated against the current head (which may be null).</param>
/// <param name = "newHead">The node to install as head; null empties the list.</param>
/// <param name = "onSuccess">Optional callback invoked after the head was replaced, while the write lock is still held.</param>
/// <param name = "onFailure">Optional callback invoked whenever the head was not replaced because the condition was not met.</param>
private void SetHead(Func<ConcurrentLinkedListNode<T>, bool> condition,
                     ConcurrentLinkedListNode<T> newHead,
                     Action onSuccess = null,
                     Action onFailure = null)
{
    var readLocked = false;
    try
    {
        _headLock.EnsureLockedForReading(ref readLocked, true);
        if (!condition(_head))
        {
            // The condition already fails before upgrading. Report it as well; otherwise a
            // caller relying on onFailure (InsertNodeToEmptyList) would drop its node silently.
            if (onFailure != null)
                onFailure();
            return;
        }

        var writeLocked = false;
        try
        {
            _headLock.UpgradeToWriteLock(ref writeLocked);
            // Re-check under the write lock before committing.
            if (condition(_head))
            {
                _head = newHead;
                if (onSuccess != null)
                    onSuccess();
            }
            else
            {
                if (onFailure != null)
                    onFailure();
            }
        }
        finally
        {
            if (writeLocked)
                _headLock.ExitWriteLock();
        }
    }
    finally
    {
        if (readLocked)
        {
            _headLock.UnlockAfterReading();
        }
    }
}
// Copies the current values, head first, into a new List<T> while the list is frozen.
private List<T> ToList()
{
    var snapshot = new List<T>();
    var headLockTaken = false;
    try
    {
        FreezeList(ref headLockTaken, lockValues: true);
        var node = _head;
        if (node != null)
        {
            // One full lap around the circular chain.
            do
            {
                snapshot.Add(node.Value);
                node = node.NextNode;
            } while (node != _head);
        }
        return snapshot;
    }
    finally
    {
        UnfreezeList(headLockTaken);
    }
}
// Reads the value of the last node without removing it; returns false when the list is empty.
public bool TryPeek(out T result)
{
    return TryTakeOrPeek(out result, take: false);
}
// Shared implementation of TryPeek and TryTake: reads the value of the last node (the one
// before the head) and, when 'take' is set, removes that node afterwards.
private bool TryTakeOrPeek(out T result, bool take)
{
    result = default(T);
    var headLockTaken = false;
    try
    {
        // 'take' doubles as the upgradeable flag: only removal needs to modify the chain.
        _headLock.EnsureLockedForReading(ref headLockTaken, take);
        var head = _head;
        if (head == null)
            return false;

        var headPositionLocked = false;
        try
        {
            head.PositionLock.EnsureLockedForReading(ref headPositionLocked, take);
            var tail = head.PreviousNode;
            result = tail.Value;
            if (take)
            {
                RemoveNode(tail);
            }
            return true;
        }
        finally
        {
            if (headPositionLocked)
            {
                head.PositionLock.UnlockAfterReading();
            }
        }
    }
    finally
    {
        if (headLockTaken)
            _headLock.UnlockAfterReading();
    }
}
// Counterpart of FreezeList: releases the node locks first, then the head lock if the
// matching FreezeList call reported taking it.
private void UnfreezeList(bool lockTaken)
{
    ReleaseAllLocks();
    if (!lockTaken)
        return;
    _headLock.UnlockAfterReading();
}
/// <summary>
/// Runs <paramref name = "action" /> on <paramref name = "node" /> while the node is marked as being
/// under <paramref name = "operation" />, after validating the node's owner for that operation.
/// </summary>
/// <param name = "node">The node to operate on.</param>
/// <param name = "operation">The kind of operation; Add also sets the node's owner to this list.</param>
/// <param name = "action">The work to perform while the node is marked.</param>
private void UpdateNode(ConcurrentLinkedListNode<T> node,
                        NodeOperation operation,
                        Action<ConcurrentLinkedListNode<T>> action)
{
    Contract.Requires(node != null);
    Contract.Requires(action != null);
    Contract.Requires(operation != NodeOperation.None);
    var readLocked = false;
    var writeLocked = false;
    // True only once this call has written its own marker into CurrentOperation.
    var operationClaimed = false;
    try
    {
        node.PositionLock.EnsureLockedForReading(ref readLocked, upgradeable: true);
        var owner = node.Owner;
        switch (operation)
        {
            case NodeOperation.Remove:
            case NodeOperation.Move:
                Guard.Verify(owner == this,
                             "'node' does not belong to the current ConcurrentLinkedList.");
                break;
            case NodeOperation.Add:
                Guard.Verify(owner == null || owner == this,
                             "'node' already belongs to another ConcurrentLinkedList instance.");
                node.PositionLock.EnsureLockedForWriting(ref writeLocked);
                // Set the owner.
                node.Owner = this;
                break;
        }
        // Wait until no other operation is marked on the node.
        var spinWait = new SpinWait();
        while (node.CurrentOperation != (int) NodeOperation.None)
        {
            spinWait.SpinOnce();
        }
        Interlocked.Exchange(ref node.CurrentOperation, (int) operation);
        operationClaimed = true;
        action(node);
        if (operation == NodeOperation.Add || operation == NodeOperation.Remove)
            Interlocked.Increment(ref _version);
    }
    finally
    {
        // Only clear the marker if this call set it; when validation or locking failed
        // earlier, the marker may belong to an operation that is still running.
        if (operationClaimed)
            Interlocked.Exchange(ref node.CurrentOperation, (int) NodeOperation.None);
        if (writeLocked)
            node.PositionLock.UnlockAfterWriting();
        if (readLocked)
            node.PositionLock.UnlockAfterReading();
    }
}
/// <summary>
/// Walks the chain once from the head and spins on every node until its
/// CurrentOperation marker is back to <see cref = "NodeOperation.None" />.
/// </summary>
private void WaitAllOperations()
{
    // The head lock may be held in upgradeable mode (see FreezeList), which is reported
    // through IsUpgradeableReadLockHeld rather than IsReadLockHeld.
    Contract.Requires(_headLock.IsReadLockHeld || _headLock.IsUpgradeableReadLockHeld);
    if (_head == null)
        return;
    // No need to lock the head since this method is only called from the FreezeList operation, meaning it's already locked.
    var node = _head;
    do
    {
        if (node.CurrentOperation != (int) NodeOperation.None)
        {
            var wait = new SpinWait();
            while (node.CurrentOperation != (int) NodeOperation.None)
            {
                wait.SpinOnce();
            }
        }
        // Follow the raw link. The public Next property returns null once the successor is
        // the head, which would make the loop condition true with a null node and crash.
        node = node.NextNode;
    } while (node != _head);
}
#endregion
#region ICollection<T> Members
/// <summary>
/// Removes all nodes from the <see cref = "ConcurrentLinkedList{T}" />.
/// </summary>
public void Clear()
{
    var lockTaken = false;
    // Every node whose position lock the freeze requested. Once the head is null,
    // ReleaseAllLocks returns immediately, so these have to be released here.
    var frozenNodes = new List<ConcurrentLinkedListNode<T>>();
    var cleared = false;
    try
    {
        FreezeList(ref lockTaken, upgradeable: true);
        var first = _head;
        if (first != null)
        {
            // Collect one full lap of the circular chain before any link is cleared.
            var current = first;
            do
            {
                frozenNodes.Add(current);
                current = current.NextNode;
            } while (current != first);
        }
        foreach (var detached in frozenNodes)
        {
            detached.Invalidate();
        }
        SetHead(head => head != null, null);
        cleared = true;
        _lastCount = 0;
        // _version is shared with UpdateNode, which uses Interlocked; a plain ++ on a
        // volatile field is a non-atomic read-modify-write.
        Interlocked.Increment(ref _version);
    }
    finally
    {
        if (cleared)
        {
            // NOTE(review): assumes UnlockAfterReading also releases an upgradeable hold,
            // as its use after upgradeable requests elsewhere in this class suggests — confirm.
            foreach (var detached in frozenNodes)
            {
                detached.PositionLock.UnlockAfterReading();
            }
        }
        UnfreezeList(lockTaken);
    }
}
/// <summary>
/// Removes the first occurrence of the specified value from the <see cref = "ConcurrentLinkedList{T}" />.
/// </summary>
/// <param name = "value">The value to remove from the <see cref = "ConcurrentLinkedList{T}" />.</param>
/// <returns>
/// <see langword = "true" /> if the element containing <paramref name = "value" /> is successfully removed; otherwise, <see langword = "false" />.
/// This method also returns false if <paramref name = "value" /> was not found in the original <see cref = "ConcurrentLinkedList{T}" />.
/// </returns>
public bool Remove(T value)
{
    var lockTaken = false;
    try
    {
        // Upgradeable, because a match is unlinked while the list is still frozen.
        FreezeList(ref lockTaken, upgradeable: true, lockValues: true);
        var node = FindNodeWithValue(value);
        if (node == null)
            return false;

        RemoveNode(node);
        if (node.NextNode == null)
        {
            // The node has been unlinked, so the walk in UnfreezeList can no longer reach
            // it; release the locks the freeze requested on it here.
            // NOTE(review): assumes UnlockAfterReading also releases an upgradeable hold — confirm.
            node.PositionLock.UnlockAfterReading();
            node.ValueLock.UnlockAfterReading();
        }
        return true;
    }
    finally
    {
        UnfreezeList(lockTaken);
    }
}
/// <summary>
/// Adds an item to the <see cref = "T:System.Collections.Generic.ICollection`1" />.
/// </summary>
/// <param name = "item">The object to add to the <see cref = "T:System.Collections.Generic.ICollection`1" />.</param>
/// <exception cref = "T:System.NotSupportedException">The <see cref = "T:System.Collections.Generic.ICollection`1" /> is read-only.</exception>
void ICollection<T>.Add(T item)
{
    // Interface adds append at the end; the created node is not exposed through this interface.
    AddLast(value: item);
}
/// <summary>
/// Determines whether the <see cref = "T:System.Collections.Generic.ICollection`1" /> contains a specific value.
/// </summary>
/// <returns>
/// true if <paramref name = "item" /> is found in the <see cref = "T:System.Collections.Generic.ICollection`1" />; otherwise, false.
/// </returns>
/// <param name = "item">The object to locate in the <see cref = "T:System.Collections.Generic.ICollection`1" />.</param>
public bool Contains(T item)
{
    // Present exactly when a forward search finds a node holding the value.
    var match = Find(item);
    return match != null;
}
/// <summary>
/// Gets a value indicating whether the <see cref = "T:System.Collections.Generic.ICollection`1" /> is read-only.
/// </summary>
/// <returns>
/// true if the <see cref = "T:System.Collections.Generic.ICollection`1" /> is read-only; otherwise, false.
/// </returns>
bool ICollection<T>.IsReadOnly
{
// The list is always mutable.
get { return false; }
}
#endregion
#region IProducerConsumerCollection<T> Members
// Copies a snapshot of the values, head first, into 'array' starting at 'index'.
public void CopyTo(T[] array, int index)
{
    Contract.Requires<ArgumentNullException>(array != null, "array");
    Contract.Requires<ArgumentOutOfRangeException>(index >= 0, "index");
    // Snapshot first, then let List<T>.CopyTo do the copying and the remaining range checks.
    var snapshot = ToList();
    snapshot.CopyTo(array, index);
}
// Enumerates a snapshot array taken at call time, so later list changes are not reflected.
public IEnumerator<T> GetEnumerator()
{
    IEnumerable<T> snapshot = ToArray();
    return snapshot.GetEnumerator();
}
/// <summary>
/// Attempts to add an object to the <see cref = "T:System.Collections.Concurrent.IProducerConsumerCollection`1" />.
/// </summary>
/// <returns>
/// true if the object was added successfully; otherwise, false.
/// </returns>
/// <param name = "item">The object to add to the <see cref = "T:System.Collections.Concurrent.IProducerConsumerCollection`1" />.</param>
/// <exception cref = "T:System.ArgumentException">The <paramref name = "item" /> was invalid for this collection.</exception>
bool IProducerConsumerCollection<T>.TryAdd(T item)
{
    // Appending has no failure result in this class, so the add is always reported as successful.
    AddLast(value: item);
    return true;
}
// Non-generic copy: delegates to the ICollection implementation of a List<T> snapshot.
void ICollection.CopyTo(Array array, int index)
{
    Contract.Requires<ArgumentNullException>(array != null, "array");
    Contract.Requires<ArgumentOutOfRangeException>(index >= 0, "index");
    ((ICollection) ToList()).CopyTo(array, index);
}
// Non-generic enumeration reuses the generic snapshot enumerator.
IEnumerator IEnumerable.GetEnumerator()
{
    IEnumerator<T> typed = GetEnumerator();
    return typed;
}
// Returns a snapshot of the values, head first, as a new array.
public T[] ToArray()
{
    var snapshot = ToList();
    return snapshot.ToArray();
}
/// <summary>
/// Attempts to remove and return the value of the last node of the <see cref = "ConcurrentLinkedList{T}" />.
/// </summary>
/// <returns>
/// true if a value was removed and returned; false if the list was empty.
/// </returns>
/// <param name = "result">When this method returns true, contains the removed value; otherwise the default value of the element type.</param>
public bool TryTake(out T result)
{
    return TryTakeOrPeek(out result, take: true);
}
// Number of nodes in the list. The result of the last traversal is cached and reused for
// as long as _version has not moved since it was computed.
public int Count
{
    get
    {
        var headLockTaken = false;
        try
        {
            FreezeList(ref headLockTaken);
            if (_version == _versionAtLastCount)
            {
                // Nothing has changed. Use the last count.
                return _lastCount;
            }

            var counted = 0;
            if (_head != null)
            {
                // One full lap around the circular chain.
                var node = _head;
                do
                {
                    counted++;
                    node = node.NextNode;
                } while (node != _head);
            }
            // Remember the result together with the version it belongs to.
            _lastCount = counted;
            _versionAtLastCount = _version;
            return counted;
        }
        finally
        {
            UnfreezeList(headLockTaken);
        }
    }
}
bool ICollection.IsSynchronized
{
// Always false: this class does not synchronize through SyncRoot (which throws).
get { return false; }
}
// Not supported: the getter always throws NotSupportedException.
object ICollection.SyncRoot
{
    get
    {
        const string message =
            "The SyncRoot property may not be used for the synchronization of concurrent collections.";
        throw new NotSupportedException(message);
    }
}
#endregion
#region Nested type: NodeOperation
// Kind of operation currently marked on a node. The numeric values are stored in
// ConcurrentLinkedListNode<T>.CurrentOperation as int, so they are kept explicit.
internal enum NodeOperation
{
// No operation in progress; the value UpdateNode and WaitAllOperations spin towards.
None = 0,
// Node is being linked into the list (UpdateNode also sets its owner).
Add = 1,
// Node is being unlinked from the list.
Remove = 2,
// One of the node's links is being rewritten.
Move = 3
}
#endregion
}
}
namespace System.Collections.Concurrent
{
using Diagnostics.Contracts;
using Runtime.InteropServices;
using Threading;
/// <summary>
/// Represents a node in a <see cref = "ConcurrentLinkedList{T}" />. This class cannot be inherited.
/// </summary>
/// <typeparam name = "T">
/// Specifies the element type of the linked list.
/// </typeparam>
/// <filterpriority>1</filterpriority>
[ComVisible(false)]
public sealed class ConcurrentLinkedListNode<T>
{
#region Fields
/// <summary>
/// <see cref = "ReaderWriterLockSlim" /> for <see cref = "NextNode" />, <see cref = "PreviousNode" /> and <see cref = "Owner" />
/// </summary>
internal readonly ReaderWriterLockSlim PositionLock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);
/// <summary>
/// <see cref = "ReaderWriterLockSlim" /> for <see cref = "_value" />
/// </summary>
internal readonly ReaderWriterLockSlim ValueLock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);
// Operation currently marked on this node, stored as the int value of ConcurrentLinkedList<T>.NodeOperation (0 = none).
internal volatile int CurrentOperation;
// Raw link to the following node; the chain is circular, so the last node links back to the head. Null when detached.
internal volatile ConcurrentLinkedListNode<T> NextNode;
// The list this node belongs to, or null when detached.
internal ConcurrentLinkedList<T> Owner;
// Raw link to the preceding node; the head links back to the last node. Null when detached.
internal volatile ConcurrentLinkedListNode<T> PreviousNode;
// The stored value; accessed through the Value property under ValueLock.
private T _value;
#endregion
#region Constructors
/// <summary>
/// Initializes a new instance of the <see cref = "ConcurrentLinkedListNode{T}" /> class, containing the specified value.
/// </summary>
/// <param name = "value">
/// The value to contain in the <see cref = "ConcurrentLinkedListNode{T}" />.
/// </param>
public ConcurrentLinkedListNode(T value)
{
    // A new node is detached: only the value is set; links and owner stay null.
    this._value = value;
}
#endregion
#region Properties
/// <summary>
/// Gets the <see cref = "ConcurrentLinkedList{T}" /> that the <see cref = "ConcurrentLinkedListNode{T}" /> belongs to.
/// </summary>
/// <returns>
/// A reference to the <see cref = "ConcurrentLinkedList{T}" /> that the <see cref = "ConcurrentLinkedListNode{T}" /> belongs to, or null if the
/// <see cref = "ConcurrentLinkedListNode{T}" /> is not linked.
/// </returns>
public ConcurrentLinkedList<T> List
{
    get
    {
        var positionLocked = false;
        ConcurrentLinkedList<T> owner;
        try
        {
            // Owner is guarded by the position lock.
            PositionLock.EnsureLockedForReading(ref positionLocked);
            owner = Owner;
        }
        finally
        {
            if (positionLocked)
                PositionLock.UnlockAfterReading();
        }
        return owner;
    }
}
/// <summary>
/// Gets the next node in the <see cref = "ConcurrentLinkedList{T}" />.
/// </summary>
/// <returns>
/// A reference to the next node in the <see cref = "ConcurrentLinkedList{T}" />, or null if the current node is the last element
/// (<see cref = "ConcurrentLinkedList{T}.Last" />) of the <see cref = "ConcurrentLinkedList{T}" />.
/// </returns>
public ConcurrentLinkedListNode<T> Next
{
    get
    {
        var positionLocked = false;
        try
        {
            PositionLock.EnsureLockedForReading(ref positionLocked);
            // Hide the circular wrap-around: a detached node, or a successor that is the
            // owner's first node, is reported as "no next node".
            if (NextNode == null || NextNode == Owner.First)
                return null;
            return NextNode;
        }
        finally
        {
            if (positionLocked)
                PositionLock.UnlockAfterReading();
        }
    }
    internal set
    {
        var positionLocked = false;
        try
        {
            PositionLock.EnsureLockedForWriting(ref positionLocked);
            NextNode = value;
        }
        finally
        {
            if (positionLocked)
                PositionLock.UnlockAfterWriting();
        }
    }
}
/// <summary>
/// Gets the node that precedes this one in the <see cref = "ConcurrentLinkedList{T}" />.
/// </summary>
/// <returns>
/// The preceding node, or null when no previous node is recorded or when the current node is the first element
/// (<see cref = "ConcurrentLinkedList{T}.First" />) of the <see cref = "ConcurrentLinkedList{T}" />.
/// </returns>
/// <remarks>
/// The getter reads under a read lock on the position lock; the internal setter stores the raw link
/// under a write lock on the same lock.
/// </remarks>
public ConcurrentLinkedListNode<T> Previous
{
    get
    {
        var acquired = false;
        try
        {
            PositionLock.EnsureLockedForReading(ref acquired);

            // The owner's first node reports no predecessor even if a backward link is stored.
            if (PreviousNode != null && Owner.First != this)
            {
                return PreviousNode;
            }

            return null;
        }
        finally
        {
            if (acquired)
            {
                PositionLock.UnlockAfterReading();
            }
        }
    }
    internal set
    {
        var acquired = false;
        try
        {
            PositionLock.EnsureLockedForWriting(ref acquired);
            PreviousNode = value;
        }
        finally
        {
            if (acquired)
            {
                PositionLock.UnlockAfterWriting();
            }
        }
    }
}
/// <summary>
/// Gets or sets the value contained in the node.
/// </summary>
/// <returns>
/// The value contained in the node.
/// </returns>
/// <remarks>
/// Reads take the value lock for reading and writes take it for writing; this is a separate lock
/// from the one guarding the node's position.
/// </remarks>
public T Value
{
    get
    {
        var acquired = false;
        try
        {
            ValueLock.EnsureLockedForReading(ref acquired);
            var current = _value;
            return current;
        }
        finally
        {
            if (acquired)
            {
                ValueLock.UnlockAfterReading();
            }
        }
    }
    set
    {
        var acquired = false;
        try
        {
            ValueLock.EnsureLockedForWriting(ref acquired);
            _value = value;
        }
        finally
        {
            if (acquired)
            {
                ValueLock.UnlockAfterWriting();
            }
        }
    }
}
#endregion
#region Methods
/// <summary>
/// Detaches this node: clears its owner and both neighbour links while holding the position lock for writing.
/// </summary>
internal void Invalidate()
{
    var acquired = false;
    try
    {
        PositionLock.EnsureLockedForWriting(ref acquired);

        // All three position fields are reset under the same write lock.
        Owner = null;
        NextNode = null;
        PreviousNode = null;
    }
    finally
    {
        if (acquired)
        {
            PositionLock.UnlockAfterWriting();
        }
    }
}
#endregion
}
}
namespace System.Collections.Concurrent
{
using Diagnostics;
/// <summary>
/// Debugger type proxy that exposes the contents of an <see cref = "IProducerConsumerCollection{T}" /> as an array.
/// </summary>
/// <typeparam name = "T">Specifies the type of the data being aggregated.</typeparam>
internal sealed class IProducerConsumerCollection_DebugView<T>
{
    #region Fields
    // The collection being inspected; never null once construction succeeds.
    private readonly IProducerConsumerCollection<T> _collection;
    #endregion
    #region Constructors
    /// <summary>
    /// Initializes a new instance of the debug view over the specified collection.
    /// </summary>
    /// <param name = "collection">The collection whose items the debugger should display.</param>
    /// <exception cref = "ArgumentNullException"><paramref name = "collection" /> is null.</exception>
    public IProducerConsumerCollection_DebugView(IProducerConsumerCollection<T> collection)
    {
        // Fail at construction rather than with a NullReferenceException when Values is evaluated.
        if (collection == null)
        {
            throw new ArgumentNullException("collection");
        }

        _collection = collection;
    }
    #endregion
    #region Properties
    /// <summary>
    /// Gets the items of the wrapped collection, as returned by its ToArray method.
    /// </summary>
    [DebuggerBrowsable(DebuggerBrowsableState.RootHidden)]
    public T[] Values
    {
        get { return _collection.ToArray(); }
    }
    #endregion
}
}
namespace System.Threading
{
using Diagnostics.Contracts;
/// <summary>
/// Contains extension methods targeting <see cref = "ReaderWriterLockSlim" />.
/// </summary>
/// <remarks>
/// Preconditions are expressed with Code Contracts (<c>Contract.Requires&lt;TException&gt;</c>), so the
/// documented exceptions are only raised as such when the assembly is processed by the contracts rewriter.
/// </remarks>
public static class ReaderWriterLockExtensions
{
    #region Methods
    /// <summary>
    /// Downgrades an upgradeable-read lock obtained on a <see cref = "ReaderWriterLockSlim" /> to a read lock.
    /// Be sure to call <see cref = "ReaderWriterLockSlim.ExitReadLock" /> when done reading.
    /// </summary>
    /// <param name = "rwl">The <see cref = "ReaderWriterLockSlim" />; must currently be held in upgradeable-read mode.</param>
    /// <seealso cref = "ReaderWriterLockSlim.EnterUpgradeableReadLock" />
    /// <example>
    /// <code>
    /// <![CDATA[
    /// ReaderWriterLockSlim rwl = …;
    /// …
    /// bool upgradeableHeld = true;
    /// rwl.EnterUpgradeableReadLock();
    /// try {
    ///     if (… read some state to decide whether to upgrade …) {
    ///         var writeLocked = false;
    ///         try {
    ///             rwl.UpgradeToWriteLock(ref writeLocked);
    ///             … write to state …
    ///         } finally {
    ///             if (writeLocked)
    ///                 rwl.ExitWriteLock();
    ///         }
    ///     } else {
    ///         rwl.DowngradeToReadLock();
    ///         upgradeableHeld = false;
    ///         try {
    ///             … read from state …
    ///         } finally {
    ///             rwl.ExitReadLock();
    ///         }
    ///     }
    /// } finally {
    ///     if (upgradeableHeld)
    ///         rwl.ExitUpgradeableReadLock();
    /// }
    /// ]]>
    /// </code>
    /// </example>
    public static void DowngradeToReadLock(this ReaderWriterLockSlim rwl)
    {
        Contract.Requires<ArgumentNullException>(rwl != null, "rwl");
        Contract.Requires<InvalidOperationException>(rwl.IsUpgradeableReadLockHeld,
                                                     "The ReaderWriterLockSlim instance must be in UpgradeableRead mode.");

        // Enter read mode first so the thread never fully lets go of the lock during the downgrade.
        rwl.EnterReadLock();
        rwl.ExitUpgradeableReadLock();
    }

    /// <summary>
    /// Ensures that a read (or upgradeable read) lock is held on the provided <see cref = "ReaderWriterLockSlim" />.
    /// </summary>
    /// <param name = "rwl">The <see cref = "ReaderWriterLockSlim" /> to lock.</param>
    /// <param name = "lockTaken">
    /// Must be false on entry. Set to true only when this call acquired the lock; it stays false when the
    /// lock was already held (and recursion is not supported) or when the attempt timed out.
    /// </param>
    /// <param name = "upgradeable">Determines if an upgradeable read lock should be obtained.</param>
    /// <param name = "millisecondsTimeOut">The number of milliseconds to wait to acquire the lock, or -1 (<see cref = "Timeout.Infinite" />) to wait indefinitely.</param>
    public static void EnsureLockedForReading(this ReaderWriterLockSlim rwl,
                                              ref bool lockTaken,
                                              bool upgradeable = false,
                                              int millisecondsTimeOut = Timeout.Infinite)
    {
        Contract.Requires<ArgumentNullException>(rwl != null, "rwl");
        Contract.Requires<ArgumentException>(lockTaken == false);
        Contract.Requires<ArgumentOutOfRangeException>(
            millisecondsTimeOut >= 0 || millisecondsTimeOut == Timeout.Infinite,
            "The value of 'millisecondsTimeout' is negative, but is not equal to Timeout.Infinite (-1), which is the only negative value allowed.");
        // A writer may re-enter in read/upgradeable mode only when the lock supports recursion;
        // with recursion disabled, holding the write lock makes this request invalid.
        Contract.Requires<InvalidOperationException>(
            rwl.RecursionPolicy == LockRecursionPolicy.SupportsRecursion || !rwl.IsWriteLockHeld,
            @"The current thread initially entered the lock in write mode, and therefore cannot enter read or upgradeable mode since recursion is disabled.");
        Contract.Requires<InvalidOperationException>(
            !upgradeable || rwl.IsUpgradeableReadLockHeld || !rwl.IsReadLockHeld,
            "The current thread initially entered the lock in read mode, and therefore trying to enter upgradeable mode would create the possibility of a deadlock.");
        Contract.Ensures(upgradeable && rwl.IsUpgradeableReadLockHeld || !upgradeable && rwl.IsReadLockHeld);

        var supportsRecursion = rwl.RecursionPolicy == LockRecursionPolicy.SupportsRecursion;
        if (upgradeable)
        {
            // Skip the acquisition when the mode is already held and re-entry is not allowed.
            if (!rwl.IsUpgradeableReadLockHeld || supportsRecursion)
            {
                lockTaken = rwl.TryEnterUpgradeableReadLock(millisecondsTimeOut);
            }
        }
        else
        {
            if (!rwl.IsReadLockHeld || supportsRecursion)
            {
                lockTaken = rwl.TryEnterReadLock(millisecondsTimeOut);
            }
        }
    }

    /// <summary>
    /// Ensures that a writer lock is held on the provided <see cref = "ReaderWriterLockSlim" />.
    /// </summary>
    /// <param name = "rwl">The <see cref = "ReaderWriterLockSlim" /> to lock.</param>
    /// <param name = "lockTaken">
    /// Must be false on entry. Set to true only when this call acquired the write lock; it stays false when
    /// the write lock was already held (and recursion is not supported) or when the attempt timed out.
    /// </param>
    /// <param name = "millisecondsTimeOut">The number of milliseconds to wait to acquire the lock, or -1 (<see cref = "Timeout.Infinite" />) to wait indefinitely.</param>
    public static void EnsureLockedForWriting(this ReaderWriterLockSlim rwl,
                                              ref bool lockTaken,
                                              int millisecondsTimeOut = Timeout.Infinite)
    {
        Contract.Requires<ArgumentNullException>(rwl != null, "rwl");
        Contract.Requires<ArgumentException>(lockTaken == false);
        Contract.Requires<ArgumentOutOfRangeException>(
            millisecondsTimeOut >= 0 || millisecondsTimeOut == Timeout.Infinite,
            "The value of 'millisecondsTimeout' is negative, but is not equal to Timeout.Infinite (-1), which is the only negative value allowed.");
        Contract.Requires<InvalidOperationException>(!rwl.IsReadLockHeld,
                                                     "The current thread initially entered the lock in read mode, and therefore trying to enter write mode would create the possibility of a deadlock.");

        if (!rwl.IsWriteLockHeld || rwl.RecursionPolicy == LockRecursionPolicy.SupportsRecursion)
        {
            lockTaken = rwl.TryEnterWriteLock(millisecondsTimeOut);
        }
    }

    /// <summary>
    /// Release the current read (or upgradeable read) lock (if any) on the provided <see cref = "ReaderWriterLockSlim" />.
    /// </summary>
    /// <param name = "rwl">The <see cref = "ReaderWriterLockSlim" /> to release.</param>
    /// <remarks>
    /// Both modes are checked: an upgradeable-read hold and a read hold are each exited once if present.
    /// NOTE(review): a caller that holds an upgradeable lock in an outer scope and only took a read lock
    /// here will have the outer upgradeable lock released as well — confirm callers never nest that way.
    /// </remarks>
    public static void UnlockAfterReading(this ReaderWriterLockSlim rwl)
    {
        Contract.Requires<ArgumentNullException>(rwl != null, "rwl");
        Contract.Ensures(!rwl.IsReadLockHeld && !rwl.IsUpgradeableReadLockHeld);

        if (rwl.IsUpgradeableReadLockHeld)
        {
            rwl.ExitUpgradeableReadLock();
        }

        if (rwl.IsReadLockHeld)
        {
            rwl.ExitReadLock();
        }
    }

    /// <summary>
    /// Release the current write lock (if any) on the provided <see cref = "ReaderWriterLockSlim" />.
    /// </summary>
    /// <param name = "rwl">The <see cref = "ReaderWriterLockSlim" /> to unlock.</param>
    public static void UnlockAfterWriting(this ReaderWriterLockSlim rwl)
    {
        Contract.Requires<ArgumentNullException>(rwl != null, "rwl");
        Contract.Ensures(!rwl.IsWriteLockHeld);

        if (rwl.IsWriteLockHeld)
        {
            rwl.ExitWriteLock();
        }
    }

    /// <summary>
    /// Upgrades an upgradeable-read lock obtained on a <see cref = "ReaderWriterLockSlim" /> to a write lock.
    /// Be sure to call <see cref = "ReaderWriterLockSlim.ExitWriteLock" /> when done writing.
    /// </summary>
    /// <param name = "rwl">The <see cref = "ReaderWriterLockSlim" />; must currently be held in upgradeable-read mode.</param>
    /// <param name = "lockTaken">
    /// Must be false on entry. Set to true when this call acquired the write lock, in which case the caller
    /// is responsible for exiting it.
    /// </param>
    /// <seealso cref = "ReaderWriterLockSlim.EnterUpgradeableReadLock" />
    /// <example>
    /// <code>
    /// <![CDATA[
    /// ReaderWriterLockSlim rwl = …;
    /// …
    /// bool upgradeableHeld = true;
    /// rwl.EnterUpgradeableReadLock();
    /// try {
    ///     if (… read some state to decide whether to upgrade …) {
    ///         var writeLocked = false;
    ///         try {
    ///             rwl.UpgradeToWriteLock(ref writeLocked);
    ///             … write to state …
    ///         } finally {
    ///             if (writeLocked)
    ///                 rwl.ExitWriteLock();
    ///         }
    ///     } else {
    ///         rwl.DowngradeToReadLock();
    ///         upgradeableHeld = false;
    ///         try {
    ///             … read from state …
    ///         } finally {
    ///             rwl.ExitReadLock();
    ///         }
    ///     }
    /// } finally {
    ///     if (upgradeableHeld)
    ///         rwl.ExitUpgradeableReadLock();
    /// }
    /// ]]>
    /// </code>
    /// </example>
    public static void UpgradeToWriteLock(this ReaderWriterLockSlim rwl, ref bool lockTaken)
    {
        Contract.Requires<ArgumentNullException>(rwl != null, "rwl");
        Contract.Requires<InvalidOperationException>(rwl.IsUpgradeableReadLockHeld,
                                                     "The ReaderWriterLockSlim instance must be in UpgradeableRead mode.");

        rwl.EnsureLockedForWriting(ref lockTaken);
    }
    #endregion
}
}
@jimitndiaye

This comment has been minimized.

Copy link
Owner Author

@jimitndiaye jimitndiaye commented Oct 31, 2011

I couldn't find any implementations of a ConcurrentLinkedList for .NET (apparently it was in the betas but got dropped from the .NET 4 RTM). This is a preliminary implementation, no doubt full of potential race conditions or performance issues, so use it at your own peril. I'd appreciate any feedback. Be gentle though - it's my first time ;-)

@oberxon

This comment has been minimized.

Copy link

@oberxon oberxon commented Jun 28, 2016

good work!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment