@Arithmomaniac
Created August 9, 2023 06:57
Presentation - Data Flows with Async Streams and Channels
#!meta
{"kernelInfo":{"defaultKernelName":"csharp","items":[{"aliases":[],"languageName":"csharp","name":"csharp"}]}}
#!markdown
# Data Flows with Async Streams and Channels
Avi Levin (@Arithmomaniac)<br>
2023-07-30
#!markdown
- `IEnumerable`
- `async`
- `IAsyncEnumerable`
- Channels
- Producer/Consumer
- Timing - deeper look
- Enhancing LINQ - `ParallelSelectAwait`
- Splitting and joining
#!markdown
**What is a data flow?**
- Processing pipeline in which items "flow/stream" through the process
- Declarative, not imperative
- "Mountain Streams" analogy
#!markdown
**IEnumerable**
- Often used as shorthand for a `for` loop, but it is more than that
- Lazy evaluation (including on chaining - like pulling through a straw)
- (JS equivalent - iterators/generators)
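
The "pulling through a straw" behavior also holds when chaining LINQ operators. A minimal sketch, using a hypothetical `Numbers` generator and a `log` list (neither is part of the presentation's samples), that records when each stage actually runs:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var log = new List<string>();

// hypothetical infinite source; logs every time an item is actually produced
IEnumerable<int> Numbers()
{
    for (int i = 0; ; i++)
    {
        log.Add($"produce {i}");
        yield return i;
    }
}

// building the chain runs nothing yet
var query = Numbers()
    .Select(n => { log.Add($"double {n}"); return n * 2; })
    .Take(2);
log.Add("query built");

// enumeration pulls one item at a time through the whole chain
var results = query.ToList();

Console.WriteLine(string.Join(Environment.NewLine, log));
```

The log starts with `query built`, and the `produce` and `double` entries then alternate item by item. The source is infinite, yet only two items are ever produced.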
#!csharp
IEnumerable<int> Source()
{
    for (int i = 0; ; i++)
    {
        yield return i;
    }
}

IEnumerable<int> LowerDown()
{
    foreach (var item in Source())
    {
        if (item > 1)
        {
            yield break;
        }
        Console.WriteLine($"Retrieved {item} from Source");
        yield return item * 2;
    }
}

IEnumerable<int> LowerDown_AsEnumerator()
{
    using var enumerator = Source().GetEnumerator();
    while (enumerator.MoveNext())
    {
        var item = enumerator.Current;
        if (item > 1)
        {
            yield break;
        }
        Console.WriteLine($"Retrieved {item} from Source");
        yield return item * 2;
    }
}

foreach (var i in LowerDown())
{
    Console.WriteLine(i);
}
#!markdown
**How does this work?**
- Auto-generated state machine
#!markdown
```csharp
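// Simplified decompilation of the iterator class the compiler generates for Source().
// "SourceIterator" is a placeholder: the real class name, like the field names below,
// is not valid in hand-written C#.
private sealed class SourceIterator : IEnumerator<int>
{
    private int <>1__state;
    private int <>2__current;
    private int <i>5__2; // the loop variable i, hoisted into a field

    int IEnumerator<int>.Current => <>2__current;

    bool MoveNext()
    {
        int num = <>1__state;
        if (num != 0)
        {
            if (num != 1) // never hit for this infinite loop; a finished iterator would stop here
                return false;
            <>1__state = -1;
            <i>5__2++; // i++
        }
        else
        {
            <>1__state = -1;
            <i>5__2 = 0; // int i = 0
        }
        <>2__current = <i>5__2; // i is the value to return
        <>1__state = 1;
        return true; // return it
    }
}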
```
#!markdown
**async**
- I/O operations are inherently asynchronous (they do not hold a CPU thread while waiting)
- Getting work "off" the calling thread can unfreeze UI, or support increased scalability
- But how?
![image](https://i.stack.imgur.com/8gA9P.png)
#!csharp
interface Sample
{
    // callback style
    void X(Action<int> onX);
    void Y(int x, Action<int> onY);
    void Z(int x, int y);

    // Task-returning style
    Task<int> X();
    Task<int> Y(int x);
}

Sample sample = null!;

void Callback()
{
    sample.X(onX: x => sample.Y(x, onY: y => sample.Z(x, y)));
}

void Promise()
{
    sample.X()
        // Y returns a Task, so chain onto it and Unwrap to get (x, y) as plain ints
        .ContinueWith(task => sample.Y(task.Result)
            .ContinueWith(task2 => (x: task.Result, y: task2.Result)))
        .Unwrap()
        .ContinueWith(task3 => sample.Z(task3.Result.x, task3.Result.y));
}

async Task Async()
{
    var x = await sample.X();
    var y = await sample.Y(x);
    sample.Z(x, y);
}
#!markdown
- Note that `async` uses the `Task` object, initially created for multithreading but repurposed for async
- Underneath, `async` chains together continuation statements with a state machine
- Can mix-and-match using `Task`s as async or promises
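
To illustrate the mix-and-match point, here is a minimal sketch in which one `Task` is consumed both promise-style (`ContinueWith`) and async-style (`await`). `FetchAsync` is a hypothetical stand-in for an I/O call, not part of the samples in this presentation:

```csharp
using System;
using System.Threading.Tasks;

// hypothetical operation standing in for real I/O
async Task<int> FetchAsync()
{
    await Task.Delay(50);
    return 21;
}

Task<int> fetch = FetchAsync();

// promise style: attach a continuation to the task
Task<int> doubledTask = fetch.ContinueWith(t => t.Result * 2);

// async style: await the very same task, and the continuation built on it
int original = await fetch;
int doubledValue = await doubledTask;

Console.WriteLine($"{original} -> {doubledValue}");
```

Both styles observe the same underlying operation; `FetchAsync` runs only once.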
#!csharp
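using System.Diagnostics;
using System.Threading;

var sw = Stopwatch.StartNew();
var task = Delay(); // the delay starts now, not when the task is awaited
Thread.Sleep(400);  // block this thread while the delay keeps running
Console.WriteLine(sw.ElapsedMilliseconds); // roughly 400
await task;
Console.WriteLine(sw.ElapsedMilliseconds); // roughly 800, not 1200

async Task Delay()
{
    await Task.Delay(800);
    Console.WriteLine(sw.ElapsedMilliseconds); // roughly 800
}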
#!markdown
Further reading:
- https://tirania.org/blog/archive/2013/Aug-15.html
- https://blog.stephencleary.com/2013/11/there-is-no-thread.html
- https://devblogs.microsoft.com/dotnet/how-async-await-really-works/
#!markdown
**IAsyncEnumerable**
- Putting the two together: an enumerable whose items are each retrieved asynchronously
#!csharp
async IAsyncEnumerable<int> DoubleAsync(IAsyncEnumerable<int> source)
{
    await foreach (var item in source)
    {
        Console.WriteLine($"Retrieved {item} from Source");
        yield return item * 2;
    }
}

async IAsyncEnumerable<int> DoubleAsyncWithEnumerator(IAsyncEnumerable<int> source)
{
    // await using disposes the enumerator, as await foreach does implicitly
    await using var enumerator = source.GetAsyncEnumerator();
    while (await enumerator.MoveNextAsync())
    {
        var item = enumerator.Current;
        Console.WriteLine($"Retrieved {item} from source using enumerator");
        yield return item * 2;
    }
}

// nearly impossible to model with regular tasks
// (this version only consumes the items, and skips disposing the enumerator)
Task DoubleTaskWithEnumerator(IAsyncEnumerable<int> source)
{
    var enumerator = source.GetAsyncEnumerator();
    return Do();

    Task Do()
    {
        return enumerator.MoveNextAsync().AsTask().ContinueWith(b => {
            if (b.Result)
            {
                var item = enumerator.Current;
                Console.WriteLine($"Retrieved {item} from source using enumerator");
                return Do();
            }
            else
            {
                return Task.CompletedTask;
            }
        }).Unwrap();
    }
}
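#!markdown
The methods above are only defined, never called. As a minimal, self-contained sketch of the producing and consuming sides together, the hypothetical `CountAsync` below plays the role of an async source and `DoubleEach` mirrors `DoubleAsync` without the logging:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// hypothetical async source: each item becomes available after a short wait
async IAsyncEnumerable<int> CountAsync(int count)
{
    for (int i = 0; i < count; i++)
    {
        await Task.Delay(10); // stands in for an async I/O call
        yield return i;
    }
}

// same shape as DoubleAsync above, minus the logging
async IAsyncEnumerable<int> DoubleEach(IAsyncEnumerable<int> source)
{
    await foreach (var item in source)
    {
        yield return item * 2;
    }
}

var doubled = new List<int>();
await foreach (var item in DoubleEach(CountAsync(3)))
{
    doubled.Add(item);
}

Console.WriteLine(string.Join(",", doubled));
```

As with `IEnumerable`, nothing runs until the `await foreach` starts pulling, and each item flows through both methods before the next one is requested.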
#!markdown
Introducing our sample space for the rest of the examples
#!csharp
#r "nuget:SuperLinq.Async"
using SuperLinq.Async; // pretty cool helper library, check it out
using System.Text.Json;
using System.IO;

// read items from a single db partition (as is common with Cosmos DB)
async IAsyncEnumerable<string> ReadPartition(char prefix)
{
    for (int i = 0; i < 10; i++)
    {
        var value = $"{prefix}{i}";
        yield return value;
        Console.WriteLine($"{value} read from partition");
        await Task.Delay(10);
    }
}

// make a remote call to transform your input
async Task<int> DoRemoteCalculation(string value)
{
    await Task.Delay(25);
    return value.Sum(x => (int)x);
}

// serialize to a very large JSON file in streaming format
async Task<string> Serialize(string tag, IAsyncEnumerable<string> values)
{
    using var stream = new MemoryStream();
    await JsonSerializer.SerializeAsync(
        stream,
        new { Tag = tag, Values = values.Do(async _ => await Task.Delay(10)) });
    return Encoding.UTF8.GetString(stream.ToArray());
}
#!markdown
Using standard LINQ chaining: lots of "dead" time because of lazy execution (the reader and the writer never work at the same time)
#!csharp
using System.Diagnostics;

await NaiveSinglePartitionAndFile();

async Task NaiveSinglePartitionAndFile()
{
    var sw = Stopwatch.StartNew();
    await Serialize("A", ReadPartition('a'));
    Console.WriteLine($"took {sw.ElapsedMilliseconds}ms");
}
#!markdown
**Channels**
- Async-compatible Producer/Consumer collection
- Buffer that can be added to on one thread and consumed on another
- Can tell the difference between "empty for now" and "completed, no more items will arrive"
- Compare to non-async (thread-holding) `BlockingCollection`
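
A minimal sketch of how a reader can distinguish an empty channel from a completed one: `WaitToReadAsync` waits without holding a thread while the buffer is merely empty, and returns `false` only once the writer has completed and everything buffered has been read. The variable names here are illustrative only:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

var numbers = Channel.CreateUnbounded<int>();
var received = new List<int>();

var consumer = Task.Run(async () =>
{
    // false only after Complete() was called AND the buffer has been drained
    while (await numbers.Reader.WaitToReadAsync())
    {
        while (numbers.Reader.TryRead(out var item))
        {
            received.Add(item);
        }
    }
});

for (int i = 0; i < 3; i++)
{
    await numbers.Writer.WriteAsync(i);
    await Task.Delay(20); // the channel is likely empty here, but not completed
}
numbers.Writer.Complete();
await consumer;

Console.WriteLine(string.Join(",", received));
```

`ReadAllAsync()`, used in the examples that follow, wraps this same loop in an `IAsyncEnumerable`.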
#!csharp
using System.Diagnostics;
using System.Threading.Channels;

await ChannelSinglePartitionAndFile();

// this example mimics EventProcessor/PreAggregator flow in XIDS
async Task ChannelSinglePartitionAndFile()
{
    var sw = Stopwatch.StartNew();
    var channel = Channel.CreateUnbounded<string>();
    var writeTask = Serialize("A", channel.Reader.ReadAllAsync());
    await ReadPartition('a').ForEachAwaitAsync(async item => await channel.Writer.WriteAsync(item));
    channel.Writer.Complete();
    await writeTask;
    Console.WriteLine($"took {sw.ElapsedMilliseconds}ms");
}
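#!markdown
A rough timing model for the naive and channel versions above, ignoring timer resolution and scheduling overhead. With $n$ items, a per-item read delay $t_r$ and a per-item write delay $t_w$ (both 10 ms in the sample code), the lazy chain pays both delays for every item, while the channel lets them overlap:

$$
T_{\text{naive}} \approx n\,(t_r + t_w) = 10 \times (10 + 10)\ \text{ms} = 200\ \text{ms}
$$

$$
T_{\text{channel}} \approx n \cdot \max(t_r, t_w) + \min(t_r, t_w) = 10 \times 10\ \text{ms} + 10\ \text{ms} = 110\ \text{ms}
$$

Measured numbers will usually be higher, since short `Task.Delay` calls tend to overshoot, but the ratio between the two versions should be similar.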
#!markdown
Other advantages to this pattern:
- Can wait on and create new channels for "checkpointing"
- Allows the consumer of the channel to be "stateful" without creating a state object to receive events
#!markdown
- What's with the "async" here?
- Easy to understand on read: the reader has to wait for items to show up
- But writing can wait too, when there's a full buffer: the `Bounded` channel...
- Alternative: dropping behaviors instead of waiting
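
A minimal sketch of one of the dropping behaviors, using `BoundedChannelFullMode.DropOldest` (the default mode is `Wait`). The capacity of 2 and the variable names are arbitrary choices for illustration:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

var latestOnly = Channel.CreateBounded<int>(new BoundedChannelOptions(2)
{
    // when the buffer is full, evict the oldest buffered item instead of making the writer wait
    FullMode = BoundedChannelFullMode.DropOldest,
});

for (int i = 0; i < 5; i++)
{
    // in a Drop* mode the write always succeeds; nobody is reading yet, so items get evicted
    latestOnly.Writer.TryWrite(i);
}
latestOnly.Writer.Complete();

var kept = new List<int>();
await foreach (var item in latestOnly.Reader.ReadAllAsync())
{
    kept.Add(item);
}

Console.WriteLine(string.Join(",", kept));
```

Only the two most recent items survive. This trades completeness for a writer that never stalls, which suits "latest value wins" data such as telemetry.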
#!csharp
using System.Diagnostics;
using System.Threading.Channels;

var sw = Stopwatch.StartNew();
var channel = Channel.CreateBounded<int>(2);
await Task.WhenAll(WriteChannel(), ReadChannel());

async Task WriteChannel()
{
    foreach (var i in Enumerable.Range(0, 10))
    {
        while (!channel.Writer.TryWrite(i))
        {
            Console.WriteLine($"{sw.ElapsedMilliseconds}: Cannot write {i} yet");
            await Task.Delay(100);
        }
        Console.WriteLine($"{sw.ElapsedMilliseconds}: Wrote {i}");
        if (i == 5)
        {
            await Task.Delay(500);
        }
    }
    channel.Writer.Complete();
}

async Task ReadChannel()
{
    await Task.Delay(500);
    while (true)
    {
        if (!channel.Reader.TryRead(out var i))
        {
            if (channel.Reader.Completion.IsCompletedSuccessfully)
            {
                Console.WriteLine($"{sw.ElapsedMilliseconds}: all done");
                break;
            }
            else
            {
                Console.WriteLine($"{sw.ElapsedMilliseconds}: Cannot read yet");
                await Task.Delay(100);
            }
        }
        else
        {
            Console.WriteLine($"{sw.ElapsedMilliseconds}: Read {i}");
        }
    }
}
#!markdown
**Enhancing standard LINQ chaining**
#!csharp
await ForEachAsync();
await ParallelForEachAsync();

async Task ForEachAsync()
{
    var sw = Stopwatch.StartNew();
    foreach (var i in Enumerable.Range(0, 50)) { await Task.Delay(5); }
    Console.WriteLine(sw.ElapsedMilliseconds);
}

async Task ParallelForEachAsync()
{
    var sw = Stopwatch.StartNew();
    await Parallel.ForEachAsync(
        Enumerable.Range(0, 50),
        async (i, _) => { await Task.Delay(5); });
    Console.WriteLine(sw.ElapsedMilliseconds);
}
#!csharp
async Task<int> Work(int i)
{
    await Task.Delay(5);
    return i * 2;
}

await SelectAwait();

async Task SelectAwait()
{
    var sw = Stopwatch.StartNew();
    var array = await Enumerable.Range(0, 50).ToAsyncEnumerable()
        .SelectAwait(async i => await Work(i)) //built-in - Linq.Async
        .ToArrayAsync();
    Console.WriteLine(string.Join(',', array));
    Console.WriteLine(sw.ElapsedMilliseconds);
}
#!markdown
- How to do _this_ in parallel?
  - `Parallel.ForEachAsync` doesn't return results
  - PLINQ `.AsParallel()` allocates threads and doesn't work with `IAsyncEnumerable` anyway
- Use `Parallel.ForEachAsync` to write to a channel, and read the channel as the result
  - Used this often in ADP Data Ingestion, e.g. for getting signed URIs for files
  - This solution doesn't preserve ordering; more complex ones exist (e.g. enqueuing `Task`s in a Channel)
#!csharp
using System.Threading;

static async IAsyncEnumerable<TOutput> ParallelSelectAwaitImpl<TInput, TOutput>(
    IAsyncEnumerable<TInput> source,
    Func<TInput, ValueTask<TOutput>> body)
{
    var channel = System.Threading.Channels.Channel.CreateBounded<TOutput>(Environment.ProcessorCount);
    using var cts = new CancellationTokenSource();
    _ = RunForEachAsync(cts);

    // The finally block runs when the generated iterator is disposed, so Parallel.ForEachAsync
    // is cancelled as soon as the consumer stops enumerating. This halts the background work
    // when Any(), First(), etc. get their result before reaching the end of the enumerable.
    try
    {
        // Do not pass the cancellation token to the reader, so that an error from
        // Parallel.ForEachAsync still bubbles up even after the token has been cancelled.
        await foreach (var item in channel.Reader.ReadAllAsync(CancellationToken.None))
        {
            yield return item;
        }
    }
    finally
    {
        cts.Cancel();
    }

    // Run Parallel.ForEachAsync and, when it finishes, complete the writer (nothing more to yield).
    // Parallel.ForEachAsync is responsible for stopping when the token is cancelled, which is
    // what guarantees that TryComplete is eventually called.
    async Task RunForEachAsync(CancellationTokenSource cts)
    {
        try
        {
            await Parallel.ForEachAsync(
                source,
                cts.Token,
                async (item, token) =>
                {
                    var output = await body(item);
                    await channel.Writer.WriteAsync(output, token);
                });
            channel.Writer.TryComplete();
        }
        catch (Exception ex)
        {
            channel.Writer.TryComplete(ex);
        }
    }
}
#!csharp
await ParallelSelectAwait();

async Task ParallelSelectAwait()
{
    var sw = Stopwatch.StartNew();
    var array = await ParallelSelectAwaitImpl(
        Enumerable.Range(0, 50).ToAsyncEnumerable(),
        async i => await Work(i)
    ).ToArrayAsync();
    Console.WriteLine(string.Join(',', array));
    Console.WriteLine(sw.ElapsedMilliseconds);
}
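#!markdown
The ordering caveat mentioned earlier can be addressed by enqueuing the `Task`s themselves in a channel: tasks are started in source order and awaited in that same order, so results come out ordered even when later items finish first. The following is a simplified sketch, not the implementation used above; `OrderedParallelSelect`, `maxBuffered` and `Numbers` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

static async IAsyncEnumerable<TOutput> OrderedParallelSelect<TInput, TOutput>(
    IAsyncEnumerable<TInput> source,
    Func<TInput, Task<TOutput>> body,
    int maxBuffered)
{
    // the channel holds already-started tasks; its capacity roughly bounds the work in flight
    var tasks = Channel.CreateBounded<Task<TOutput>>(maxBuffered);
    var producer = Produce();

    await foreach (var task in tasks.Reader.ReadAllAsync())
    {
        yield return await task; // awaited in the order the tasks were enqueued
    }
    await producer;

    async Task Produce()
    {
        try
        {
            await foreach (var item in source)
            {
                // body(item) starts the work immediately; only the enqueue waits for room
                await tasks.Writer.WriteAsync(body(item));
            }
            tasks.Writer.TryComplete();
        }
        catch (Exception ex)
        {
            tasks.Writer.TryComplete(ex);
        }
    }
}

// hypothetical source for the demo
async IAsyncEnumerable<int> Numbers(int count)
{
    for (int i = 0; i < count; i++)
    {
        await Task.Yield();
        yield return i;
    }
}

var ordered = new List<int>();
await foreach (var value in OrderedParallelSelect(
    Numbers(10),
    async i => { await Task.Delay((10 - i) * 5); return i * 2; }, // earlier items take longer
    maxBuffered: 4))
{
    ordered.Add(value);
}

Console.WriteLine(string.Join(",", ordered));
```

Unlike `ParallelSelectAwaitImpl`, this sketch has no cancellation: if the consumer stops early or a task fails, the producer can be left waiting on a full channel. A production version would need the same `try`/`finally` treatment.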
#!markdown
**Split/Join**
- How to "funnel" two enumerables into a single pipeline?
- Lower-level solutions exist, see e.g. [SuperLinq.ConcurrentMerge](https://viceroypenguin.github.io/SuperLinq/api/SuperLinq.Async.AsyncSuperEnumerable.ConcurrentMerge)
#!csharp
await ChannelTwoPartitionsAndFile();

async Task ChannelTwoPartitionsAndFile()
{
    var sw = Stopwatch.StartNew();
    // note optimization in channel options - thanks EliA
    var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(5) { SingleReader = true });
    var writeTask = Serialize("A", channel.Reader.ReadAllAsync());
    await Task.WhenAll(
        ReadPartition('a').ForEachAwaitAsync(async item => await channel.Writer.WriteAsync(item)),
        ReadPartition('b').ForEachAwaitAsync(async item => await channel.Writer.WriteAsync(item))
    );
    channel.Writer.Complete();
    await writeTask;
    Console.WriteLine($"took {sw.ElapsedMilliseconds}ms");
}
#!markdown
- Split single enumerable into multiple outputs
- Unlike joining, no lower-level way to do this
#!csharp
await ChannelPartitionAndTwoFiles();

async Task ChannelPartitionAndTwoFiles()
{
    var sw = Stopwatch.StartNew();
    var channelA = Channel.CreateBounded<string>(new BoundedChannelOptions(5) { SingleWriter = true });
    var channelB = Channel.CreateBounded<string>(new BoundedChannelOptions(5) { SingleWriter = true });
    var readTaskA = Serialize("A", channelA.Reader.ReadAllAsync());
    var readTaskB = Serialize("B", channelB.Reader.ReadAllAsync());
    await ReadPartition('a').ForEachAwaitAsync(async (item, i) => {
        var channel = (i % 2 == 0 ? channelA : channelB);
        await channel.Writer.WriteAsync(item);
    });
    channelA.Writer.Complete();
    channelB.Writer.Complete();
    await Task.WhenAll(readTaskA, readTaskB);
    Console.WriteLine($"took {sw.ElapsedMilliseconds}ms");
}
#!markdown
Careful when firing and (temporarily) forgetting groups of tasks!
- Cancellation and error propagation: when one task fails, the others should stop
- Recommend the [`StructuredConcurrency`](https://github.com/StephenCleary/StructuredConcurrency) library
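
To make the "when one fails, stop the other" idea concrete, here is a hand-rolled sketch using only a shared `CancellationTokenSource`. It does not use the StructuredConcurrency library, and `WhenAllOrCancel` is a hypothetical helper name:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Runs all operations; as soon as any one fails, the shared token cancels the rest.
static async Task WhenAllOrCancel(params Func<CancellationToken, Task>[] operations)
{
    using var cts = new CancellationTokenSource();
    var tasks = operations.Select(Guarded).ToArray();
    await Task.WhenAll(tasks); // rethrows the failure once every task has stopped

    async Task Guarded(Func<CancellationToken, Task> operation)
    {
        try
        {
            await operation(cts.Token);
        }
        catch
        {
            cts.Cancel(); // signal the sibling operations to stop
            throw;
        }
    }
}

var slowWasCancelled = false;
var failure = "";
try
{
    await WhenAllOrCancel(
        async token =>
        {
            // would run for 30 seconds if nothing cancelled it
            try { await Task.Delay(TimeSpan.FromSeconds(30), token); }
            catch (OperationCanceledException) { slowWasCancelled = true; throw; }
        },
        async token =>
        {
            await Task.Delay(50, token);
            throw new InvalidOperationException("partition read failed");
        });
}
catch (InvalidOperationException ex)
{
    failure = ex.Message;
}

Console.WriteLine($"{failure}; slow task cancelled: {slowWasCancelled}");
```

In the split/join examples, the reader and writer tasks would each take the token, so a failed partition read would also stop the serializer instead of leaving it waiting on a channel that never completes.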
#!markdown
Further reading on Channels
- https://devblogs.microsoft.com/dotnet/an-introduction-to-system-threading-channels/
#!markdown
**Alternatives**
- [TPL Dataflow](https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/dataflow-task-parallel-library)
  - Join "Blocks" together
  - Extremely powerful, but overkill for most scenarios
  - Has its own LINQ-like operations, which makes for a steep learning curve
- [Rx.NET](https://github.com/dotnet/reactive)
  - Explicit pub/sub with `IObservable<T>`
  - Limited development and support
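
For comparison, a minimal sketch of joining two TPL Dataflow blocks, assuming the `System.Threading.Tasks.Dataflow` assembly is available (it may need a NuGet reference on some targets). The block names are illustrative:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var results = new List<int>();

// each block is a pipeline stage with its own input buffer
var doubler = new TransformBlock<int, int>(async n =>
{
    await Task.Delay(5); // stands in for a remote call
    return n * 2;
});
var collector = new ActionBlock<int>(n => results.Add(n));

// join the blocks; PropagateCompletion forwards "no more items" downstream
doubler.LinkTo(collector, new DataflowLinkOptions { PropagateCompletion = true });

for (int i = 0; i < 5; i++)
{
    doubler.Post(i);
}
doubler.Complete();
await collector.Completion;

Console.WriteLine(string.Join(",", results));
```

The blocks play the role that the channel plus reader/writer tasks play in the earlier examples, with buffering and completion built into each block.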
#!markdown
**In conclusion**
- Enumerables for just-in-time processing
- Async enables concurrency (overlapping I/O waits) almost for free
- Decouple components by linking via Channels