Skip to content

Instantly share code, notes, and snippets.

@coolya
Created August 31, 2012 10:13
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save coolya/3551118 to your computer and use it in GitHub Desktop.
Save coolya/3551118 to your computer and use it in GitHub Desktop.
EnumerableExtensions.cs
public static class EnumerableExtensions
{
public static BlockingCollection<T> CopyToBlockingCollectionAsync<T>(this IEnumerable<T> source)
{
var collection = new BlockingCollection<T>();
Task.Factory.StartNew(() =>
{
foreach (var item in source)
{
collection.Add(item);
}
collection.CompleteAdding();
});
return collection;
}
public static BlockingCollection<T> CopyToBlockingCollectionAsync<T>(this IEnumerable<T> source, int buffersize)
{
var collection = new BlockingCollection<T>(buffersize);
Task.Factory.StartNew(() =>
{
foreach (var item in source)
{
collection.Add(item);
}
collection.CompleteAdding();
});
return collection;
}
public static Task ConsumeAsync<T>(this BlockingCollection<T> source, params Action<T>[] consumers)
{
return Task.Factory.StartNew(() =>
{
foreach (var item in source.GetConsumingEnumerable())
{
foreach (var consumer in consumers)
{
consumer(item);
}
}
});
}
public static Task ConsumeAsync<T>(this BlockingCollection<T> source, Action<T> consumer)
{
return Task.Factory.StartNew(() =>
{
foreach (var item in source.GetConsumingEnumerable())
{
consumer(item);
}
});
}
public static Tuple<BlockingCollection<T>, BlockingCollection<T>> Fork<T>(this BlockingCollection<T> source)
{
var fork1 = new BlockingCollection<T>();
var fork2 = new BlockingCollection<T>();
source.ConsumeAsync(fork1.Add, fork2.Add).ContinueWith(t =>
{
fork1.CompleteAdding();
fork2.CompleteAdding();
});
return Tuple.Create(fork1, fork2);
}
public static Tuple<BlockingCollection<T>, BlockingCollection<T>> Fork<T>(this BlockingCollection<T> source, int boundedCapacity)
{
var fork1 = new BlockingCollection<T>(boundedCapacity);
var fork2 = new BlockingCollection<T>(boundedCapacity);
source.ConsumeAsync(fork1.Add, fork2.Add).ContinueWith(t =>
{
fork1.CompleteAdding();
fork2.CompleteAdding();
});
return Tuple.Create(fork1, fork2);
}
public static Task Join<T>(this BlockingCollection<T> target, params BlockingCollection<T>[] sources)
{
return Task.Factory.StartNew(() =>
{
var tasks = new Task[sources.Length];
for (int i = 0; i < sources.Length; i++)
{
tasks[i] = sources[i].ConsumeAsync(target.Add);
}
Task.WaitAll(tasks);
});
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment