Skip to content

Instantly share code, notes, and snippets.

@AlgorithmsAreCool
Created December 30, 2019 18:11
Show Gist options
  • Save AlgorithmsAreCool/492564e6baec44eea0c95f3e8b4d0941 to your computer and use it in GitHub Desktop.
Save AlgorithmsAreCool/492564e6baec44eea0c95f3e8b4d0941 to your computer and use it in GitHub Desktop.
Hastily written and incomplete dataflow clone using tasks and channels
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace HomebrewDataflow
{
/// <summary>
/// Base type for a workflow stage that owns a single bounded channel: producers write
/// to <see cref="Input"/> and the stage's pump consumes from <see cref="Inflow"/>.
/// </summary>
/// <typeparam name="TIn">Type of the items accepted by the stage.</typeparam>
public abstract class WorkflowStageBase<TIn>
{
    /// <summary>
    /// Creates the stage's bounded channel and exposes its writer and reader ends.
    /// </summary>
    /// <param name="channelOptions">Options handed to <c>Channel.CreateBounded</c>.</param>
    public WorkflowStageBase(BoundedChannelOptions channelOptions)
    {
        Channel<TIn> buffer = Channel.CreateBounded<TIn>(channelOptions);
        Input = buffer.Writer;
        Inflow = buffer.Reader;
    }

    /// <summary>Writer end of the stage's channel; used to feed items into the stage.</summary>
    public ChannelWriter<TIn> Input { get; }

    /// <summary>Reader end of the stage's channel; consumed by <see cref="WorkPump"/>.</summary>
    protected ChannelReader<TIn> Inflow { get; }

    /// <summary>Processing loop supplied by the concrete stage.</summary>
    protected abstract Task WorkPump();

    /// <summary>Schedules <see cref="WorkPump"/> through <c>Task.Run</c>.</summary>
    /// <returns>The task returned by <c>Task.Run</c> for the pump.</returns>
    public Task Run()
    {
        return Task.Run(WorkPump);
    }
}
/// <summary>
/// Selects how a transform stage hands each result to its subscribers.
/// </summary>
public enum BroadcastPolicy
{
    /// <summary>
    /// Each subscriber is written to with an awaited <c>WriteAsync</c>, one after another.
    /// </summary>
    WaitForEach = 0,

    /// <summary>
    /// Each subscriber is offered the result with <c>TryWrite</c>; the outcome is not checked.
    /// </summary>
    SkipBusy = 1
}
/// <summary>
/// Stage that applies an asynchronous function to every item read from its input channel
/// and hands each result to all subscribed downstream writers.
/// </summary>
/// <typeparam name="TIn">Type of the items read from the stage's input.</typeparam>
/// <typeparam name="TOut">Type of the results passed to subscribers.</typeparam>
public class TransformStage<TIn, TOut> : WorkflowStageBase<TIn>
{
    /// <summary>Creates a transform stage.</summary>
    /// <param name="workFunction">Function invoked with the stage token and each input item.</param>
    /// <param name="policy">How results are delivered to subscribers.</param>
    /// <param name="options">Options for the stage's bounded input channel.</param>
    /// <param name="cancellationToken">Token passed to reads, writes and the work function.</param>
    public TransformStage(Func<CancellationToken, TIn, ValueTask<TOut>> workFunction, BroadcastPolicy policy, BoundedChannelOptions options, CancellationToken cancellationToken)
        : base(options)
    {
        WorkFunction = workFunction;
        Policy = policy;
        Subscribers = new ConcurrentBag<ChannelWriter<TOut>>();
        Token = cancellationToken;
    }

    private Func<CancellationToken, TIn, ValueTask<TOut>> WorkFunction { get; }
    private BroadcastPolicy Policy { get; }
    private ConcurrentBag<ChannelWriter<TOut>> Subscribers { get; }
    private CancellationToken Token { get; }

    /// <summary>Registers the input writer of <paramref name="stage"/> as a receiver of results.</summary>
    /// <param name="stage">Downstream stage whose <c>Input</c> will receive results.</param>
    public void Subscribe(WorkflowStageBase<TOut> stage)
    {
        Subscribers.Add(stage.Input);
    }

    /// <summary>
    /// Reads items until the input completes, transforms each one and broadcasts the result.
    /// When the loop ends, every subscriber writer is completed; if the loop ended with an
    /// exception, that exception is passed along as the completion error.
    /// </summary>
    protected override async Task WorkPump()
    {
        try
        {
            await foreach (var item in Inflow.ReadAllAsync(Token))
            {
                var result = await WorkFunction(Token, item);
                if (Policy == BroadcastPolicy.WaitForEach)
                {
                    foreach (var subscriber in Subscribers)
                    {
                        await subscriber.WriteAsync(result, Token);
                    }
                }
                else // BroadcastPolicy.SkipBusy
                {
                    foreach (var subscriber in Subscribers)
                    {
                        // The return value is deliberately ignored: a subscriber that
                        // rejects the write simply does not receive this result.
                        subscriber.TryWrite(result);
                    }
                }
            }
        }
        catch (Exception ex)
        {
            foreach (var subscriber in Subscribers)
            {
                // TryComplete rather than Complete: Complete throws when a writer is
                // already completed (e.g. a stage subscribed twice, or closed by another
                // upstream), which would abort this loop and leave the remaining
                // subscribers without the error.
                subscriber.TryComplete(ex);
            }
        }
        finally
        {
            // Normal end of input: close every subscriber. This is a no-op for writers
            // that were already completed above.
            foreach (var subscriber in Subscribers)
            {
                subscriber.TryComplete();
            }
        }
    }
}
/// <summary>
/// Terminal stage that invokes an asynchronous action for every item read from its
/// input channel.
/// </summary>
/// <typeparam name="TIn">Type of the items read from the stage's input.</typeparam>
public class ActionStage<TIn> : WorkflowStageBase<TIn>
{
    /// <summary>Creates an action stage.</summary>
    /// <param name="workFunction">Action invoked with the stage token and each input item.</param>
    /// <param name="options">Options for the stage's bounded input channel.</param>
    /// <param name="token">Token passed to reads and to the work function.</param>
    public ActionStage(Func<CancellationToken, TIn, ValueTask> workFunction, BoundedChannelOptions options, CancellationToken token)
        : base(options)
    {
        WorkFunction = workFunction;
        Token = token;
    }

    private Func<CancellationToken, TIn, ValueTask> WorkFunction { get; }
    private CancellationToken Token { get; }

    /// <summary>
    /// Reads items until the input completes and runs the work function on each one.
    /// Cancellation through the stage's token ends the pump quietly; any other exception
    /// propagates, so the task returned by <c>Run()</c> reflects the failure.
    /// </summary>
    protected override async Task WorkPump()
    {
        try
        {
            await foreach (var item in Inflow.ReadAllAsync(Token))
            {
                await WorkFunction(Token, item);
            }
        }
        catch (OperationCanceledException) when (Token.IsCancellationRequested)
        {
            // Requested shutdown via the stage's own token: not an error, nothing to report.
        }
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment