Skip to content

Instantly share code, notes, and snippets.

@aevitas
Created August 23, 2016 09:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aevitas/35329077d4a597b94f7b9d23181ddb6a to your computer and use it in GitHub Desktop.
Save aevitas/35329077d4a597b94f7b9d23181ddb6a to your computer and use it in GitHub Desktop.
Extension of the System.Threading.Tasks.Dataflow (TPL Dataflow) library providing pipeline functionality
/// <summary>
/// Static entry point holding the shared link options and a factory for building a
/// <see cref="Dataflow{TInput}" /> out of an ordered sequence of blocks.
/// </summary>
public static class Dataflow
{
    /// <summary>
    /// Link options used when blocks are linked together; completion propagation is switched on.
    /// </summary>
    internal static readonly DataflowLinkOptions DefaultLinkOptions = new DataflowLinkOptions {PropagateCompletion = true};

    /// <summary>
    /// Builds a dataflow from <paramref name="blocks" />: the first element becomes the input block and every
    /// following element is appended, in order, through <c>AppendBlock</c>.
    /// </summary>
    /// <typeparam name="TInput">The message type accepted by the first block.</typeparam>
    /// <param name="name">The name handed to the created dataflow.</param>
    /// <param name="blocks">The blocks making up the flow, in pipeline order.</param>
    /// <returns>A new dataflow whose input block is the first element of <paramref name="blocks" />.</returns>
    /// <exception cref="System.ArgumentNullException"><paramref name="blocks" /> is <c>null</c>.</exception>
    /// <exception cref="System.ArgumentException">
    /// <paramref name="blocks" /> is empty, contains a <c>null</c> element, or its first element is not an
    /// <c>ITargetBlock</c> of <typeparamref name="TInput" />.
    /// </exception>
    public static Dataflow<TInput> FromBlocks<TInput>(string name, IEnumerable<IDataflowBlock> blocks)
    {
        if (blocks == null)
        {
            throw new ArgumentNullException(nameof(blocks));
        }

        // Materialise once so the sequence is enumerated a single time.
        var pipeline = blocks.ToList();

        if (pipeline.Count == 0)
        {
            throw new ArgumentException("Can not construct dataflow from an empty collection of blocks!");
        }

        if (pipeline.Contains(null))
        {
            throw new ArgumentException("Collection of blocks to create a dataflow can not contain null elements!");
        }

        var head = pipeline[0] as ITargetBlock<TInput>;
        if (head == null)
        {
            throw new ArgumentException(
                $"First block in provided block collection was not a target block for type {typeof (TInput).Name}!");
        }

        var result = new Dataflow<TInput>(name, head);

        // Everything after the head is chained on in the order it was supplied.
        foreach (var successor in pipeline.Skip(1))
        {
            result.AppendBlock(successor);
        }

        return result;
    }
}
/// <summary>
/// Represents a data flow that accepts input, and processes it. The flow consists of one input block plus an
/// ordered list of appended blocks, each linked to its predecessor.
/// </summary>
/// <typeparam name="TInput">The type of the input.</typeparam>
public class Dataflow<TInput> : IDataflow, IDataflow<TInput>
{
    /// <summary>
    /// Initializes a new instance of the <see cref="Dataflow{TInput}" /> class.
    /// </summary>
    /// <param name="name">The name.</param>
    /// <param name="inputBlock">The input block.</param>
    /// <exception cref="System.ArgumentNullException">
    /// <paramref name="inputBlock" /> is <c>null</c>, or <paramref name="name" /> is <c>null</c> or empty.
    /// </exception>
    public Dataflow(string name, ITargetBlock<TInput> inputBlock)
    {
        if (inputBlock == null)
            throw new ArgumentNullException(nameof(inputBlock));

        // Note: an empty name is reported with ArgumentNullException as well; kept for compatibility.
        if (string.IsNullOrEmpty(name))
            throw new ArgumentNullException(nameof(name));

        InputBlock = inputBlock;
        Name = name;
    }

    /// <summary>
    /// Gets the name of the dataflow used to represent this instance visually.
    /// </summary>
    public string Name { get; }

    /// <summary>
    /// Gets the blocks appended to this dataflow. The input block itself is not part of this list.
    /// </summary>
    IList<IDataflowBlock> IDataflow.Blocks { get; } = new List<IDataflowBlock>();

    /// <summary>
    /// Gets the completion task of the last appended block, or of the input block when nothing has been appended.
    /// </summary>
    public Task Completion
    {
        get
        {
            var blocks = ((IDataflow) this).Blocks;
            return blocks.Count == 0 ? InputBlock.Completion : blocks[blocks.Count - 1]?.Completion;
        }
    }

    /// <summary>
    /// Appends the specified block to the end of the data flow, and links the previous block in the dataflow to it.
    /// </summary>
    /// <param name="block">The block.</param>
    /// <exception cref="System.ArgumentNullException"><paramref name="block" /> is <c>null</c>.</exception>
    /// <exception cref="System.ArgumentException">
    /// <paramref name="block" /> is not a target block of <typeparamref name="TInput" />.
    /// </exception>
    /// <exception cref="System.InvalidOperationException">
    /// Can not append block as the last block in the block list is not a source block (and subsequently, can't be linked
    /// from)!
    /// </exception>
    public void AppendBlock(IDataflowBlock block)
    {
        AppendBlock(block, null);
    }

    /// <summary>
    /// Gets a value indicating whether this dataflow is accepting blocks, i.e. blocks can be appended to it.
    /// </summary>
    /// <value>
    /// <c>true</c> if this instance is open; otherwise, <c>false</c>.
    /// </value>
    public bool IsAcceptingBlocks
    {
        get
        {
            // A dataflow is "closed" when the block a new block would be linked from is not a
            // source block of TInput. That block is the last appended block, or the input block
            // when nothing has been appended yet - the same block AppendBlock links from, so this
            // property agrees with whether AppendBlock can succeed.
            var blocks = ((IDataflow) this).Blocks;
            var tail = blocks.Count == 0 ? (IDataflowBlock) InputBlock : blocks[blocks.Count - 1];

            return tail is ISourceBlock<TInput>;
        }
    }

    /// <summary>
    /// Completes the input block of this dataflow. Appended blocks are linked with the shared default link options
    /// (completion propagation enabled), so completion is expected to flow on to them.
    /// </summary>
    public void Complete()
    {
        InputBlock.Complete();
    }

    /// <summary>
    /// Faults the input block of this dataflow with the specified exception. Appended blocks are linked with
    /// completion propagation enabled, so the fault is expected to reach the Completion task.
    /// </summary>
    /// <param name="ex">The ex.</param>
    /// <exception cref="System.ArgumentNullException"><paramref name="ex" /> is <c>null</c>.</exception>
    public void Fault(Exception ex)
    {
        if (ex == null)
            throw new ArgumentNullException(nameof(ex));

        InputBlock.Fault(ex);
    }

    /// <summary>
    /// Gets the input block used by this dataflow as the first block in the flow, and from which data is propagated to
    /// subsequent blocks.
    /// </summary>
    /// <value>
    /// The input block.
    /// </value>
    public ITargetBlock<TInput> InputBlock { get; }

    /// <summary>
    /// Appends the specified block to the end of the data flow, and links the previous block in the dataflow to it using
    /// the specified predicate to filter messages being passed.
    /// </summary>
    /// <param name="block">The block.</param>
    /// <param name="linkPredicate">The link predicate, or <c>null</c> to link without filtering.</param>
    /// <exception cref="System.ArgumentNullException"><paramref name="block" /> is <c>null</c>.</exception>
    /// <exception cref="System.ArgumentException">
    /// <paramref name="block" /> is not a target block of <typeparamref name="TInput" />.
    /// </exception>
    /// <exception cref="System.InvalidOperationException">
    /// Can not append block as the block to link from (the input block for an empty flow, otherwise the last block in
    /// the block list) is not a source block (and subsequently, can't be linked from)!
    /// </exception>
    // NOTE(review): the parameter type can probably be narrowed to ITargetBlock<TInput>, but do check.
    public void AppendBlock(IDataflowBlock block, Predicate<TInput> linkPredicate)
    {
        if (block == null)
            throw new ArgumentNullException(nameof(block));

        var blocks = ((IDataflow) this).Blocks;
        var targetBlock = block as ITargetBlock<TInput>;
        ISourceBlock<TInput> sourceBlock;

        if (blocks.Count == 0)
        {
            // Nothing appended yet: the new block is linked straight to the input block. That only
            // works if the new block is a target of TInput and the input block is a source of TInput.
            if (targetBlock == null)
                throw NotATargetBlock(nameof(block));

            sourceBlock = InputBlock as ISourceBlock<TInput>;
            if (sourceBlock == null)
                throw new InvalidOperationException(
                    $"Can not link the specified block to the dataflow's input block - the input block is not a source block of type {typeof (TInput).Name}!");
        }
        else
        {
            // Otherwise link from the last appended block; it must be a source block, or the
            // dataflow is considered "closed".
            sourceBlock = blocks[blocks.Count - 1] as ISourceBlock<TInput>;
            if (sourceBlock == null)
                throw new InvalidOperationException(
                    "Can not append block as the last block in the block list is not a source block (and subsequently, can't be linked from)!");

            if (targetBlock == null)
                throw NotATargetBlock(nameof(block));
        }

        Link(sourceBlock, targetBlock, linkPredicate);
        blocks.Add(block);
    }

    /// <summary>
    /// Links <paramref name="source" /> to <paramref name="target" /> with the shared default link options,
    /// filtering with <paramref name="predicate" /> when one is supplied.
    /// </summary>
    private static void Link(ISourceBlock<TInput> source, ITargetBlock<TInput> target, Predicate<TInput> predicate)
    {
        if (predicate == null)
            source.LinkTo(target, Dataflow.DefaultLinkOptions);
        else
            source.LinkTo(target, Dataflow.DefaultLinkOptions, predicate);
    }

    /// <summary>
    /// Creates the exception thrown when an appended block is not a target block of <typeparamref name="TInput" />.
    /// </summary>
    private static ArgumentException NotATargetBlock(string paramName)
    {
        return new ArgumentException($"Specified block is not a target block for type {typeof (TInput).Name}!",
            paramName);
    }
}
/// <summary>
/// A dataflow that accepts <typeparamref name="TInput" /> and exposes a source block of
/// <typeparamref name="TOutput" />, so it can be linked to another dataflow.
/// </summary>
/// <typeparam name="TInput">The type of the input.</typeparam>
/// <typeparam name="TOutput">The type produced by the output block.</typeparam>
public class Dataflow<TInput, TOutput> : Dataflow<TInput>, IDataflow<TInput, TOutput>
{
    /// <summary>
    /// Initializes a new instance of the <see cref="Dataflow{TInput, TOutput}" /> class.
    /// Note that the parameter order is the reverse of the base class constructor's.
    /// </summary>
    /// <param name="inputBlock">The input block.</param>
    /// <param name="name">The name.</param>
    public Dataflow(ITargetBlock<TInput> inputBlock, string name) : base(name, inputBlock)
    {
    }

    /// <summary>
    /// Gets the block this dataflow emits from: the last appended block, or the input block when nothing has been
    /// appended.
    /// </summary>
    /// <value>
    /// The output block, or <c>null</c> when that block is not a source block of <typeparamref name="TOutput" />.
    /// </value>
    public ISourceBlock<TOutput> OutputBlock
    {
        get
        {
            // If we have no blocks appended to the data flow, the input block is also the output block.
            // If we do have blocks appended, it's always the last block in the list.
            var blocks = ((IDataflow) this).Blocks;
            var candidate = blocks.Any() ? blocks.Last() : (IDataflowBlock) InputBlock;

            // Favour the property being "safe" to call over strictness: a soft cast yields null instead
            // of throwing InvalidCastException, which also keeps the null guard in LinkTo reachable.
            return candidate as ISourceBlock<TOutput>;
        }
    }

    /// <summary>
    /// Links this dataflow to the specified target dataflow. This operation requires the source block's data type to match
    /// the target dataflow's input block's.
    /// </summary>
    /// <param name="targetDataflow">The target dataflow.</param>
    /// <exception cref="System.ArgumentNullException"><paramref name="targetDataflow" /> is <c>null</c>.</exception>
    /// <exception cref="System.InvalidOperationException">
    /// Can not link a source dataflow to the specified target dataflow -
    /// the source has no valid output block. Are you sure the source dataflow contains at least one block?
    /// </exception>
    public void LinkTo(IDataflow<TOutput> targetDataflow)
    {
        if (targetDataflow == null)
            throw new ArgumentNullException(nameof(targetDataflow));

        // Resolve once; the getter walks the block list on every read.
        var output = OutputBlock;
        if (output == null)
            throw new InvalidOperationException(
                "Can not link a source dataflow to the specified target dataflow - the source has no valid output block. Are you sure the source dataflow contains at least one block?");

        output.LinkTo(targetDataflow.InputBlock, Dataflow.DefaultLinkOptions);
    }
}
/// <summary>
/// Type-agnostic contract of a dataflow: a named chain of blocks that can be extended, completed or faulted.
/// </summary>
public interface IDataflow
{
    /// <summary>Gets the name of the dataflow.</summary>
    string Name { get; }

    /// <summary>Gets the list of blocks held by the dataflow.</summary>
    IList<IDataflowBlock> Blocks { get; }

    /// <summary>Gets a value indicating whether further blocks can be appended.</summary>
    bool IsAcceptingBlocks { get; }

    /// <summary>Gets the task representing completion of the dataflow.</summary>
    Task Completion { get; }

    /// <summary>Appends <paramref name="block" /> to the end of the dataflow.</summary>
    /// <param name="block">The block to append.</param>
    void AppendBlock(IDataflowBlock block);

    /// <summary>Signals that the dataflow should complete.</summary>
    void Complete();

    /// <summary>Faults the dataflow with <paramref name="ex" />.</summary>
    /// <param name="ex">The exception to fault with.</param>
    void Fault(Exception ex);
}
/// <summary>
/// A dataflow with a typed entry point.
/// </summary>
/// <typeparam name="TInput">The message type accepted by the input block.</typeparam>
public interface IDataflow<TInput> : IDataflow
{
    /// <summary>Gets the block through which messages enter the dataflow.</summary>
    ITargetBlock<TInput> InputBlock { get; }
}
/// <summary>
/// A dataflow with a typed exit point that can be linked to another dataflow.
/// </summary>
/// <typeparam name="TOutput">The message type offered by the output block.</typeparam>
public interface IDataflowOut<TOutput> : IDataflow
{
    /// <summary>Gets the block from which messages leave the dataflow.</summary>
    ISourceBlock<TOutput> OutputBlock { get; }

    /// <summary>Links this dataflow's output to the input of <paramref name="targetDataflow" />.</summary>
    /// <param name="targetDataflow">The dataflow that receives this dataflow's output.</param>
    void LinkTo(IDataflow<TOutput> targetDataflow);
}
/// <summary>
/// A dataflow with both a typed input block and a typed output block; adds no members of its own.
/// </summary>
/// <typeparam name="TInput">The message type accepted by the input block.</typeparam>
/// <typeparam name="TOutput">The message type offered by the output block.</typeparam>
public interface IDataflow<TInput, TOutput> : IDataflow<TInput>, IDataflowOut<TOutput>
{
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment