Created
March 13, 2017 22:09
-
-
Save fgasperij/cac75c9fb7141c67a4ac35b28010428e to your computer and use it in GitHub Desktop.
BufferBroadcastBlock
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// A dataflow target block that buffers incoming messages (via an internal
/// <see cref="ActionBlock{T}"/>) and sends a clone of every message to each
/// currently linked target using <c>SendAsync</c>, so no message is dropped
/// while a target is busy.
/// </summary>
/// <typeparam name="T">Type of the messages flowing through the block.</typeparam>
public class BufferBroadcastBlock<T> : ITargetBlock<T>
{
    /// <summary>
    /// A linked target together with its completion-propagation setting.
    /// </summary>
    public class Target
    {
        /// <summary>Whether completion/fault of the broadcaster is forwarded to <see cref="target"/>.</summary>
        public bool propagateCompletion { get; set; }

        /// <summary>The block that receives the broadcast messages.</summary>
        public ITargetBlock<T> target { get; set; }

        /// <summary>Creates a link record.</summary>
        /// <param name="aTarget">The block to broadcast to.</param>
        /// <param name="completion">True to propagate completion to <paramref name="aTarget"/>.</param>
        public Target(ITargetBlock<T> aTarget, bool completion)
        {
            target = aTarget;
            propagateCompletion = completion;
        }
    }

    /// <summary>
    /// Link handle returned by <see cref="LinkTo"/>; disposing it removes the
    /// target from the list it was registered in.
    /// </summary>
    public class TargetDisposer : IDisposable
    {
        /// <summary>The link record to remove on dispose.</summary>
        public Target target { get; set; }

        /// <summary>The list the link record lives in.</summary>
        public List<Target> targets { get; set; }

        /// <summary>Creates the unlink handle.</summary>
        /// <param name="aTarget">The link record to remove on dispose.</param>
        /// <param name="someTargets">The list that contains <paramref name="aTarget"/>.</param>
        public TargetDisposer(Target aTarget, List<Target> someTargets)
        {
            target = aTarget;
            targets = someTargets;
        }

        /// <summary>
        /// Removes the target from the list. Safe to call more than once:
        /// removing an element that is no longer present is a no-op.
        /// </summary>
        public void Dispose()
        {
            // Read the property once so the lock and the removal use the same instance.
            List<Target> list = targets;
            if (list == null)
            {
                return;
            }

            // The list is shared with the broadcaster, which snapshots it under
            // the same lock from dataflow worker threads.
            lock (list)
            {
                list.Remove(target);
            }
        }
    }

    private readonly ActionBlock<T> action_broadcaster;

    // Guarded by lock (targets): mutated by LinkTo/Dispose, read by the
    // broadcaster delegate and the completion continuation.
    private readonly List<Target> targets;

    /// <summary>Function used to produce the per-target copy of each message.</summary>
    public Func<T, T> cloningFunction { get; set; }

    /// <summary>
    /// Creates the block with default <see cref="ExecutionDataflowBlockOptions"/>.
    /// </summary>
    /// <param name="cloningFunction">Function used to copy each message for each target.</param>
    public BufferBroadcastBlock(Func<T, T> cloningFunction) : this(cloningFunction, new ExecutionDataflowBlockOptions()) { }

    /// <summary>
    /// Creates the block.
    /// </summary>
    /// <param name="aCloningFunction">Function used to copy each message for each target.</param>
    /// <param name="dataflowBlockOptions">Options passed to the internal <see cref="ActionBlock{T}"/>.</param>
    public BufferBroadcastBlock(Func<T, T> aCloningFunction, ExecutionDataflowBlockOptions dataflowBlockOptions)
    {
        targets = new List<Target>();
        cloningFunction = aCloningFunction;

        action_broadcaster = new ActionBlock<T>(async item =>
        {
            // Enumerate a snapshot: targets may be linked or unlinked
            // concurrently while this message is being sent.
            Target[] linked = SnapshotTargets();
            var sends = new Task[linked.Length];
            for (int i = 0; i < linked.Length; i++)
            {
                sends[i] = linked[i].target.SendAsync(cloningFunction(item));
            }

            // Do not take the next message until every target has accepted
            // (or declined) this one.
            await Task.WhenAll(sends);
        }, dataflowBlockOptions);

        // Propagate completion to the targets that asked for it. A fault is
        // forwarded as a fault rather than as a normal completion. The
        // continuation does not block waiting for the targets to finish:
        // nothing observes this continuation, and a target that still holds
        // buffered messages would pin a thread-pool thread indefinitely.
        action_broadcaster.Completion.ContinueWith(completion =>
        {
            foreach (Target linked in SnapshotTargets())
            {
                if (!linked.propagateCompletion)
                {
                    continue;
                }

                if (completion.IsFaulted)
                {
                    linked.target.Fault(completion.Exception);
                }
                else
                {
                    linked.target.Complete();
                }
            }
        }, TaskScheduler.Default);
    }

    // Returns a point-in-time copy of the linked targets, taken under the lock.
    private Target[] SnapshotTargets()
    {
        lock (targets)
        {
            return targets.ToArray();
        }
    }

    /// <summary>
    /// Links this dataflow block to the provided target.
    /// </summary>
    /// <param name="target">The block that should receive a clone of every message.</param>
    /// <param name="linkOptions">Link options; only <c>PropagateCompletion</c> is read.</param>
    /// <returns>A handle that removes the link when disposed.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="target"/> or <paramref name="linkOptions"/> is null.
    /// </exception>
    public IDisposable LinkTo(ITargetBlock<T> target, DataflowLinkOptions linkOptions)
    {
        if (target == null)
        {
            throw new ArgumentNullException("target");
        }
        if (linkOptions == null)
        {
            throw new ArgumentNullException("linkOptions");
        }

        Target t = new Target(target, linkOptions.PropagateCompletion);
        lock (targets)
        {
            targets.Add(t);
        }
        return new TargetDisposer(t, targets);
    }

    #region ITargetBlock<TInput> members

    // Offers a message to this block by forwarding the offer to the internal
    // ActionBlock, which decides whether to accept, postpone or decline it.
    DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader,
        T messageValue, ISourceBlock<T> source, bool consumeToAccept)
    {
        return ((ITargetBlock<T>)action_broadcaster).OfferMessage(messageHeader,
            messageValue, source, consumeToAccept);
    }

    #endregion

    #region IDataflowBlock members

    /// <summary>
    /// Gets a Task that represents the completion of the internal broadcaster.
    /// </summary>
    public Task Completion { get { return action_broadcaster.Completion; } }

    /// <summary>
    /// Signals to this block that it should not accept any more messages.
    /// </summary>
    public void Complete()
    {
        action_broadcaster.Complete();
    }

    /// <summary>
    /// Causes the internal broadcaster to complete in a faulted state.
    /// </summary>
    /// <param name="error">The exception that caused the fault.</param>
    public void Fault(Exception error)
    {
        ((ITargetBlock<T>)action_broadcaster).Fault(error);
    }

    #endregion
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment