Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
ConcurrentLinkedList<T>
#pragma warning disable 420
namespace System.Collections.Concurrent
{
using Azure.Framework;
using Diagnostics;
using Diagnostics.Contracts;
using Generic;
using Runtime.InteropServices;
using Runtime.Serialization;
using Security.Permissions;
using Threading;
/// <summary>
/// A thread-safe doubly-linked list
/// </summary>
/// <typeparam name = "T"></typeparam>
[Serializable, DebuggerTypeProxy(typeof (IProducerConsumerCollection_DebugView<>)),
DebuggerDisplay("Count = {Count}"), ComVisible(false),
HostProtection(SecurityAction.LinkDemand, Synchronization = true, ExternalThreading = true)]
public class ConcurrentLinkedList<T> : IProducerConsumerCollection<T>, ICollection<T>, IEnumerable<T>, ICollection,
IEnumerable
{
#region Fields
// First node of the circular list, or null when the list is empty. Guarded by _headLock.
[NonSerialized]
private ConcurrentLinkedListNode<T> _head;
/// <summary>
/// <see cref = "ReaderWriterLockSlim" /> for <see cref = "_head" />
/// </summary>
/// <remarks>
/// Excluded from serialization: <see cref = "ReaderWriterLockSlim" /> is not a serializable type, and
/// <c>Initialize</c> (called from <c>OnDeserialized</c>) asserts this field is null before recreating it.
/// </remarks>
[NonSerialized]
private ReaderWriterLockSlim _headLock;
// Count computed by the last full walk in Count; valid while _version == _versionAtLastCount.
[NonSerialized]
private volatile int _lastCount;
// Snapshot of the items, filled by OnSerializing and consumed (then cleared) by OnDeserialized.
private T[] _serializationArray;
// Incremented by UpdateNode on every Add/Remove operation and by Clear.
[NonSerialized]
private volatile int _version;
// Value of _version at the time _lastCount was computed.
[NonSerialized]
private volatile int _versionAtLastCount;
#endregion
#region Constructors
/// <summary>
/// Creates an empty <see cref = "ConcurrentLinkedList{T}" />.
/// </summary>
public ConcurrentLinkedList()
{
    // No seed items: Initialize only has to create the head lock.
    IEnumerable<T> noItems = null;
    Initialize(noItems);
}
/// <summary>
/// Creates a <see cref = "ConcurrentLinkedList{T}" /> pre-populated with the items of <paramref name = "collection" />,
/// appended in enumeration order.
/// </summary>
/// <param name = "collection">The items to copy into the new list. Declared non-null via a contract precondition.</param>
public ConcurrentLinkedList(IEnumerable<T> collection)
{
    Contract.Requires<ArgumentNullException>(collection != null, "collection");
    var seedItems = collection;
    Initialize(seedItems);
}
#endregion
#region Properties
/// <summary>
/// Indicates whether the <see cref = "ConcurrentLinkedList{T}" /> currently has no nodes.
/// </summary>
/// <returns><see langword = "true" /> when <see cref = "First" /> is null; otherwise <see langword = "false" />.</returns>
public bool IsEmpty
{
    get
    {
        var firstNode = First;
        return firstNode == null;
    }
}
/// <summary>
/// Gets the head node of the <see cref = "ConcurrentLinkedList{T}" />.
/// </summary>
/// <returns>The current head <see cref = "ConcurrentLinkedListNode{T}" />, or null when the list is empty.</returns>
public ConcurrentLinkedListNode<T> First
{
    get
    {
        var headLockAcquired = false;
        ConcurrentLinkedListNode<T> snapshot;
        try
        {
            // Read _head under the head lock.
            _headLock.EnsureLockedForReading(ref headLockAcquired);
            snapshot = _head;
        }
        finally
        {
            if (headLockAcquired)
                _headLock.UnlockAfterReading();
        }
        return snapshot;
    }
}
/// <summary>
/// Gets the tail node of the <see cref = "ConcurrentLinkedList{T}" />.
/// </summary>
/// <returns>
/// The node stored as the head's predecessor (the list is circular), or null when the list is empty.
/// </returns>
public ConcurrentLinkedListNode<T> Last
{
    get
    {
        var headLockAcquired = false;
        try
        {
            _headLock.EnsureLockedForReading(ref headLockAcquired);
            var first = _head;
            if (first == null)
                return null;

            var nodeLockAcquired = false;
            try
            {
                // The tail is reached through the head's "previous" link.
                first.PositionLock.EnsureLockedForReading(ref nodeLockAcquired);
                return first.PreviousNode;
            }
            finally
            {
                if (nodeLockAcquired)
                    first.PositionLock.UnlockAfterReading();
            }
        }
        finally
        {
            if (headLockAcquired)
                _headLock.UnlockAfterReading();
        }
    }
}
#endregion
#region Methods
/// <summary>
/// Walks the circular list from the head and takes the position lock (and optionally the value lock) of every node.
/// </summary>
/// <param name = "upgradeable">Forwarded to <c>EnsureLockedForReading</c> for each node's position lock.</param>
/// <param name = "lockValues">When <see langword = "true" />, each node's value lock is also taken for reading (never upgradeable).</param>
private void AcquireAllLocks(bool upgradeable = false, bool lockValues = false)
{
    // FreezeList may hold the head lock in plain read mode or in upgradeable mode;
    // ReaderWriterLockSlim reports those two modes through different properties.
    Contract.Assert(_headLock != null && (_headLock.IsReadLockHeld || _headLock.IsUpgradeableReadLockHeld),
                    "A reader-lock must be obtained before reading the head");
    if (_head == null) return;
    var node = _head;
    do
    {
        // Start every acquisition from a clean flag so a value left over from the previous node is never passed in.
        var lockTaken = false;
        node.PositionLock.EnsureLockedForReading(ref lockTaken, upgradeable);
        if (lockValues)
        {
            lockTaken = false;
            // Not upgradeable since we shouldn't change values.
            node.ValueLock.EnsureLockedForReading(ref lockTaken);
        }
        node = node.NextNode;
    } while (node != _head);
}
/// <summary>
/// Wraps <paramref name = "value" /> in a new node and links it immediately after <paramref name = "node" />.
/// </summary>
/// <param name = "node">The existing <see cref = "ConcurrentLinkedListNode{T}" /> that the new node will follow. Declared non-null via a contract precondition.</param>
/// <param name = "value">The value to store in the new node.</param>
/// <returns>The newly created <see cref = "ConcurrentLinkedListNode{T}" /> holding <paramref name = "value" />.</returns>
public ConcurrentLinkedListNode<T> AddAfter(ConcurrentLinkedListNode<T> node, T value)
{
    Contract.Requires<ArgumentNullException>(node != null, "node");
    var created = new ConcurrentLinkedListNode<T>(value);
    // Delegate the actual linking to the node-based overload.
    AddAfter(node, created);
    return created;
}
/// <summary>
/// Adds the specified new node after the specified existing node in the <see cref = "ConcurrentLinkedList{T}" />.
/// </summary>
/// <param name = "node">The <see cref = "ConcurrentLinkedListNode{T}" /> after which to insert <paramref name = "newNode" />.</param>
/// <param name = "newNode">The new <see cref = "ConcurrentLinkedListNode{T}" /> to add to the <see cref = "ConcurrentLinkedList{T}" />.</param>
/// <exception cref = "T:System.ArgumentNullException">
/// <paramref name = "node" /> is null.-or-<paramref name = "newNode" /> is null.</exception>
/// <exception cref = "T:System.InvalidOperationException">
/// <paramref name = "node" /> is not linked into a list (it has no successor).</exception>
public void AddAfter(ConcurrentLinkedListNode<T> node, ConcurrentLinkedListNode<T> newNode)
{
    Contract.Requires<ArgumentNullException>(node != null, "node");
    Contract.Requires<ArgumentNullException>(newNode != null, "newNode");
    var positionLocked = false;
    try
    {
        node.PositionLock.EnsureLockedForReading(ref positionLocked, upgradeable: true);
        var successor = node.NextNode;
        // An unlinked node has a null successor (Invalidate nulls the links); report it as documented
        // instead of passing null down to InsertNodeBefore.
        if (successor == null)
            throw new InvalidOperationException("'node' does not belong to the current ConcurrentLinkedList.");
        // "After node" is implemented as "before node's successor". When node is the last node the successor
        // is the head (the list is circular); inserting there must NOT promote newNode to head, otherwise
        // adding after the last node would put the new node first.
        InsertNodeBefore(successor, newNode, replaceHead: false);
    }
    finally
    {
        if (positionLocked)
            node.PositionLock.UnlockAfterReading();
    }
}
/// <summary>
/// Wraps <paramref name = "value" /> in a new node and links it immediately before <paramref name = "node" />.
/// </summary>
/// <param name = "node">The existing <see cref = "ConcurrentLinkedListNode{T}" /> that the new node will precede. Declared non-null via a contract precondition.</param>
/// <param name = "value">The value to store in the new node.</param>
/// <returns>The newly created <see cref = "ConcurrentLinkedListNode{T}" /> holding <paramref name = "value" />.</returns>
public ConcurrentLinkedListNode<T> AddBefore(ConcurrentLinkedListNode<T> node, T value)
{
    Contract.Requires<ArgumentNullException>(node != null, "node");
    var created = new ConcurrentLinkedListNode<T>(value);
    // Uses InsertNodeBefore's default replaceHead, so inserting before the head makes the new node the head.
    InsertNodeBefore(node, created);
    return created;
}
/// <summary>
/// Links <paramref name = "newNode" /> immediately before <paramref name = "node" /> in the <see cref = "ConcurrentLinkedList{T}" />.
/// </summary>
/// <param name = "node">The existing node that <paramref name = "newNode" /> will precede. Declared non-null via a contract precondition.</param>
/// <param name = "newNode">The node to insert. Declared non-null via a contract precondition.</param>
public void AddBefore(ConcurrentLinkedListNode<T> node, ConcurrentLinkedListNode<T> newNode)
{
    Contract.Requires<ArgumentNullException>(node != null, "node");
    Contract.Requires<ArgumentNullException>(newNode != null, "newNode");
    // Uses InsertNodeBefore's default replaceHead, so inserting before the head makes newNode the head.
    var anchor = node;
    InsertNodeBefore(anchor, newNode);
}
/// <summary>
/// Wraps <paramref name = "value" /> in a new node and adds it at the start of the <see cref = "ConcurrentLinkedList{T}" />.
/// </summary>
/// <param name = "value">The value to store in the new first node.</param>
/// <returns>The newly created <see cref = "ConcurrentLinkedListNode{T}" /> holding <paramref name = "value" />.</returns>
public ConcurrentLinkedListNode<T> AddFirst(T value)
{
    var created = new ConcurrentLinkedListNode<T>(value);
    AddFirst(created);
    return created;
}
/// <summary>
/// Adds <paramref name = "node" /> at the start of the <see cref = "ConcurrentLinkedList{T}" />.
/// </summary>
/// <param name = "node">The node to add. The contract preconditions declare that it is non-null and not already owned by a list.</param>
public void AddFirst(ConcurrentLinkedListNode<T> node)
{
    Contract.Requires<ArgumentNullException>(node != null, "node");
    Contract.Requires<InvalidOperationException>(node.List == null,
                                                 "'node' already belongs to another ConcurrentLinkedList.");
    Contract.Ensures(First == node);
    var headLockAcquired = false;
    try
    {
        // Upgradeable: the insertion helpers may need to replace _head.
        _headLock.EnsureLockedForReading(ref headLockAcquired, upgradeable: true);
        if (_head != null)
        {
            // Inserting before the head with the default replaceHead promotes the new node to head.
            InsertNodeBefore(_head, node);
        }
        else
        {
            InsertNodeToEmptyList(node);
        }
    }
    finally
    {
        if (headLockAcquired)
            _headLock.UnlockAfterReading();
    }
}
/// <summary>
/// Wraps <paramref name = "value" /> in a new node and adds it at the end of the <see cref = "ConcurrentLinkedList{T}" />.
/// </summary>
/// <param name = "value">The value to store in the new last node.</param>
/// <returns>The newly created <see cref = "ConcurrentLinkedListNode{T}" /> holding <paramref name = "value" />.</returns>
public ConcurrentLinkedListNode<T> AddLast(T value)
{
    var created = new ConcurrentLinkedListNode<T>(value);
    AddLast(created);
    return created;
}
/// <summary>
/// Adds <paramref name = "node" /> at the end of the <see cref = "ConcurrentLinkedList{T}" />.
/// </summary>
/// <param name = "node">The node to add. The contract preconditions declare that it is non-null and not already owned by a list.</param>
public void AddLast(ConcurrentLinkedListNode<T> node)
{
    Contract.Requires<ArgumentNullException>(node != null, "node");
    Contract.Requires<InvalidOperationException>(node.List == null,
                                                 "'node' already belongs to another ConcurrentLinkedList.");
    Contract.Ensures(Last == node);
    var headLockAcquired = false;
    try
    {
        _headLock.EnsureLockedForReading(ref headLockAcquired, upgradeable: true);
        if (_head != null)
        {
            // In a circular list "last" means "just before the head"; the head itself stays unchanged.
            InsertNodeBefore(_head, node, replaceHead: false);
        }
        else
        {
            InsertNodeToEmptyList(node);
        }
    }
    finally
    {
        if (headLockAcquired)
            _headLock.UnlockAfterReading();
    }
}
/// <summary>
/// Searches from the first node towards the last for a node whose value equals <paramref name = "value" />.
/// </summary>
/// <param name = "value">The value to look for.</param>
/// <returns>The first matching <see cref = "ConcurrentLinkedListNode{T}" />, or null when there is none.</returns>
public ConcurrentLinkedListNode<T> Find(T value)
{
    var headLockAcquired = false;
    try
    {
        // Freeze positions and values for the duration of the search.
        FreezeList(ref headLockAcquired, lockValues: true);
        var match = FindNodeWithValue(value);
        return match;
    }
    finally
    {
        UnfreezeList(headLockAcquired);
    }
}
/// <summary>
/// Scans the (already frozen) list for a node whose value equals <paramref name = "value" /> according to
/// <see cref = "EqualityComparer{T}.Default" />.
/// </summary>
/// <param name = "value">The value to look for.</param>
/// <param name = "searchFromFirstToLast">
/// <see langword = "true" /> to start at the head and follow next links; <see langword = "false" /> to start at the
/// head's predecessor and follow previous links.
/// </param>
/// <returns>The first matching node in the chosen direction, or null when the list is empty or nothing matches.</returns>
private ConcurrentLinkedListNode<T> FindNodeWithValue(T value, bool searchFromFirstToLast = true)
{
    // Callers freeze the list either in read mode (Find, FindLast) or in upgradeable mode (Remove(T)).
    // ReaderWriterLockSlim reports those modes through separate properties, so both must be accepted.
    Contract.Assert(_headLock.IsReadLockHeld || _headLock.IsUpgradeableReadLockHeld);
    var head = _head;
    if (head == null)
        return null;

    Contract.Assert(head.PositionLock.IsReadLockHeld || head.PositionLock.IsUpgradeableReadLockHeld);
    var startNode = searchFromFirstToLast ? head : head.PreviousNode;
    var equalityComparer = EqualityComparer<T>.Default;
    var node = startNode;
    do
    {
        if (equalityComparer.Equals(node.Value, value))
            return node;
        if (node != head)
            Contract.Assert(node.PositionLock.IsReadLockHeld || node.PositionLock.IsUpgradeableReadLockHeld);
        node = searchFromFirstToLast ? node.NextNode : node.PreviousNode;
    } while (node != startNode);
    return null;
}
/// <summary>
/// Searches from the last node towards the first for a node whose value equals <paramref name = "value" />.
/// </summary>
/// <param name = "value">The value to look for.</param>
/// <returns>The last matching <see cref = "ConcurrentLinkedListNode{T}" />, or null when there is none.</returns>
public ConcurrentLinkedListNode<T> FindLast(T value)
{
    var headLockAcquired = false;
    try
    {
        // Freeze positions and values for the duration of the search.
        FreezeList(ref headLockAcquired, lockValues: true);
        var match = FindNodeWithValue(value, searchFromFirstToLast: false);
        return match;
    }
    finally
    {
        UnfreezeList(headLockAcquired);
    }
}
/// <summary>
/// Takes the head lock, then every node's lock(s), then waits until no node reports an operation in progress.
/// Pair with <c>UnfreezeList</c>, passing back the same <paramref name = "lockTaken" /> value.
/// </summary>
/// <param name = "lockTaken">Passed by reference to <c>EnsureLockedForReading</c> for the head lock.</param>
/// <param name = "upgradeable">Whether the head lock and the node position locks are requested as upgradeable.</param>
/// <param name = "lockValues">Whether each node's value lock is also taken for reading.</param>
private void FreezeList(ref bool lockTaken, bool upgradeable = false, bool lockValues = false)
{
    // With upgradeable == true the lock is held in upgradeable mode, for which IsReadLockHeld is false.
    Contract.Ensures(_headLock.IsReadLockHeld || _headLock.IsUpgradeableReadLockHeld);
    _headLock.EnsureLockedForReading(ref lockTaken, upgradeable);
    AcquireAllLocks(upgradeable, lockValues);
    WaitAllOperations();
}
/// <summary>
/// Creates the head lock and, when <paramref name = "collection" /> is not null, appends each of its items.
/// Called by the constructors and by <c>OnDeserialized</c>.
/// </summary>
/// <param name = "collection">Optional initial items; null leaves the list empty.</param>
private void Initialize(IEnumerable<T> collection)
{
    Contract.Ensures(_headLock != null);
    // Must only run once per instance (or once after deserialization, where the lock field starts out null).
    Contract.Assert(_headLock == null);
    _headLock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);
    if (collection != null)
    {
        foreach (var item in collection)
            AddLast(item);
    }
}
/// <summary>
/// Links <paramref name = "newNode" /> into the circular list immediately before <paramref name = "node" />.
/// </summary>
/// <param name = "node">The existing node that will follow <paramref name = "newNode" />.</param>
/// <param name = "newNode">The node to insert.</param>
/// <param name = "replaceHead">
/// When <see langword = "true" /> and <paramref name = "node" /> is the current head, <paramref name = "newNode" />
/// becomes the head. Pass <see langword = "false" /> to append at the end (before the head) without moving the head.
/// </param>
internal void InsertNodeBefore(ConcurrentLinkedListNode<T> node,
                               ConcurrentLinkedListNode<T> newNode,
                               bool replaceHead = true)
{
    Contract.Requires<ArgumentNullException>(node != null, "node");
    Contract.Requires<ArgumentNullException>(newNode != null, "newNode");
    Contract.Ensures(newNode.NextNode == node);
    Contract.Ensures(newNode.PreviousNode == Contract.OldValue(node.PreviousNode));
    Contract.Ensures(node.PreviousNode == newNode);
    Contract.Ensures(Contract.OldValue(node.PreviousNode.NextNode) == newNode);
    UpdateNode(node, NodeOperation.Move,
               succeedingNode =>
               {
                   var precedingNode = succeedingNode.PreviousNode;
                   // Set the precedingNode to point to newNode as next.
                   if (precedingNode == succeedingNode)
                   {
                       // Single-node ring: the predecessor IS the node this UpdateNode call is already
                       // operating on. A nested UpdateNode would spin forever waiting for the
                       // CurrentOperation that this very call set, so write the link directly.
                       precedingNode.Next = newNode;
                   }
                   else
                   {
                       UpdateNode(precedingNode, NodeOperation.Move, x => x.Next = newNode);
                   }
                   UpdateNode(newNode, NodeOperation.Add, x =>
                                                          {
                                                              var lockTaken = false;
                                                              try
                                                              {
                                                                  x.PositionLock.EnsureLockedForWriting(
                                                                      ref lockTaken);
                                                                  // Set the newNode to point to succeedingNode and precedingNode as next and previous respectively.
                                                                  x.NextNode = succeedingNode;
                                                                  x.PreviousNode = precedingNode;
                                                              }
                                                              finally
                                                              {
                                                                  if (lockTaken)
                                                                  {
                                                                      x.PositionLock.UnlockAfterWriting();
                                                                  }
                                                              }
                                                          });
                   // Finally set succeedingNode to point to newNode as previous
                   succeedingNode.Previous = newNode;
                   // If succeedingNode is the current head, replace it with newNode.
                   if (replaceHead)
                       SetHead(head => head == succeedingNode, newNode);
               });
}
/// <summary>
/// Makes <paramref name = "newNode" /> the head of a list whose head is currently null.
/// </summary>
/// <param name = "newNode">The node to install as the only node of the list.</param>
/// <remarks>
/// Runs inside an Add operation on <paramref name = "newNode" /> and asks SetHead to replace a null head with it.
/// On success the node's links are pointed at itself; on failure the node is inserted before the current head instead.
/// NOTE(review): SetHead invokes neither callback when its first, unlocked-for-writing check of the condition fails,
/// so in that case the node is not linked at all - confirm callers always hold the head lock with a null head.
/// NOTE(review): the failure path calls InsertNodeBefore, which starts another Add operation on the same node while
/// this one is still in progress - verify this cannot spin in UpdateNode.
/// </remarks>
internal void InsertNodeToEmptyList(ConcurrentLinkedListNode<T> newNode)
{
Contract.Requires<ArgumentNullException>(newNode != null, "newNode");
UpdateNode(newNode, NodeOperation.Add,
node => SetHead(head => head == null, node,
onSuccess: () =>
{
var nodeLocked = false;
try
{
node.PositionLock.EnsureLockedForWriting(
ref nodeLocked);
// Set the node to point to itself as previous and next since it is the only node in the list.
node.PreviousNode = node.NextNode = node;
}
finally
{
if (nodeLocked)
{
node.PositionLock.UnlockAfterWriting();
}
}
},
onFailure: () =>
{
#if DEBUG
// This shouldn't really occur since we only call this method when the head is locked and null.
if (Debugger.IsAttached)
Debugger.Break();
#endif
// We've been beaten to the chase. Insert it before the current head.
Contract.Assert(_head != null &&
_head.PositionLock.IsUpgradeableReadLockHeld);
InsertNodeBefore(_head, node, replaceHead: false);
//NOTE: Should we replace the head here?
}));
}
/// <summary>
/// Code Contracts object invariant: the head lock is never null.
/// </summary>
[ContractInvariantMethod]
private void ObjectInvariant()
{
Contract.Invariant(_headLock != null);
}
/// <summary>
/// Deserialization callback: rebuilds the list from the serialized item array, then drops the array.
/// </summary>
/// <param name = "context">Not used by this method.</param>
[OnDeserialized]
private void OnDeserialized(StreamingContext context)
{
Contract.Ensures(_headLock != null);
Contract.Ensures(_serializationArray == null);
// Initialize recreates the head lock and re-adds each item via AddLast.
Initialize(_serializationArray);
_serializationArray = null;
}
/// <summary>
/// Serialization callback: captures the current items into the serializable array field.
/// </summary>
/// <param name = "context">Not used by this method.</param>
[OnSerializing]
private void OnSerializing(StreamingContext context)
{
Contract.Ensures(_serializationArray != null);
_serializationArray = ToArray();
}
/// <summary>
/// Walks the circular list from the head and releases the per-node locks taken by <c>AcquireAllLocks</c>.
/// Only nodes still reachable from the head are visited.
/// </summary>
private void ReleaseAllLocks()
{
    // The head lock may be held in read or upgradeable mode, depending on how the list was frozen.
    Contract.Assert(_headLock != null && (_headLock.IsReadLockHeld || _headLock.IsUpgradeableReadLockHeld));
    if (_head == null) return;
    var next = _head;
    do
    {
        var current = next;
        Contract.Assert(current.PositionLock.IsReadLockHeld ||
                        current.PositionLock.IsUpgradeableReadLockHeld ||
                        current.PositionLock.IsWriteLockHeld);
        // Read the successor before giving up the lock that protects the link.
        next = current.NextNode;
        current.PositionLock.UnlockAfterReading();
        // Value locks are only taken when the list was frozen with lockValues: true (AcquireAllLocks),
        // so release them only when this thread actually holds one.
        if (current.ValueLock.IsReadLockHeld)
            current.ValueLock.UnlockAfterReading();
    } while (next != _head);
}
/// <summary>
/// Unlinks <paramref name = "node" /> from the <see cref = "ConcurrentLinkedList{T}" />.
/// </summary>
/// <param name = "node">The <see cref = "ConcurrentLinkedListNode{T}" /> to remove. Declared non-null via a contract precondition.</param>
public void Remove(ConcurrentLinkedListNode<T> node)
{
    Contract.Requires<ArgumentNullException>(node != null, "node");
    var target = node;
    RemoveNode(target);
}
/// <summary>
/// Removes the head node of the <see cref = "ConcurrentLinkedList{T}" />.
/// </summary>
/// <remarks>
/// A contract precondition declares the list non-empty. If the list is found empty once the head lock is held,
/// the method does nothing.
/// </remarks>
public void RemoveFirst()
{
    Contract.Requires<InvalidOperationException>(First != null, "The ConcurrentLinkedList is empty.");
    var headLockAcquired = false;
    try
    {
        _headLock.EnsureLockedForReading(ref headLockAcquired, upgradeable: true);
        var first = _head;
        if (first == null)
            return;
        RemoveNode(first);
    }
    finally
    {
        if (headLockAcquired)
            _headLock.UnlockAfterReading();
    }
}
/// <summary>
/// Removes the tail node (the head's predecessor) of the <see cref = "ConcurrentLinkedList{T}" />.
/// </summary>
/// <remarks>
/// A contract precondition declares the list non-empty. If the list is found empty once the head lock is held,
/// the method does nothing.
/// </remarks>
public void RemoveLast()
{
    Contract.Requires<InvalidOperationException>(Last != null, "The ConcurrentLinkedList is empty.");
    var headLockAcquired = false;
    try
    {
        _headLock.EnsureLockedForReading(ref headLockAcquired, upgradeable: true);
        var first = _head;
        if (first == null)
            return;

        var nodeLockAcquired = false;
        try
        {
            // The tail is found through the head's "previous" link, so hold the head node's position lock.
            first.PositionLock.EnsureLockedForReading(ref nodeLockAcquired, upgradeable: true);
            var tail = first.PreviousNode;
            RemoveNode(tail);
        }
        finally
        {
            if (nodeLockAcquired)
                first.PositionLock.UnlockAfterReading();
        }
    }
    finally
    {
        if (headLockAcquired)
            _headLock.UnlockAfterReading();
    }
}
/// <summary>
/// Unlinks <paramref name = "node" /> from the list as a Remove operation; does nothing when it is null.
/// </summary>
/// <param name = "node">The node to unlink.</param>
private void RemoveNode(ConcurrentLinkedListNode<T> node)
{
if (node == null) return;
UpdateNode(node, NodeOperation.Remove,
target =>
{
if (target.Owner == null)
return;
// Capture the neighbours first: Invalidate() nulls the node's owner and both links.
var precedingNode = target.PreviousNode;
var succeedingNode = target.NextNode;
target.Invalidate();
if (succeedingNode == target || precedingNode == target)
{
// This is only true if node is the head and is the only node in the list.
// So removing it effectively sets _head to null.
SetHead(head => head == target, null);
}
else
{
// Update the preceding and succeeding nodes to point to each other as next and previous respectively.
UpdateNode(precedingNode, NodeOperation.Move, n => n.Next = succeedingNode);
UpdateNode(succeedingNode, NodeOperation.Move, n => n.Previous = precedingNode);
// If 'target' is the current head, switch the head to the next node
SetHead(head => head == target, succeedingNode);
}
});
}
/// <summary>
/// Replaces the head with <paramref name = "newHead" /> when <paramref name = "condition" /> holds for the current head.
/// </summary>
/// <param name = "condition">Predicate evaluated against the current head, once before and once after upgrading to the write lock.</param>
/// <param name = "newHead">The node to install as head; may be null to mark the list empty.</param>
/// <param name = "onSuccess">Optional callback run, while the write lock is held, after the head was replaced.</param>
/// <param name = "onFailure">Optional callback run, while the write lock is held, when the second check of the condition fails.</param>
/// <remarks>
/// NOTE(review): when the first check of the condition fails, neither callback is invoked.
/// </remarks>
private void SetHead(Func<ConcurrentLinkedListNode<T>, bool> condition,
ConcurrentLinkedListNode<T> newHead,
Action onSuccess = null,
Action onFailure = null)
{
var readLocked = false;
try
{
_headLock.EnsureLockedForReading(ref readLocked, true);
// First check under the upgradeable lock; only upgrade to the write lock if a change looks necessary.
if (condition(_head))
{
var writeLocked = false;
try
{
_headLock.UpgradeToWriteLock(ref writeLocked);
// Re-check once the write lock is held.
if (condition(_head))
{
_head = newHead;
if (onSuccess != null)
onSuccess();
}
else
{
if (onFailure != null)
onFailure();
}
}
finally
{
if (writeLocked)
_headLock.ExitWriteLock();
}
}
}
finally
{
if (readLocked)
{
_headLock.UnlockAfterReading();
}
}
}
/// <summary>
/// Copies the values of all nodes, from the head following next links, into a new <see cref = "List{T}" />
/// while the list is frozen.
/// </summary>
/// <returns>A new list with the current values; empty when the linked list has no head.</returns>
private List<T> ToList()
{
    var snapshot = new List<T>();
    var headLockAcquired = false;
    try
    {
        FreezeList(ref headLockAcquired, lockValues: true);
        if (_head != null)
        {
            var cursor = _head;
            do
            {
                snapshot.Add(cursor.Value);
                cursor = cursor.NextNode;
            } while (cursor != _head);
        }
        return snapshot;
    }
    finally
    {
        UnfreezeList(headLockAcquired);
    }
}
/// <summary>
/// Reads the value of the last node without removing it.
/// </summary>
/// <param name = "result">Receives the last node's value, or the default of <typeparamref name = "T" /> when the list is empty.</param>
/// <returns><see langword = "true" /> when the list had a node to read; otherwise <see langword = "false" />.</returns>
public bool TryPeek(out T result)
{
    var found = TryTakeOrPeek(out result, take: false);
    return found;
}
/// <summary>
/// Shared implementation of <c>TryPeek</c> and <c>TryTake</c>: reads the value of the last node
/// (the head's predecessor) and optionally removes that node.
/// </summary>
/// <param name = "result">Receives the last node's value, or the default of <typeparamref name = "T" /> when the list is empty.</param>
/// <param name = "take">When <see langword = "true" />, locks are requested as upgradeable and the node is removed after reading.</param>
/// <returns><see langword = "true" /> when a node was available; otherwise <see langword = "false" />.</returns>
private bool TryTakeOrPeek(out T result, bool take)
{
    result = default(T);
    var headLockAcquired = false;
    try
    {
        _headLock.EnsureLockedForReading(ref headLockAcquired, take);
        var first = _head;
        if (first == null)
            return false;

        var nodeLockAcquired = false;
        try
        {
            first.PositionLock.EnsureLockedForReading(ref nodeLockAcquired, take);
            var tail = first.PreviousNode;
            result = tail.Value;
            if (take)
                RemoveNode(tail);
            return true;
        }
        finally
        {
            if (nodeLockAcquired)
                first.PositionLock.UnlockAfterReading();
        }
    }
    finally
    {
        if (headLockAcquired)
            _headLock.UnlockAfterReading();
    }
}
/// <summary>
/// Counterpart of <c>FreezeList</c>: releases the per-node locks and then, if requested, the head lock.
/// </summary>
/// <param name = "lockTaken">The flag that <c>FreezeList</c> filled in; the head lock is released only when it is <see langword = "true" />.</param>
private void UnfreezeList(bool lockTaken)
{
    ReleaseAllLocks();
    if (!lockTaken)
        return;
    _headLock.UnlockAfterReading();
}
/// <summary>
/// Runs <paramref name = "action" /> on <paramref name = "node" /> while the node is marked as undergoing
/// <paramref name = "operation" /> and its position lock is held (upgradeable; additionally for writing on Add).
/// </summary>
/// <param name = "node">The node to operate on.</param>
/// <param name = "operation">The kind of operation; Add and Remove also bump the list version.</param>
/// <param name = "action">The work to perform on the node.</param>
/// <remarks>
/// Ownership is checked with <c>Guard.Verify</c> first (presumably throwing when the check fails - confirm against
/// that helper). The method then waits until the node reports no operation in progress before marking it.
/// It is not re-entrant for the same node: a nested call would wait for the marker set by the outer call.
/// </remarks>
private void UpdateNode(ConcurrentLinkedListNode<T> node,
                        NodeOperation operation,
                        Action<ConcurrentLinkedListNode<T>> action)
{
    Contract.Requires(node != null);
    Contract.Requires(action != null);
    Contract.Requires(operation != NodeOperation.None);
    var readLocked = false;
    var writeLocked = false;
    // Tracks whether THIS call set the operation marker, so a failure before that point
    // (lock acquisition or ownership check) never clears a marker it does not own.
    var operationClaimed = false;
    try
    {
        node.PositionLock.EnsureLockedForReading(ref readLocked, upgradeable: true);
        var owner = node.Owner;
        switch (operation)
        {
            case NodeOperation.Remove:
            case NodeOperation.Move:
                Guard.Verify(owner == this,
                             "'node' does not belong to the current ConcurrentLinkedList.");
                break;
            case NodeOperation.Add:
                Guard.Verify(owner == null || owner == this,
                             "'node' already belongs to another ConcurrentLinkedList instance.");
                node.PositionLock.EnsureLockedForWriting(ref writeLocked);
                // Set the owner.
                node.Owner = this;
                break;
        }
        // Wait for any operation already in progress on this node to finish.
        var spinWait = new SpinWait();
        while (node.CurrentOperation != (int) NodeOperation.None)
        {
            spinWait.SpinOnce();
        }
        Interlocked.Exchange(ref node.CurrentOperation, (int) operation);
        operationClaimed = true;
        action(node);
        if (operation == NodeOperation.Add || operation == NodeOperation.Remove)
            Interlocked.Increment(ref _version);
    }
    finally
    {
        if (operationClaimed)
            Interlocked.Exchange(ref node.CurrentOperation, (int) NodeOperation.None);
        if (writeLocked)
            node.PositionLock.UnlockAfterWriting();
        if (readLocked)
            node.PositionLock.UnlockAfterReading();
    }
}
/// <summary>
/// Spins until no node in the list reports an operation in progress. Called from <c>FreezeList</c> after the
/// head lock and all node locks have been taken.
/// </summary>
private void WaitAllOperations()
{
    // FreezeList may hold the head lock in read or upgradeable mode.
    Contract.Requires(_headLock.IsReadLockHeld || _headLock.IsUpgradeableReadLockHeld);
    if (_head == null)
        return;
    // No need to lock the head since this method is only called from the FreezeList operation, meaning it's already locked.
    var node = _head;
    do
    {
        var wait = new SpinWait();
        while (node.CurrentOperation != (int) NodeOperation.None)
        {
            wait.SpinOnce();
        }
        // Follow the raw circular link. The public Next property returns null at the last node,
        // which would make this loop dereference null instead of arriving back at the head.
        node = node.NextNode;
    } while (node != _head);
}
#endregion
#region ICollection<T> Members
/// <summary>
/// Removes all nodes from the <see cref = "ConcurrentLinkedList{T}" />.
/// </summary>
/// <remarks>
/// Every node is invalidated (its owner and links are cleared), the head is set to null and the list version is bumped.
/// </remarks>
public void Clear()
{
    var lockTaken = false;
    try
    {
        FreezeList(ref lockTaken, upgradeable: true);
        var first = _head;
        if (first != null)
        {
            var node = first;
            do
            {
                // Read the successor first: Invalidate() nulls the node's links.
                var next = node.NextNode;
                node.Invalidate();
                // Release the position lock FreezeList took on this node. Once the head is null,
                // UnfreezeList can no longer walk to the node, so the lock would otherwise stay held.
                if (node.PositionLock.IsUpgradeableReadLockHeld)
                    node.PositionLock.UnlockAfterReading();
                node = next;
            } while (node != null && node != first);
        }
        SetHead(head => head != null, null);
        _lastCount = 0;
        // Atomic, matching how UpdateNode bumps the version.
        Interlocked.Increment(ref _version);
    }
    finally
    {
        UnfreezeList(lockTaken);
    }
}
/// <summary>
/// Removes the first occurrence of the specified value from the <see cref = "ConcurrentLinkedList{T}" />.
/// </summary>
/// <param name = "value">The value to remove from the <see cref = "ConcurrentLinkedList{T}" />.</param>
/// <returns>
/// <see langword = "true" /> if a node containing <paramref name = "value" /> was found and removed;
/// <see langword = "false" /> if no node contains <paramref name = "value" />.
/// </returns>
public bool Remove(T value)
{
    var lockTaken = false;
    try
    {
        FreezeList(ref lockTaken, upgradeable: true, lockValues: true);
        var node = FindNodeWithValue(value);
        if (node == null)
            return false;

        RemoveNode(node);
        // The node is now unlinked, so the release walk in UnfreezeList will not reach it.
        // Release the locks FreezeList took on it here, if this thread still holds them.
        if (node.PositionLock.IsUpgradeableReadLockHeld)
            node.PositionLock.UnlockAfterReading();
        if (node.ValueLock.IsReadLockHeld)
            node.ValueLock.UnlockAfterReading();
        return true;
    }
    finally
    {
        UnfreezeList(lockTaken);
    }
}
/// <summary>
/// <see cref = "ICollection{T}" /> implementation: appends <paramref name = "item" /> at the end of the list.
/// </summary>
/// <param name = "item">The value to append.</param>
void ICollection<T>.Add(T item)
{
    // The created node is not needed by this interface member.
    var appended = AddLast(item);
}
/// <summary>
/// Determines whether any node of the list holds a value equal to <paramref name = "item" />.
/// </summary>
/// <param name = "item">The value to look for.</param>
/// <returns><see langword = "true" /> when <c>Find</c> returns a node; otherwise <see langword = "false" />.</returns>
public bool Contains(T item)
{
    var match = Find(item);
    return match != null;
}
/// <summary>
/// <see cref = "ICollection{T}" /> implementation: the list is never read-only.
/// </summary>
/// <returns>Always <see langword = "false" />.</returns>
bool ICollection<T>.IsReadOnly
{
    get
    {
        return false;
    }
}
#endregion
#region IProducerConsumerCollection<T> Members
/// <summary>
/// Copies a snapshot of the list's values into <paramref name = "array" />, starting at <paramref name = "index" />.
/// </summary>
/// <param name = "array">The destination array. Declared non-null via a contract precondition.</param>
/// <param name = "index">The zero-based destination index. Declared non-negative via a contract precondition.</param>
public void CopyTo(T[] array, int index)
{
    Contract.Requires<ArgumentNullException>(array != null, "array");
    Contract.Requires<ArgumentOutOfRangeException>(index >= 0, "index");
    var snapshot = ToList();
    snapshot.CopyTo(array, index);
}
/// <summary>
/// Returns an enumerator over a snapshot array of the list's values taken at the time of the call.
/// </summary>
/// <returns>An enumerator over the snapshot.</returns>
public IEnumerator<T> GetEnumerator()
{
    IEnumerable<T> snapshot = ToArray();
    return snapshot.GetEnumerator();
}
/// <summary>
/// <see cref = "IProducerConsumerCollection{T}" /> implementation: appends <paramref name = "item" /> at the end of the list.
/// </summary>
/// <param name = "item">The value to append.</param>
/// <returns>Always <see langword = "true" />.</returns>
bool IProducerConsumerCollection<T>.TryAdd(T item)
{
    // The created node is not needed by this interface member.
    var appended = AddLast(item);
    return true;
}
/// <summary>
/// Non-generic <see cref = "ICollection" /> implementation: copies a snapshot of the list's values into
/// <paramref name = "array" />, starting at <paramref name = "index" />.
/// </summary>
/// <param name = "array">The destination array. Declared non-null via a contract precondition.</param>
/// <param name = "index">The zero-based destination index. Declared non-negative via a contract precondition.</param>
void ICollection.CopyTo(Array array, int index)
{
    Contract.Requires<ArgumentNullException>(array != null, "array");
    Contract.Requires<ArgumentOutOfRangeException>(index >= 0, "index");
    // List<T> implements the non-generic copy explicitly, hence the cast.
    ((ICollection) ToList()).CopyTo(array, index);
}
/// <summary>
/// Non-generic enumerator; forwards to the generic <c>GetEnumerator</c>.
/// </summary>
IEnumerator IEnumerable.GetEnumerator()
{
    IEnumerator<T> typed = GetEnumerator();
    return typed;
}
/// <summary>
/// Copies a snapshot of the list's values into a new array.
/// </summary>
/// <returns>A new array with the current values, in order from the head following next links.</returns>
public T[] ToArray()
{
    var snapshot = ToList();
    return snapshot.ToArray();
}
/// <summary>
/// Attempts to remove the last node of the list and return its value.
/// </summary>
/// <param name = "result">Receives the removed node's value, or the default of <typeparamref name = "T" /> when the list is empty.</param>
/// <returns><see langword = "true" /> when a node was available; otherwise <see langword = "false" />.</returns>
public bool TryTake(out T result)
{
    var taken = TryTakeOrPeek(out result, take: true);
    return taken;
}
/// <summary>
/// Gets the number of nodes in the list.
/// </summary>
/// <remarks>
/// The list is frozen for the duration of the call. The result of the last full walk is cached and reused
/// while the list version is unchanged; otherwise the nodes are counted again and the cache is refreshed.
/// </remarks>
public int Count
{
    get
    {
        var headLockAcquired = false;
        try
        {
            FreezeList(ref headLockAcquired);
            if (_version == _versionAtLastCount)
            {
                // Nothing has changed since the last walk: reuse the cached count.
                return _lastCount;
            }

            var counted = 0;
            if (_head != null)
            {
                var cursor = _head;
                do
                {
                    counted++;
                    cursor = cursor.NextNode;
                } while (cursor != _head);
            }
            _lastCount = counted;
            _versionAtLastCount = _version;
            return counted;
        }
        finally
        {
            UnfreezeList(headLockAcquired);
        }
    }
}
/// <summary>
/// Non-generic <see cref = "ICollection" /> implementation.
/// </summary>
/// <returns>Always <see langword = "false" />.</returns>
bool ICollection.IsSynchronized
{
    get
    {
        return false;
    }
}
/// <summary>
/// Non-generic <see cref = "ICollection" /> implementation; not supported by this collection.
/// </summary>
/// <exception cref = "NotSupportedException">Always thrown.</exception>
object ICollection.SyncRoot
{
    get
    {
        const string message =
            "The SyncRoot property may not be used for the synchronization of concurrent collections.";
        throw new NotSupportedException(message);
    }
}
#endregion
#region Nested type: NodeOperation
/// <summary>
/// The kind of operation currently in progress on a node; stored as an int in the node's CurrentOperation field.
/// </summary>
internal enum NodeOperation
{
// No operation in progress.
None = 0,
// The node is being linked into the list; UpdateNode sets its owner and bumps the list version.
Add = 1,
// The node is being unlinked from the list; UpdateNode bumps the list version.
Remove = 2,
// One of the node's links is being changed; the list version is not bumped.
Move = 3
}
#endregion
}
}
namespace System.Collections.Concurrent
{
using Diagnostics.Contracts;
using Runtime.InteropServices;
using Threading;
/// <summary>
/// Represents a node in a <see cref = "ConcurrentLinkedList{T}" />. This class cannot be inherited.
/// </summary>
/// <typeparam name = "T">
/// Specifies the element type of the linked list.
/// </typeparam>
/// <filterpriority>1</filterpriority>
[ComVisible(false)]
public sealed class ConcurrentLinkedListNode<T>
{
#region Fields
/// <summary>
/// <see cref = "ReaderWriterLockSlim" /> for <see cref = "NextNode" />, <see cref = "PreviousNode" /> and <see cref = "Owner" />
/// </summary>
internal readonly ReaderWriterLockSlim PositionLock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);
/// <summary>
/// <see cref = "ReaderWriterLockSlim" /> for <see cref = "_value" />
/// </summary>
internal readonly ReaderWriterLockSlim ValueLock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);
internal volatile int CurrentOperation;
internal volatile ConcurrentLinkedListNode<T> NextNode;
internal ConcurrentLinkedList<T> Owner;
internal volatile ConcurrentLinkedListNode<T> PreviousNode;
private T _value;
#endregion
#region Constructors
/// <summary>
/// Initializes a new instance of the <see cref = "ConcurrentLinkedListNode{T}" /> class, containing the specified value.
/// </summary>
/// <param name = "value">
/// The value to contain in the <see cref = "ConcurrentLinkedListNode{T}" />.
/// </param>
public ConcurrentLinkedListNode(T value)
{
_value = value;
}
#endregion
#region Properties
/// <summary>
/// Gets the <see cref = "ConcurrentLinkedList{T}" /> that the <see cref = "ConcurrentLinkedListNode{T}" /> belongs to.
/// </summary>
/// <returns>
/// A reference to the <see cref = "ConcurrentLinkedList{T}" /> that the <see cref = "ConcurrentLinkedListNode{T}" /> belongs to, or null if the
/// <see cref = "ConcurrentLinkedListNode{T}" /> is not linked.
/// </returns>
public ConcurrentLinkedList<T> List
{
get
{
var lockTaken = false;
try
{
PositionLock.EnsureLockedForReading(ref lockTaken);
return Owner;
}
finally
{
if (lockTaken)
{
PositionLock.UnlockAfterReading();
}
}
}
}
/// <summary>
/// Gets the next node in the <see cref = "ConcurrentLinkedList{T}" />.
/// </summary>
/// <returns>
/// A reference to the next node in the <see cref = "ConcurrentLinkedList{T}" />, or null if the current node is the last element
/// (<see cref = "ConcurrentLinkedList{T}.Last" />) of the <see cref = "ConcurrentLinkedList{T}" />.
/// </returns>
public ConcurrentLinkedListNode<T> Next
{
get
{
var lockTaken = false;
try
{
PositionLock.EnsureLockedForReading(ref lockTaken);
return NextNode != null && NextNode != Owner.First ? NextNode : null;
}
finally
{
if (lockTaken)
{
PositionLock.UnlockAfterReading();
}
}
}
internal set
{
var lockTaken = false;
try
{
PositionLock.EnsureLockedForWriting(ref lockTaken);
NextNode = value;
}
finally
{
if (lockTaken)
{
PositionLock.UnlockAfterWriting();
}
}
}
}
/// <summary>
/// Gets the node that precedes this one in the <see cref = "ConcurrentLinkedList{T}" />.
/// </summary>
/// <returns>
/// The preceding node, or null when there is no preceding node or this node is the first element
/// (<see cref = "ConcurrentLinkedList{T}.First" />) of the <see cref = "ConcurrentLinkedList{T}" />.
/// </returns>
public ConcurrentLinkedListNode<T> Previous
{
    get
    {
        var readLockAcquired = false;
        try
        {
            PositionLock.EnsureLockedForReading(ref readLockAcquired);
            // The owner's first node reports no predecessor even though its raw link is set.
            if (PreviousNode != null && Owner.First != this)
            {
                return PreviousNode;
            }
            return null;
        }
        finally
        {
            if (readLockAcquired)
            {
                PositionLock.UnlockAfterReading();
            }
        }
    }
    internal set
    {
        var writeLockAcquired = false;
        try
        {
            // The raw link is replaced under the position write lock.
            PositionLock.EnsureLockedForWriting(ref writeLockAcquired);
            PreviousNode = value;
        }
        finally
        {
            if (writeLockAcquired)
            {
                PositionLock.UnlockAfterWriting();
            }
        }
    }
}
/// <summary>
/// Gets or sets the value stored in the node.
/// </summary>
/// <returns>
/// The value stored in the node.
/// </returns>
/// <remarks>
/// Reads and writes are guarded by the node's value lock, which is separate from the lock guarding its position.
/// </remarks>
public T Value
{
    get
    {
        T current;
        var readLockAcquired = false;
        try
        {
            ValueLock.EnsureLockedForReading(ref readLockAcquired);
            current = _value;
        }
        finally
        {
            // Release only if this getter acquired the lock itself.
            if (readLockAcquired)
            {
                ValueLock.UnlockAfterReading();
            }
        }
        return current;
    }
    set
    {
        var writeLockAcquired = false;
        try
        {
            ValueLock.EnsureLockedForWriting(ref writeLockAcquired);
            _value = value;
        }
        finally
        {
            if (writeLockAcquired)
            {
                ValueLock.UnlockAfterWriting();
            }
        }
    }
}
#endregion
#region Methods
/// <summary>
/// Detaches the node: while holding the position write lock, clears its owner and both neighbour links.
/// </summary>
internal void Invalidate()
{
    var writeLockAcquired = false;
    try
    {
        PositionLock.EnsureLockedForWriting(ref writeLockAcquired);
        Owner = null;
        NextNode = null;
        PreviousNode = null;
    }
    finally
    {
        // Release only when this call acquired the write lock itself.
        if (writeLockAcquired)
        {
            PositionLock.UnlockAfterWriting();
        }
    }
}
#endregion
}
}
namespace System.Collections.Concurrent
{
using Diagnostics;
/// <summary>
/// Debugger type proxy that exposes the contents of an <see cref = "IProducerConsumerCollection{T}" /> as an array.
/// </summary>
/// <typeparam name = "T">The type of the elements held by the inspected collection.</typeparam>
internal sealed class IProducerConsumerCollection_DebugView<T>
{
    #region Fields
    private readonly IProducerConsumerCollection<T> _collection;
    #endregion
    #region Constructors
    /// <summary>
    /// Creates a debug view over <paramref name = "collection" />.
    /// </summary>
    /// <param name = "collection">The collection whose elements are shown in the debugger.</param>
    /// <exception cref = "ArgumentNullException"><paramref name = "collection" /> is null.</exception>
    public IProducerConsumerCollection_DebugView(IProducerConsumerCollection<T> collection)
    {
        // Fail at construction instead of with a NullReferenceException when Values is first read.
        if (collection == null)
        {
            throw new ArgumentNullException("collection");
        }
        _collection = collection;
    }
    #endregion
    #region Properties
    /// <summary>
    /// Gets a copy of the collection's elements, obtained through
    /// <see cref = "IProducerConsumerCollection{T}.ToArray" /> each time the property is read.
    /// </summary>
    [DebuggerBrowsable(DebuggerBrowsableState.RootHidden)]
    public T[] Values
    {
        get { return _collection.ToArray(); }
    }
    #endregion
}
}
namespace System.Threading
{
using Diagnostics.Contracts;
/// <summary>
/// Contains extension methods targeting <see cref = "ReaderWriterLockSlim" />. The <c>EnsureLockedFor…</c> methods
/// acquire a lock only when the calling thread needs one and report through <c>lockTaken</c> whether a matching
/// <c>UnlockAfter…</c> call is owed.
/// </summary>
public static class ReaderWriterLockExtensions
{
    #region Methods
    /// <summary>
    /// Downgrades an upgradeable-read lock obtained on a <see cref = "ReaderWriterLockSlim" /> to a read lock.
    /// Be sure to call <see cref = "ReaderWriterLockSlim.ExitReadLock" /> when done reading.
    /// </summary>
    /// <param name = "rwl">The <see cref = "ReaderWriterLockSlim" />, currently held in upgradeable-read mode.</param>
    /// <seealso cref = "ReaderWriterLockSlim.EnterUpgradeableReadLock" />
    /// <example>
    /// <code>
    /// <![CDATA[
    /// ReaderWriterLockSlim rwl = …;
    /// …
    /// bool upgradeableHeld = true;
    /// rwl.EnterUpgradeableReadLock();
    /// try {
    ///     if (… read some state to decide whether to upgrade …) {
    ///         var writeLocked = false;
    ///         try {
    ///             rwl.UpgradeToWriteLock(ref writeLocked);
    ///             … write to state …
    ///         } finally {
    ///             if (writeLocked)
    ///                 rwl.ExitWriteLock();
    ///         }
    ///     } else {
    ///         rwl.DowngradeToReadLock();
    ///         upgradeableHeld = false;
    ///         try {
    ///             … read from state …
    ///         } finally {
    ///             rwl.ExitReadLock();
    ///         }
    ///     }
    /// } finally {
    ///     if (upgradeableHeld)
    ///         rwl.ExitUpgradeableReadLock();
    /// }
    /// ]]>
    /// </code>
    /// </example>
    public static void DowngradeToReadLock(this ReaderWriterLockSlim rwl)
    {
        Contract.Requires<ArgumentNullException>(rwl != null, "rwl");
        Contract.Requires<InvalidOperationException>(rwl.IsUpgradeableReadLockHeld,
                                                     "The ReaderWriterLockSlim instance must be in UpgradeableRead mode.");
        // Enter read mode BEFORE leaving upgradeable mode so the lock is never fully released in between.
        rwl.EnterReadLock();
        rwl.ExitUpgradeableReadLock();
    }

    /// <summary>
    /// Ensures that a read (or upgradeable read) lock is held on the provided <see cref = "ReaderWriterLockSlim" />.
    /// </summary>
    /// <remarks>
    /// A lock is acquired when the thread does not yet hold one of the requested kind, or always when the lock
    /// supports recursion. <paramref name = "lockTaken" /> stays false when nothing was acquired, either because
    /// the lock was already held or because the timeout elapsed.
    /// </remarks>
    /// <param name = "rwl">The <see cref = "ReaderWriterLockSlim" /> to lock.</param>
    /// <param name = "lockTaken">
    /// Must be false on entry; set to true when this call acquired a lock that the caller must release again,
    /// e.g. with <see cref = "UnlockAfterReading" />.
    /// </param>
    /// <param name = "upgradeable">Determines if an upgradeable read lock should be obtained.</param>
    /// <param name = "millisecondsTimeOut">The number of milliseconds to wait to acquire the lock, or -1 (<see cref = "Timeout.Infinite" />) to wait indefinitely.</param>
    public static void EnsureLockedForReading(this ReaderWriterLockSlim rwl,
                                              ref bool lockTaken,
                                              bool upgradeable = false,
                                              int millisecondsTimeOut = Timeout.Infinite)
    {
        Contract.Requires<ArgumentNullException>(rwl != null, "rwl");
        Contract.Requires<ArgumentException>(lockTaken == false);
        Contract.Requires<ArgumentOutOfRangeException>(
            millisecondsTimeOut >= 0 || millisecondsTimeOut == Timeout.Infinite,
            "The value of 'millisecondsTimeout' is negative, but is not equal to Timeout.Infinite (-1), which is the only negative value allowed.");
        // Entering read/upgradeable mode while in write mode is only an error when recursion is DISABLED;
        // a lock that supports recursion allows it.
        Contract.Requires<InvalidOperationException>(
            rwl.RecursionPolicy == LockRecursionPolicy.SupportsRecursion || !rwl.IsWriteLockHeld,
            @"The current thread initially entered the lock in write mode, and therefore cannot enter read or upgradeable mode since recursion is disabled.");
        Contract.Requires<InvalidOperationException>(
            !upgradeable || rwl.IsUpgradeableReadLockHeld || !rwl.IsReadLockHeld,
            "The current thread initially entered the lock in read mode, and therefore trying to enter upgradeable mode would create the possibility of a deadlock.");
        // Only an unbounded wait guarantees the lock is held on return; a finite timeout may elapse.
        Contract.Ensures(millisecondsTimeOut != Timeout.Infinite ||
                         upgradeable && rwl.IsUpgradeableReadLockHeld ||
                         !upgradeable && rwl.IsReadLockHeld);

        var supportsRecursion = rwl.RecursionPolicy == LockRecursionPolicy.SupportsRecursion;
        if (upgradeable)
        {
            if (supportsRecursion || !rwl.IsUpgradeableReadLockHeld)
            {
                lockTaken = rwl.TryEnterUpgradeableReadLock(millisecondsTimeOut);
            }
        }
        else if (supportsRecursion || !rwl.IsReadLockHeld)
        {
            lockTaken = rwl.TryEnterReadLock(millisecondsTimeOut);
        }
    }

    /// <summary>
    /// Ensures that a writer lock is held on the provided <see cref = "ReaderWriterLockSlim" />.
    /// </summary>
    /// <remarks>
    /// The write lock is acquired when the thread does not hold it yet, or always when the lock supports recursion.
    /// <paramref name = "lockTaken" /> stays false when nothing was acquired.
    /// </remarks>
    /// <param name = "rwl">The <see cref = "ReaderWriterLockSlim" /> to lock.</param>
    /// <param name = "lockTaken">
    /// Must be false on entry; set to true when this call acquired the write lock and the caller must release it,
    /// e.g. with <see cref = "UnlockAfterWriting" />.
    /// </param>
    /// <param name = "millisecondsTimeOut">The number of milliseconds to wait to acquire the lock, or -1 (<see cref = "Timeout.Infinite" />) to wait indefinitely.</param>
    public static void EnsureLockedForWriting(this ReaderWriterLockSlim rwl,
                                              ref bool lockTaken,
                                              int millisecondsTimeOut = Timeout.Infinite)
    {
        Contract.Requires<ArgumentNullException>(rwl != null, "rwl");
        Contract.Requires<ArgumentException>(lockTaken == false);
        Contract.Requires<ArgumentOutOfRangeException>(
            millisecondsTimeOut >= 0 || millisecondsTimeOut == Timeout.Infinite,
            "The value of 'millisecondsTimeout' is negative, but is not equal to Timeout.Infinite (-1), which is the only negative value allowed.");
        Contract.Requires<InvalidOperationException>(!rwl.IsReadLockHeld,
                                                     "The current thread initially entered the lock in read mode, and therefore trying to enter write mode would create the possibility of a deadlock.");

        var supportsRecursion = rwl.RecursionPolicy == LockRecursionPolicy.SupportsRecursion;
        if (supportsRecursion || !rwl.IsWriteLockHeld)
        {
            lockTaken = rwl.TryEnterWriteLock(millisecondsTimeOut);
        }
    }

    /// <summary>
    /// Releases one read-side lock (if any) held by the current thread on the provided
    /// <see cref = "ReaderWriterLockSlim" />.
    /// </summary>
    /// <remarks>
    /// Exactly one lock is released per call so that the method pairs with a single successful
    /// <see cref = "EnsureLockedForReading" />. When the thread holds both a read lock and an upgradeable read lock,
    /// the read lock is released and the upgradeable lock is left in place for its owner; a further call releases it.
    /// </remarks>
    /// <param name = "rwl">The <see cref = "ReaderWriterLockSlim" /> to release.</param>
    public static void UnlockAfterReading(this ReaderWriterLockSlim rwl)
    {
        Contract.Requires<ArgumentNullException>(rwl != null, "rwl");
        // With recursion the lock may legitimately still be held after one level is released.
        Contract.Ensures(rwl.RecursionPolicy == LockRecursionPolicy.SupportsRecursion || !rwl.IsReadLockHeld);

        if (rwl.IsReadLockHeld)
        {
            // A read lock held alongside an upgradeable lock was entered after it, so it is released first.
            rwl.ExitReadLock();
        }
        else if (rwl.IsUpgradeableReadLockHeld)
        {
            rwl.ExitUpgradeableReadLock();
        }
    }

    /// <summary>
    /// Releases the current write lock (if any) on the provided <see cref = "ReaderWriterLockSlim" />.
    /// </summary>
    /// <remarks>
    /// One level is released per call; a write lock entered recursively remains held until every level is released.
    /// </remarks>
    /// <param name = "rwl">The <see cref = "ReaderWriterLockSlim" /> to unlock.</param>
    public static void UnlockAfterWriting(this ReaderWriterLockSlim rwl)
    {
        Contract.Requires<ArgumentNullException>(rwl != null, "rwl");
        // With recursion the lock may legitimately still be held after one level is released.
        Contract.Ensures(rwl.RecursionPolicy == LockRecursionPolicy.SupportsRecursion || !rwl.IsWriteLockHeld);

        if (rwl.IsWriteLockHeld)
        {
            rwl.ExitWriteLock();
        }
    }

    /// <summary>
    /// Upgrades an upgradeable-read lock obtained on a <see cref = "ReaderWriterLockSlim" /> to a write lock.
    /// Be sure to call <see cref = "ReaderWriterLockSlim.ExitWriteLock" /> when done writing.
    /// </summary>
    /// <param name = "rwl">The <see cref = "ReaderWriterLockSlim" />, currently held in upgradeable-read mode.</param>
    /// <param name = "lockTaken">
    /// Must be false on entry; set to true when the write lock was acquired by this call and must be released
    /// by the caller.
    /// </param>
    /// <seealso cref = "ReaderWriterLockSlim.EnterUpgradeableReadLock" />
    /// <example>
    /// <code>
    /// <![CDATA[
    /// ReaderWriterLockSlim rwl = …;
    /// …
    /// bool upgradeableHeld = true;
    /// rwl.EnterUpgradeableReadLock();
    /// try {
    ///     if (… read some state to decide whether to upgrade …) {
    ///         var writeLocked = false;
    ///         try {
    ///             rwl.UpgradeToWriteLock(ref writeLocked);
    ///             … write to state …
    ///         } finally {
    ///             if (writeLocked)
    ///                 rwl.ExitWriteLock();
    ///         }
    ///     } else {
    ///         rwl.DowngradeToReadLock();
    ///         upgradeableHeld = false;
    ///         try {
    ///             … read from state …
    ///         } finally {
    ///             rwl.ExitReadLock();
    ///         }
    ///     }
    /// } finally {
    ///     if (upgradeableHeld)
    ///         rwl.ExitUpgradeableReadLock();
    /// }
    /// ]]>
    /// </code>
    /// </example>
    public static void UpgradeToWriteLock(this ReaderWriterLockSlim rwl, ref bool lockTaken)
    {
        Contract.Requires<ArgumentNullException>(rwl != null, "rwl");
        Contract.Requires<InvalidOperationException>(rwl.IsUpgradeableReadLockHeld,
                                                     "The ReaderWriterLockSlim instance must be in UpgradeableRead mode.");
        rwl.EnsureLockedForWriting(ref lockTaken);
    }
    #endregion
}
}
@jimitndiaye

This comment has been minimized.

Copy link
Owner Author

jimitndiaye commented Oct 31, 2011

Couldn't find any implementations of a ConcurrentLinkedList for .NET (apparently it was in the betas but got dropped from the .NET 4 RTM). This is a preliminary implementation, no doubt full of potential race conditions or performance issues, so use at your own peril. I'd appreciate any feedback. Be gentle though - it's my first time ;-)

@oberxon

This comment has been minimized.

Copy link

oberxon commented Jun 28, 2016

good work!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.