Skip to content

Instantly share code, notes, and snippets.

@yreynhout
Last active August 29, 2015 14:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yreynhout/f0f09f7541902bb930c8 to your computer and use it in GitHub Desktop.
Save yreynhout/f0f09f7541902bb930c8 to your computer and use it in GitHub Desktop.
Queue all projection requests - report back using TCS
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Projac;
namespace ConsoleApplication666
{
public class AsyncProjector
{
private readonly AsyncSqlProjector _projector;
private readonly ConcurrentQueue<IProjectOperation> _operations;
private int _processing;
public AsyncProjector(AsyncSqlProjector projector)
{
if (projector == null)
throw new ArgumentNullException("projector");
_projector = projector;
_operations = new ConcurrentQueue<IProjectOperation>();
_processing = 0;
}
public Task<int> ProjectAsync(object message)
{
if (message == null) throw new ArgumentNullException("message");
return ProjectAsync(message, CancellationToken.None);
}
public Task<int> ProjectAsync(object message, CancellationToken cancellationToken)
{
if (message == null) throw new ArgumentNullException("message");
var operation = new ProjectMessageOperation(message, cancellationToken);
_operations.Enqueue(operation);
if (Interlocked.CompareExchange(ref _processing, 1, 0) == 0)
ThreadPool.QueueUserWorkItem(ProcessQueue);
return operation.Task;
}
public Task<int> ProjectAsync(IEnumerable<object> messages)
{
if (messages == null) throw new ArgumentNullException("messages");
return ProjectAsync(messages, CancellationToken.None);
}
public Task<int> ProjectAsync(IEnumerable<object> messages, CancellationToken cancellationToken)
{
if (messages == null) throw new ArgumentNullException("messages");
var operation = new ProjectMessagesOperation(messages, cancellationToken);
_operations.Enqueue(operation);
if (Interlocked.CompareExchange(ref _processing, 1, 0) == 0)
ThreadPool.QueueUserWorkItem(ProcessQueue);
return operation.Task;
}
private void ProcessQueue(object state)
{
do
{
IProjectOperation operation;
while (_operations.TryDequeue(out operation))
{
operation.Project(_projector);
}
Interlocked.Exchange(ref _processing, 0);
} while (_operations.Count > 0 && Interlocked.CompareExchange(ref _processing, 1, 0) == 0);
}
private interface IProjectOperation
{
void Project(AsyncSqlProjector projector);
Task<int> Task { get; }
}
private class ProjectMessageOperation : IProjectOperation
{
private readonly object _message;
private readonly CancellationToken _cancellationToken;
private readonly TaskCompletionSource<int> _source;
public ProjectMessageOperation(object message, CancellationToken cancellationToken)
{
if (message == null)
throw new ArgumentNullException("message");
_message = message;
_cancellationToken = cancellationToken;
_source = new TaskCompletionSource<int>();
}
public void Project(AsyncSqlProjector projector)
{
if (_cancellationToken.IsCancellationRequested)
{
_source.SetCanceled();
}
else
{
projector.
ProjectAsync(_message, _cancellationToken).
ContinueWith(task =>
{
if (task.IsCanceled)
{
_source.SetCanceled();
}
else if (task.IsFaulted)
{
_source.SetException(task.Exception);
}
else
{
_source.SetResult(task.Result);
}
}, _cancellationToken);
}
}
public Task<int> Task { get { return _source.Task; } }
}
private class ProjectMessagesOperation : IProjectOperation
{
private readonly IEnumerable<object> _messages;
private readonly CancellationToken _cancellationToken;
private readonly TaskCompletionSource<int> _source;
public ProjectMessagesOperation(IEnumerable<object> messages, CancellationToken cancellationToken)
{
if (messages == null)
throw new ArgumentNullException("messages");
_messages = messages;
_cancellationToken = cancellationToken;
_source = new TaskCompletionSource<int>();
}
public void Project(AsyncSqlProjector projector)
{
if (_cancellationToken.IsCancellationRequested)
{
_source.SetCanceled();
}
else
{
projector.
ProjectAsync(_messages, _cancellationToken).
ContinueWith(task =>
{
if (task.IsCanceled)
{
_source.SetCanceled();
}
else if (task.IsFaulted)
{
_source.SetException(task.Exception);
}
else
{
_source.SetResult(task.Result);
}
}, _cancellationToken);
}
}
public Task<int> Task { get { return _source.Task; } }
}
}
}
@yreynhout
Copy link
Author

And it's broken :(

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment