Created
August 23, 2016 09:24
-
-
Save aevitas/35329077d4a597b94f7b9d23181ddb6a to your computer and use it in GitHub Desktop.
Extension of the System.Threading.Dataflow library providing pipeline functionality
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
///     Factory helpers and shared settings for building <see cref="Dataflow{TInput}" /> instances.
/// </summary>
public static class Dataflow
{
    /// <summary>
    ///     Link options used for every link created between blocks of a dataflow. Completion propagation is
    ///     switched on.
    /// </summary>
    internal static readonly DataflowLinkOptions DefaultLinkOptions = new DataflowLinkOptions {PropagateCompletion = true};

    /// <summary>
    ///     Builds a dataflow out of a sequence of blocks. The first block becomes the dataflow's input block; every
    ///     following block is appended (and thereby linked) in order.
    /// </summary>
    /// <typeparam name="TInput">The type of the input.</typeparam>
    /// <param name="name">The name given to the resulting dataflow.</param>
    /// <param name="blocks">The blocks to build the dataflow from, in pipeline order.</param>
    /// <returns>The newly created dataflow containing all specified blocks.</returns>
    /// <exception cref="System.ArgumentNullException"><paramref name="blocks" /> is <c>null</c>.</exception>
    /// <exception cref="System.ArgumentException">
    ///     The collection is empty, contains a <c>null</c> element, or its first element is not a target block
    ///     for <typeparamref name="TInput" />.
    /// </exception>
    public static Dataflow<TInput> FromBlocks<TInput>(string name, IEnumerable<IDataflowBlock> blocks)
    {
        if (blocks == null)
            throw new ArgumentNullException(nameof(blocks));

        // Materialise once so the sequence is not enumerated repeatedly by the checks below.
        var materialized = blocks.ToList();

        if (materialized.Count == 0)
            throw new ArgumentException("Can not construct dataflow from an empty collection of blocks!");

        if (materialized.Contains(null))
            throw new ArgumentException("Collection of blocks to create a dataflow can not contain null elements!");

        var head = materialized[0] as ITargetBlock<TInput>;
        if (head == null)
        {
            throw new ArgumentException(
                $"First block in provided block collection was not a target block for type {typeof (TInput).Name}!");
        }

        var dataflow = new Dataflow<TInput>(name, head);

        // The head is already installed as the input block; only the remainder needs appending.
        foreach (var successor in materialized.Skip(1))
            dataflow.AppendBlock(successor);

        return dataflow;
    }
}
/// <summary>
///     Represents a data flow that accepts input, and processes it.
/// </summary>
/// <typeparam name="TInput">The type of the input.</typeparam>
public class Dataflow<TInput> : IDataflow, IDataflow<TInput>
{
    /// <summary>
    ///     Initializes a new instance of the <see cref="Dataflow{TInput}" /> class.
    /// </summary>
    /// <param name="name">The name.</param>
    /// <param name="inputBlock">The input block.</param>
    /// <exception cref="System.ArgumentNullException">
    ///     <paramref name="inputBlock" /> is <c>null</c>, or <paramref name="name" /> is <c>null</c> or empty.
    /// </exception>
    public Dataflow(string name, ITargetBlock<TInput> inputBlock)
    {
        if (inputBlock == null)
            throw new ArgumentNullException(nameof(inputBlock));
        if (string.IsNullOrEmpty(name))
            throw new ArgumentNullException(nameof(name));

        InputBlock = inputBlock;
        Name = name;
    }

    /// <summary>
    ///     Gets the name of the dataflow used to represent this instance visually.
    /// </summary>
    public string Name { get; }

    /// <summary>
    ///     Gets the blocks appended to this dataflow. The input block is not part of this list.
    /// </summary>
    IList<IDataflowBlock> IDataflow.Blocks { get; } = new List<IDataflowBlock>();

    /// <summary>
    ///     Gets the completion task of the last appended block, or of the input block when no block has been
    ///     appended yet.
    /// </summary>
    public Task Completion
    {
        get
        {
            var blocks = ((IDataflow) this).Blocks;
            return blocks.Any() ? blocks.Last()?.Completion : InputBlock.Completion;
        }
    }

    /// <summary>
    ///     Appends the specified block to the end of the data flow, and links the previous block in the dataflow to it.
    /// </summary>
    /// <param name="block">The block.</param>
    /// <exception cref="System.ArgumentNullException"><paramref name="block" /> is <c>null</c>.</exception>
    /// <exception cref="System.ArgumentException">
    ///     <paramref name="block" /> is not a target block for <typeparamref name="TInput" />.
    /// </exception>
    /// <exception cref="System.InvalidOperationException">
    ///     Can not append block as the last block in the block list (or the input block, when the list is empty) is
    ///     not a source block (and subsequently, can't be linked from)!
    /// </exception>
    public void AppendBlock(IDataflowBlock block)
    {
        AppendBlock(block, null);
    }

    /// <summary>
    ///     Gets a value indicating whether this dataflow is accepting blocks, i.e. blocks can be appended to it.
    /// </summary>
    /// <value>
    ///     <c>true</c> if this instance is open; otherwise, <c>false</c>.
    /// </value>
    public bool IsAcceptingBlocks
    {
        get
        {
            // A dataflow is "closed" when the block a new block would be linked from is not a
            // source block of TInput. That block is the last appended block or, when nothing has
            // been appended yet, the input block itself. Checking the input block in the empty
            // case keeps this property in agreement with AppendBlock, which throws an
            // InvalidOperationException when the input block can not act as a source.
            var blocks = ((IDataflow) this).Blocks;
            var tail = blocks.Any() ? blocks.Last() : InputBlock;

            return tail is ISourceBlock<TInput>;
        }
    }

    /// <summary>
    ///     Completes the input block of this dataflow. Blocks appended via AppendBlock are linked with
    ///     completion propagation enabled, so completion is passed along those links.
    /// </summary>
    public void Complete()
    {
        InputBlock.Complete();
    }

    /// <summary>
    ///     Faults the input block of this dataflow with the specified exception. Blocks appended via AppendBlock
    ///     are linked with completion propagation enabled, so the fault is passed along those links.
    /// </summary>
    /// <param name="ex">The exception to fault the dataflow with.</param>
    /// <exception cref="System.ArgumentNullException"><paramref name="ex" /> is <c>null</c>.</exception>
    public void Fault(Exception ex)
    {
        if (ex == null)
            throw new ArgumentNullException(nameof(ex));

        InputBlock.Fault(ex);
    }

    /// <summary>
    ///     Gets the input block used by this dataflow as the first block in the flow, and from which data is propagated to
    ///     subsequent blocks.
    /// </summary>
    /// <value>
    ///     The input block.
    /// </value>
    public ITargetBlock<TInput> InputBlock { get; }

    /// <summary>
    ///     Appends the specified block to the end of the data flow, and links the previous block in the dataflow to it using
    ///     the specified predicate to filter messages being passed.
    /// </summary>
    /// <param name="block">The block.</param>
    /// <param name="linkPredicate">
    ///     The link predicate, or <c>null</c> to link without filtering.
    /// </param>
    /// <exception cref="System.ArgumentNullException"><paramref name="block" /> is <c>null</c>.</exception>
    /// <exception cref="System.ArgumentException">
    ///     <paramref name="block" /> is not a target block for <typeparamref name="TInput" />.
    /// </exception>
    /// <exception cref="System.InvalidOperationException">
    ///     Can not append block as the last block in the block list (or the input block, when the list is empty) is
    ///     not a source block (and subsequently, can't be linked from)!
    /// </exception>
    public void AppendBlock(IDataflowBlock block, Predicate<TInput> linkPredicate)
        // This can probably be ITargetBlock<TInput>, but do check.
    {
        if (block == null)
            throw new ArgumentNullException(nameof(block));

        var blocks = ((IDataflow) this).Blocks;
        var targetBlock = block as ITargetBlock<TInput>;
        ISourceBlock<TInput> sourceBlock;

        if (!blocks.Any())
        {
            // Nothing appended yet: the new block is linked straight to the input block. That is only
            // possible when the new block is a target of TInput and the input block is a source of TInput.
            // The target is validated before the source here, so callers see the same exception as before
            // when both conditions fail.
            if (targetBlock == null)
                throw new ArgumentException($"Specified block is not a target block for type {typeof (TInput).Name}!",
                    nameof(block));

            sourceBlock = InputBlock as ISourceBlock<TInput>;
            if (sourceBlock == null)
                throw new InvalidOperationException(
                    $"Can not link the specified block to the dataflow's input block - the input block is not a source block of type {typeof (TInput).Name}!");
        }
        else
        {
            // Link from the most recently appended block. It must be a source block, otherwise the
            // dataflow is considered "closed". Here the source is validated before the target.
            sourceBlock = blocks.Last() as ISourceBlock<TInput>;
            if (sourceBlock == null)
                throw new InvalidOperationException(
                    "Can not append block as the last block in the block list is not a source block (and subsequently, can't be linked from)!");

            if (targetBlock == null)
                throw new ArgumentException($"Specified block is not a target block for type {typeof (TInput).Name}!",
                    nameof(block));
        }

        LinkBlocks(sourceBlock, targetBlock, linkPredicate);

        // Only record the block once it has been linked successfully.
        blocks.Add(block);
    }

    /// <summary>
    ///     Links <paramref name="source" /> to <paramref name="target" /> using the default link options, applying
    ///     <paramref name="linkPredicate" /> as a message filter when one is supplied.
    /// </summary>
    private static void LinkBlocks(ISourceBlock<TInput> source, ITargetBlock<TInput> target,
        Predicate<TInput> linkPredicate)
    {
        if (linkPredicate == null)
            source.LinkTo(target, Dataflow.DefaultLinkOptions);
        else
            source.LinkTo(target, Dataflow.DefaultLinkOptions, linkPredicate);
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
///     A dataflow that accepts <typeparamref name="TInput" /> and exposes an output block producing
///     <typeparamref name="TOutput" />, so it can be linked to another dataflow.
/// </summary>
/// <typeparam name="TInput">The type of the input.</typeparam>
/// <typeparam name="TOutput">The type of the output.</typeparam>
public class Dataflow<TInput, TOutput> : Dataflow<TInput>, IDataflow<TInput, TOutput>
{
    /// <summary>
    ///     Initializes a new instance of the <see cref="Dataflow{TInput, TOutput}" /> class.
    /// </summary>
    /// <param name="inputBlock">The input block.</param>
    /// <param name="name">The name.</param>
    public Dataflow(ITargetBlock<TInput> inputBlock, string name) : base(name, inputBlock)
    {
    }

    /// <summary>
    ///     Gets the block that emits this dataflow's output: the last appended block, or the input block when no
    ///     block has been appended.
    /// </summary>
    /// <value>
    ///     The output block, or <c>null</c> when that block is not a source block of
    ///     <typeparamref name="TOutput" />.
    /// </value>
    public ISourceBlock<TOutput> OutputBlock
    {
        get
        {
            // If we have no blocks appended to the data flow, the input block is also the output block.
            // If we do have blocks appended, it's always the last block in the list.
            var blocks = ((IDataflow) this).Blocks;
            IDataflowBlock candidate;
            if (blocks.Any())
                candidate = blocks.Last();
            else
                candidate = InputBlock;

            // Favour the property being "safe" to read over strictness: a soft cast yields null for a
            // block of the wrong kind instead of throwing InvalidCastException, which is also what lets
            // LinkTo report the problem with its documented InvalidOperationException.
            return candidate as ISourceBlock<TOutput>;
        }
    }

    /// <summary>
    ///     Links this dataflow to the specified target dataflow. This operation requires the source block's data type to match
    ///     the target dataflow's input block's.
    /// </summary>
    /// <param name="targetDataflow">The target dataflow.</param>
    /// <exception cref="System.ArgumentNullException"><paramref name="targetDataflow" /> is <c>null</c>.</exception>
    /// <exception cref="System.InvalidOperationException">
    ///     Can not link a source dataflow to the specified target dataflow -
    ///     the source has no valid output block. Are you sure the source dataflow contains at least one block?
    /// </exception>
    public void LinkTo(IDataflow<TOutput> targetDataflow)
    {
        if (targetDataflow == null)
            throw new ArgumentNullException(nameof(targetDataflow));

        // Read the property once; it is recomputed on every access.
        var output = OutputBlock;
        if (output == null)
            throw new InvalidOperationException(
                "Can not link a source dataflow to the specified target dataflow - the source has no valid output block. Are you sure the source dataflow contains at least one block?");

        output.LinkTo(targetDataflow.InputBlock, Dataflow.DefaultLinkOptions);
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
///     Describes a named chain of dataflow blocks that can be extended, completed and faulted.
/// </summary>
public interface IDataflow
{
    /// <summary>
    ///     Gets the name of the dataflow.
    /// </summary>
    string Name { get; }

    /// <summary>
    ///     Gets the list of blocks belonging to the dataflow.
    /// </summary>
    IList<IDataflowBlock> Blocks { get; }

    /// <summary>
    ///     Gets a value indicating whether blocks can be appended to the dataflow.
    /// </summary>
    bool IsAcceptingBlocks { get; }

    /// <summary>
    ///     Gets the task representing completion of the dataflow.
    /// </summary>
    Task Completion { get; }

    /// <summary>
    ///     Appends the specified block to the end of the dataflow.
    /// </summary>
    /// <param name="block">The block to append.</param>
    void AppendBlock(IDataflowBlock block);

    /// <summary>
    ///     Signals that the dataflow should complete.
    /// </summary>
    void Complete();

    /// <summary>
    ///     Faults the dataflow with the specified exception.
    /// </summary>
    /// <param name="ex">The exception to fault with.</param>
    void Fault(Exception ex);
}
/// <summary>
///     A dataflow that exposes the block through which <typeparamref name="TInput" /> messages enter it.
/// </summary>
/// <typeparam name="TInput">The type of the input.</typeparam>
public interface IDataflow<TInput> : IDataflow
{
    /// <summary>
    ///     Gets the target block that receives the dataflow's input.
    /// </summary>
    ITargetBlock<TInput> InputBlock { get; }
}
/// <summary>
///     A dataflow that exposes a source block of <typeparamref name="TOutput" /> and can be linked to another
///     dataflow accepting that type.
/// </summary>
/// <typeparam name="TOutput">The type of the output.</typeparam>
public interface IDataflowOut<TOutput> : IDataflow
{
    /// <summary>
    ///     Gets the source block that emits the dataflow's output.
    /// </summary>
    ISourceBlock<TOutput> OutputBlock { get; }

    /// <summary>
    ///     Links this dataflow to the specified target dataflow.
    /// </summary>
    /// <param name="targetDataflow">The dataflow that should receive this dataflow's output.</param>
    void LinkTo(IDataflow<TOutput> targetDataflow);
}
/// <summary>
///     A dataflow with both an input side (<see cref="IDataflow{TInput}" />) and an output side
///     (<see cref="IDataflowOut{TOutput}" />). Declares no members of its own.
/// </summary>
/// <typeparam name="TInput">The type of the input.</typeparam>
/// <typeparam name="TOutput">The type of the output.</typeparam>
public interface IDataflow<TInput, TOutput>
    : IDataflow<TInput>,
      IDataflowOut<TOutput>
{
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment