Skip to content

Instantly share code, notes, and snippets.

@meisinger
Last active August 29, 2015 14:00
Show Gist options
  • Save meisinger/b25437741d9b963904cd to your computer and use it in GitHub Desktop.
Save meisinger/b25437741d9b963904cd to your computer and use it in GitHub Desktop.
queue with cooperative cancellation
/// <summary>
/// A blocking FIFO queue whose consumers can give up waiting cooperatively,
/// either through a <see cref="CancellationToken"/> or because the queue has
/// been closed.
/// </summary>
/// <remarks>
/// All access to the underlying <see cref="Queue{T}"/>, to the open/closed flag
/// and every <c>Set</c>/<c>Reset</c> of the wake-up event happens while holding
/// the lock on the internal queue instance. Keeping the event transitions under
/// that lock is what prevents a consumer from erasing a producer's signal.
/// </remarks>
/// <typeparam name="TMessage">Type of the queued messages.</typeparam>
public class CooperativeQueue<TMessage>
    where TMessage : class, IQueueMessage
{
    // Also serves as the lock object guarding the queue, 'open' and the event state.
    private readonly Queue<TMessage> internalQueue;

    // Signalled when an item has been enqueued or the queue has been closed.
    private readonly ManualResetEventSlim manualReset;

    // True until Close() is called; only read or written while holding the lock.
    private bool open;

    /// <summary>
    /// Creates an empty, open queue.
    /// </summary>
    public CooperativeQueue()
    {
        internalQueue = new Queue<TMessage>();
        manualReset = new ManualResetEventSlim();
        open = true;
    }

    /// <summary>
    /// Marks the queue as closed and wakes any waiting consumers. After this call
    /// both <see cref="Enqueue"/> and <c>Dequeue</c> throw
    /// <see cref="EndOfStreamException"/>, even if items are still queued.
    /// </summary>
    public void Close()
    {
        lock (internalQueue)
        {
            open = false;

            // Wake blocked consumers so they can observe the closed state.
            manualReset.Set();
        }
    }

    /// <summary>
    /// Removes and returns the next item, blocking until one is available.
    /// </summary>
    /// <returns>The item at the head of the queue.</returns>
    /// <exception cref="EndOfStreamException">The queue has been closed.</exception>
    public TMessage Dequeue()
    {
        return Dequeue(CancellationToken.None);
    }

    /// <summary>
    /// Removes and returns the next item, blocking until one is available, the
    /// queue is closed, or <paramref name="token"/> is cancelled.
    /// </summary>
    /// <param name="token">Token used to abandon the wait.</param>
    /// <returns>
    /// The item at the head of the queue, or <c>default(TMessage)</c> when
    /// <paramref name="token"/> is cancelled before an item could be taken.
    /// </returns>
    /// <exception cref="EndOfStreamException">The queue has been closed.</exception>
    public TMessage Dequeue(CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            lock (internalQueue)
            {
                EnsureState();

                if (internalQueue.Count != 0)
                {
                    return internalQueue.Dequeue();
                }

                // Reset while still holding the lock. Enqueue and Close call Set()
                // under this same lock, so any signal raised after we saw the empty
                // queue happens after this Reset() and cannot be wiped out by it.
                manualReset.Reset();
            }

            try
            {
                manualReset.Wait(token);
            }
            catch (OperationCanceledException)
            {
                // Cancellation is reported as a default return value, not an exception.
                break;
            }
        }

        return default(TMessage);
    }

    /// <summary>
    /// Appends <paramref name="item"/> to the queue and wakes waiting consumers.
    /// </summary>
    /// <param name="item">The message to enqueue.</param>
    /// <exception cref="EndOfStreamException">The queue has been closed.</exception>
    public void Enqueue(TMessage item)
    {
        lock (internalQueue)
        {
            EnsureState();
            internalQueue.Enqueue(item);
            manualReset.Set();
        }
    }

    /// <summary>
    /// Throws if the queue has been closed. Callers must hold the lock on the
    /// internal queue so the flag is read consistently.
    /// </summary>
    /// <exception cref="EndOfStreamException">The queue has been closed.</exception>
    private void EnsureState()
    {
        if (!open)
        {
            throw new EndOfStreamException();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment