Skip to content

Instantly share code, notes, and snippets.

@fgasperij
Created March 13, 2017 22:09
Show Gist options
  • Save fgasperij/cac75c9fb7141c67a4ac35b28010428e to your computer and use it in GitHub Desktop.
Save fgasperij/cac75c9fb7141c67a4ac35b28010428e to your computer and use it in GitHub Desktop.
BufferBroadcastBlock
/// <summary>
/// A target block that forwards a clone of every received message to each linked
/// target using <c>SendAsync</c>, waiting until all sends for that message have
/// finished before the internal <see cref="ActionBlock{T}"/> delegate returns.
/// </summary>
/// <typeparam name="T">Type of the messages being broadcast.</typeparam>
public class BufferBroadcastBlock<T> : ITargetBlock<T>
{
    /// <summary>
    /// One registered link: the downstream block plus whether completion of the
    /// broadcaster should be forwarded to it.
    /// </summary>
    public class Target
    {
        /// <summary>True when completion/faulting must be forwarded to <see cref="target"/>.</summary>
        public bool propagateCompletion { get; set; }

        /// <summary>The downstream block that receives cloned messages.</summary>
        public ITargetBlock<T> target { get; set; }

        /// <summary>Creates a link descriptor.</summary>
        /// <param name="aTarget">Downstream block.</param>
        /// <param name="completion">Whether completion is propagated to <paramref name="aTarget"/>.</param>
        public Target(ITargetBlock<T> aTarget, bool completion)
        {
            target = aTarget;
            propagateCompletion = completion;
        }
    }

    /// <summary>
    /// Handle returned by <see cref="LinkTo"/>; disposing it removes the link from
    /// the list it was registered in.
    /// </summary>
    public class TargetDisposer : IDisposable
    {
        /// <summary>The link removed on <see cref="Dispose"/>.</summary>
        public Target target { get; set; }

        /// <summary>The list the link is removed from.</summary>
        public List<Target> targets { get; set; }

        /// <summary>Creates an unlink handle.</summary>
        /// <param name="aTarget">Link to remove on dispose.</param>
        /// <param name="someTargets">List that currently holds the link.</param>
        public TargetDisposer(Target aTarget, List<Target> someTargets)
        {
            target = aTarget;
            targets = someTargets;
        }

        /// <summary>
        /// Removes the link. Removing an already-removed link is a no-op because
        /// <see cref="List{T}.Remove"/> simply returns false.
        /// </summary>
        public void Dispose()
        {
            List<Target> list = targets;

            // The owning block enumerates a snapshot taken under this same lock,
            // so unlinking never races with a broadcast in progress.
            lock (list)
            {
                list.Remove(target);
            }
        }
    }

    private readonly ActionBlock<T> action_broadcaster;

    // Guarded by lock (targets): mutated by LinkTo/Dispose on caller threads and
    // read by the broadcaster delegate and the completion continuation.
    private readonly List<Target> targets;

    /// <summary>Function applied to each message once per target before sending.</summary>
    public Func<T, T> cloningFunction { get; set; }

    /// <summary>Creates a block with default <see cref="ExecutionDataflowBlockOptions"/>.</summary>
    /// <param name="cloningFunction">Function used to clone each message per target.</param>
    public BufferBroadcastBlock(Func<T, T> cloningFunction) : this(cloningFunction, new ExecutionDataflowBlockOptions()) { }

    /// <summary>Creates a block whose internal <see cref="ActionBlock{T}"/> uses the given options.</summary>
    /// <param name="aCloningFunction">Function used to clone each message per target.</param>
    /// <param name="dataflowBlockOptions">Options passed to the internal <see cref="ActionBlock{T}"/>.</param>
    public BufferBroadcastBlock(Func<T, T> aCloningFunction, ExecutionDataflowBlockOptions dataflowBlockOptions)
    {
        targets = new List<Target>();
        cloningFunction = aCloningFunction;

        action_broadcaster = new ActionBlock<T>(async item =>
        {
            // Enumerate a snapshot so LinkTo/Dispose calls made while a message is
            // being broadcast cannot invalidate the enumeration.
            Target[] snapshot = SnapshotTargets();
            var sends = new List<Task>(snapshot.Length);
            foreach (Target link in snapshot)
            {
                sends.Add(link.target.SendAsync(cloningFunction(item)));
            }
            await Task.WhenAll(sends);
        }, dataflowBlockOptions);

        action_broadcaster.Completion.ContinueWith(completion =>
        {
            foreach (Target link in SnapshotTargets())
            {
                if (!link.propagateCompletion)
                {
                    continue;
                }

                // Propagate faulting as well as normal completion, so a failure in
                // this block is not reported downstream as a successful finish.
                if (completion.IsFaulted)
                {
                    link.target.Fault(completion.Exception);
                }
                else
                {
                    link.target.Complete();
                }
            }
            // Deliberately not waiting for the targets to finish: nothing observes
            // this continuation, so blocking here would only hold a pool thread.
        });
    }

    // Returns a copy of the current links taken under the list lock.
    private Target[] SnapshotTargets()
    {
        lock (targets)
        {
            return targets.ToArray();
        }
    }

    /// <summary>Links this dataflow block to the provided target.</summary>
    /// <param name="target">Block that will receive cloned messages.</param>
    /// <param name="linkOptions">Link options; only <c>PropagateCompletion</c> is read.</param>
    /// <returns>A handle that removes the link when disposed.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="target"/> or <paramref name="linkOptions"/> is null.
    /// </exception>
    public IDisposable LinkTo(ITargetBlock<T> target, DataflowLinkOptions linkOptions)
    {
        if (target == null)
        {
            throw new ArgumentNullException("target");
        }
        if (linkOptions == null)
        {
            throw new ArgumentNullException("linkOptions");
        }

        Target t = new Target(target, linkOptions.PropagateCompletion);
        lock (targets)
        {
            targets.Add(t);
        }
        return new TargetDisposer(t, targets);
    }

    #region ITargetBlock<TInput> members
    /// <summary>
    /// Offers a message to this block by delegating to the internal
    /// <see cref="ActionBlock{T}"/>.
    /// </summary>
    DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader,
        T messageValue, ISourceBlock<T> source, bool consumeToAccept)
    {
        return ((ITargetBlock<T>)action_broadcaster).OfferMessage(messageHeader,
            messageValue, source, consumeToAccept);
    }
    #endregion

    #region IDataflowBlock members
    /// <summary>Gets the completion task of the internal <see cref="ActionBlock{T}"/>.</summary>
    public Task Completion { get { return action_broadcaster.Completion; } }

    /// <summary>
    /// Signals to this target block that it should not accept any more messages,
    /// nor consume postponed messages.
    /// </summary>
    public void Complete()
    {
        action_broadcaster.Complete();
    }

    /// <summary>Faults the internal <see cref="ActionBlock{T}"/> with the given error.</summary>
    /// <param name="error">The exception to fault with.</param>
    public void Fault(Exception error)
    {
        ((ITargetBlock<T>)action_broadcaster).Fault(error);
    }
    #endregion
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment