Skip to content

Instantly share code, notes, and snippets.

@electricessence
Created January 12, 2017 10:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save electricessence/6a5e7525f4fbc2b63f9500c0f35819c1 to your computer and use it in GitHub Desktop.
Save electricessence/6a5e7525f4fbc2b63f9500c0f35819c1 to your computer and use it in GitHub Desktop.
A LINQ-style extension method that asynchronously pre-enumerates (buffers) items from a source enumerable, up to a given count.
/// <summary>
/// Enumerates <paramref name="source"/> while reading ahead on background workers,
/// keeping up to <paramref name="count"/> items buffered for the consumer.
/// </summary>
/// <typeparam name="T">Element type of the sequence.</typeparam>
/// <param name="source">The sequence to read ahead from.</param>
/// <param name="count">Target number of items to keep buffered; must be at least 1.</param>
/// <returns>A lazily evaluated sequence yielding the items of <paramref name="source"/>.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="count"/> is less than 1 (the read-ahead loop would never refill the buffer
/// and the consumer would wait forever after the first item).
/// </exception>
public static IEnumerable<T> PreCache<T>(this IEnumerable<T> source, int count = 1)
{
    // Validate eagerly: inside an iterator these checks would not run until the first MoveNext.
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    if (count < 1)
    {
        throw new ArgumentOutOfRangeException("count", count, "At least one item must be buffered.");
    }
    return PreCacheIterator(source, count);
}

// Iterator behind PreCache: owns the enumerator, the buffer and the read-ahead worker.
private static IEnumerable<T> PreCacheIterator<T>(IEnumerable<T> source, int count)
{
    var e = source.GetEnumerator();
    var queue = new BufferBlock<T>();
    ActionBlock<bool> worker = null;

    // Pulls one item from the source into the buffer; when the source is exhausted
    // it tells the worker to stop accepting further requests.
    Func<bool> tryQueue = () =>
        e.ConcurrentMoveNext(
            value => queue.Post(value),
            () => worker.Complete());

    worker = new ActionBlock<bool>(synchronousFill =>
        {
            // Refill until the buffer holds 'count' items or the source runs out.
            while (queue.Count < count && tryQueue() && synchronousFill) { }
        },
        // The consumers will dictate the amount of parallelism.
        new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 32 });

    // NOTE(review): PropagateCompletionTo is defined elsewhere; presumably it completes
    // (or faults) the buffer once the worker has finished - confirm.
    worker.PropagateCompletionTo(queue);

    try
    {
        // The very first fetch (kick-off) is synchronous, on the consumer's thread.
        if (tryQueue())
        {
            while (true)
            {
                // Is something already available in the buffer? Take it.
                T item;
                if (queue.TryReceive(null, out item))
                {
                    // Fire-and-forget request to top the buffer back up.
                    worker.SendAsync(true);
                    yield return item;
                    continue;
                }

                // Nothing buffered: request more. A declined Post means the worker is
                // completing (source exhausted or failed). Otherwise block until an item
                // shows up or the buffer completes. IEnumerable is synchronous, so the
                // wait has to be blocking; OutputAvailableAsync reports completion as
                // 'false' instead of faulting the way ReceiveAsync does.
                if (!worker.Post(true) || !queue.OutputAvailableAsync().GetAwaiter().GetResult())
                {
                    break;
                }
            }

            // Let in-flight read-ahead calls finish so nothing is posted after the drain.
            // WaitAny waits without throwing if the worker faulted.
            System.Threading.Tasks.Task.WaitAny(worker.Completion);

            // Hand out whatever was buffered between the failed TryReceive and completion.
            T leftover;
            while (queue.TryReceive(null, out leftover))
            {
                yield return leftover;
            }

            // Surface a failure raised by the source on a worker thread instead of
            // silently truncating the sequence. No-op when the worker completed normally.
            worker.Completion.GetAwaiter().GetResult();
        }
    }
    finally
    {
        // Runs on normal completion, on failure, and when the consumer stops early.
        // Stop accepting new read-ahead requests and wait for the ones already running,
        // so the enumerator is never disposed while a worker is inside MoveNext.
        // A read-ahead failure the consumer never reached is deliberately not rethrown here.
        worker.Complete();
        System.Threading.Tasks.Task.WaitAny(worker.Completion);
        e.Dispose();
    }
}
/// <summary>
/// Advances <paramref name="source"/> while holding a lock on the enumerator itself,
/// so that several threads can pull from the same enumerator one at a time.
/// </summary>
/// <typeparam name="T">Element type of the enumerator.</typeparam>
/// <param name="source">The shared enumerator; it is also used as the lock object.</param>
/// <param name="trueHandler">
/// Invoked with the current element when the enumerator advanced. It runs inside the lock,
/// so handlers are called in the same order in which elements are pulled.
/// </param>
/// <param name="falseHandler">
/// Optional; invoked (outside the lock) when the enumerator has no more elements.
/// </param>
/// <returns>true if an element was handed to <paramref name="trueHandler"/>; otherwise false.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="trueHandler"/> is null.
/// </exception>
public static bool ConcurrentMoveNext<T>(this IEnumerator<T> source, Action<T> trueHandler, Action falseHandler = null)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    // Check before advancing: otherwise a null handler would consume (and lose) an element
    // and fail with a NullReferenceException from inside the lock.
    if (trueHandler == null)
    {
        throw new ArgumentNullException("trueHandler");
    }

    // Always lock on next to prevent concurrency issues:
    // a standard enumerator can't handle concurrent MoveNext/Current access.
    lock (source)
    {
        if (source.MoveNext())
        {
            trueHandler(source.Current);
            return true;
        }
    }

    if (falseHandler != null)
    {
        falseHandler();
    }
    return false;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment