Skip to content

Instantly share code, notes, and snippets.

@vbedegi
Created March 5, 2012 21:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vbedegi/1981082 to your computer and use it in GitHub Desktop.
Save vbedegi/1981082 to your computer and use it in GitHub Desktop.
A transactional, concurrent queue for .NET
// Usage: enqueueing inside a transaction. The item is only added to the
// underlying queue when the transaction commits.
var queue = new TransactionalConcurrentQueue<int>();
using (var transaction = new TransactionScope())
{
    queue.Enqueue(1);

    // Prints nothing: the enqueue is deferred until commit, so the queue
    // is still empty at this point.
    queue.Dequeue(value => Console.WriteLine(value));

    transaction.Complete();
}

// Prints the item: the scope was completed, so the enqueue has been applied.
queue.Dequeue(value => Console.WriteLine(value));
// Usage: dequeueing inside a transaction that is rolled back. The item is
// handed to the callback immediately, and put back when the scope is
// disposed without being completed.
var queue = new TransactionalConcurrentQueue<int>();
queue.Enqueue(1);

using (var transaction = new TransactionScope())
{
    // Prints the item: it was enqueued outside any transaction.
    queue.Dequeue(value => Console.WriteLine(value));

    // Complete() is deliberately not called, so the transaction rolls back.
}

// Prints the item again: the rollback returned it to the queue.
queue.Dequeue(value => Console.WriteLine(value));
/// <summary>
/// A queue backed by <see cref="ConcurrentQueue{T}"/> that takes part in the
/// ambient <see cref="Transaction"/> (<see cref="Transaction.Current"/>), if any.
/// </summary>
/// <remarks>
/// Inside a transaction, an enqueue is deferred until the transaction commits,
/// and a dequeue is undone on rollback by putting the item back. The item is
/// re-added with a plain enqueue, so it does not return to its original position.
/// Outside a transaction, operations apply to the underlying queue immediately.
/// </remarks>
/// <typeparam name="T">Type of the queued items.</typeparam>
public class TransactionalConcurrentQueue<T>
{
    /// <summary>
    /// Base class for a single queue operation enlisted in a transaction.
    /// Subclasses override <see cref="OnCommit"/> or <see cref="OnRollback"/>
    /// to apply or undo their effect on the underlying queue.
    /// </summary>
    private abstract class QueueOperation : IEnlistmentNotification
    {
        private readonly ConcurrentQueue<T> queue;
        private readonly T item;

        protected QueueOperation(ConcurrentQueue<T> queue, T item)
        {
            this.queue = queue;
            this.item = item;
        }

        protected ConcurrentQueue<T> Queue { get { return queue; } }

        protected T Item { get { return item; } }

        /// <summary>Votes to commit; an operation has nothing to prepare.</summary>
        public void Prepare(PreparingEnlistment preparingEnlistment)
        {
            preparingEnlistment.Prepared();
        }

        /// <summary>Applies the commit-time effect, then reports completion.</summary>
        public void Commit(Enlistment enlistment)
        {
            try
            {
                OnCommit();
            }
            finally
            {
                // The transaction manager must always be told this participant is finished.
                enlistment.Done();
            }
        }

        /// <summary>Applies the rollback-time effect, then reports completion.</summary>
        public void Rollback(Enlistment enlistment)
        {
            try
            {
                OnRollback();
            }
            finally
            {
                enlistment.Done();
            }
        }

        /// <summary>Reports completion; nothing is applied or undone when the outcome is unknown.</summary>
        public void InDoubt(Enlistment enlistment)
        {
            enlistment.Done();
        }

        /// <summary>Hook invoked when the transaction commits. Does nothing by default.</summary>
        protected virtual void OnCommit()
        {
        }

        /// <summary>Hook invoked when the transaction rolls back. Does nothing by default.</summary>
        protected virtual void OnRollback()
        {
        }
    }

    /// <summary>A deferred enqueue: the item is added only when the transaction commits.</summary>
    private sealed class EnqueueOperation : QueueOperation
    {
        public EnqueueOperation(ConcurrentQueue<T> queue, T item)
            : base(queue, item)
        {
        }

        protected override void OnCommit()
        {
            Queue.Enqueue(Item);
        }
    }

    /// <summary>A dequeue that has already happened: the item is put back if the transaction rolls back.</summary>
    private sealed class DequeueOperation : QueueOperation
    {
        public DequeueOperation(ConcurrentQueue<T> queue, T item)
            : base(queue, item)
        {
        }

        protected override void OnRollback()
        {
            Queue.Enqueue(Item);
        }
    }

    private readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();

    /// <summary>
    /// Adds <paramref name="item"/> to the queue. With an ambient transaction the
    /// item is added only when that transaction commits; otherwise it is added
    /// immediately.
    /// </summary>
    /// <param name="item">The item to enqueue.</param>
    public void Enqueue(T item)
    {
        var tx = Transaction.Current;
        if (tx == null)
        {
            queue.Enqueue(item);
        }
        else
        {
            tx.EnlistVolatile(new EnqueueOperation(queue, item), EnlistmentOptions.None);
        }
    }

    /// <summary>
    /// Removes the next item, if any, and passes it to <paramref name="action"/>.
    /// With an ambient transaction the item is put back into the queue if that
    /// transaction rolls back.
    /// </summary>
    /// <param name="action">Callback invoked with the dequeued item.</param>
    /// <returns><c>true</c> if an item was dequeued and handed to <paramref name="action"/>;
    /// <c>false</c> if the queue was empty.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
    /// <remarks>
    /// Outside a transaction, an exception thrown by <paramref name="action"/>
    /// propagates to the caller and the item is not returned to the queue.
    /// </remarks>
    public bool Dequeue(Action<T> action)
    {
        // Validate before touching the queue so a bad call cannot consume an item.
        if (action == null)
            throw new ArgumentNullException("action");

        T item;
        if (!queue.TryDequeue(out item))
            return false;

        var tx = Transaction.Current;
        if (tx != null)
        {
            try
            {
                tx.EnlistVolatile(new DequeueOperation(queue, item), EnlistmentOptions.None);
            }
            catch
            {
                // Enlistment failed (e.g. the transaction is no longer active), so no
                // rollback handler will ever restore the item: put it back ourselves.
                queue.Enqueue(item);
                throw;
            }
        }

        action(item);
        return true;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment