@HalidCisse
Forked from AlgorithmsAreCool/ChannelsAreCool.cs
Created June 18, 2019 12:53
Channels Example
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace ChannelsAreCool
{
    //Disclaimer: I didn't actually run this code, so it might not quite work.
    //Feel free to complain or ask questions and I'll fix it.
    public static class Example
    {
        public static async Task RunExample()
        {
            const int maxMessagesToBuffer = 100;
            var channel = Channel.CreateBounded<string>(maxMessagesToBuffer);
            //Bounded channels are important if the consumer/reader is slower than the
            //producer. You don't want your app to keep buffering until it blows up with
            //an OutOfMemoryException in production... or use Channel.CreateUnbounded if you don't care.

            var reader = channel.Reader;
            var writer = channel.Writer;

            //Traditionally you would need a dedicated thread to wait for and process
            //messages from the channel, but we can use Task.Run to 'borrow' a thread
            //from the thread pool for the same purpose. The reader only holds a thread while
            //it is actually processing; while it awaits, the thread goes back to the pool.
            var worker1 = Task.Run(() => ListenToChannel(channel.Reader));

            //You can uncomment these to have multiple readers listening to the
            //same channel. It works and it is safe: each message goes to exactly one reader.
            //var worker2 = Task.Run(() => ListenToChannel(channel.Reader));
            //var worker3 = Task.Run(() => ListenToChannel(channel.Reader));
            //var worker4 = Task.Run(() => ListenToChannel(channel.Reader));

            //This will try to write lots of messages to the channel,
            //but since the reader is slower (because of the Task.Delay),
            //WriteAsync will wait asynchronously until there is space in the channel.
            //This technique is called back pressure, and it keeps
            //a writer from getting too far ahead of the readers.
            //NOTE: Writes to an unbounded channel never wait.
            for (int i = 0; i < 1000; i++)
                await writer.WriteAsync($"Message # {i}");

            //ALWAYS do this to wake up the readers and tell them you are done.
            //If you don't, they will stay awaiting 'WaitToReadAsync()' forever.
            writer.Complete();

            //[Optional]
            //This waits until the readers have read all remaining messages in the
            //channel. It is optional, but it can be important.
            await reader.Completion;

            //Draining the channel is not the same as finishing the work: the last message
            //may still be in progress. Awaiting the worker waits for processing to complete
            //and surfaces any exception it threw.
            //NOTE: With multiple workers, use 'await Task.WhenAll(worker1, worker2, worker3, worker4);'
            await worker1;
        }

        private static async Task ListenToChannel(ChannelReader<string> reader)
        {
            //Because async methods are compiled into a state machine that handles awaits,
            //it is safe to await in an infinite loop. Thank you C# compiler gods!
            while (await reader.WaitToReadAsync())//if this returns false the channel is completed
            {
                //As a note, if there are multiple readers but only one message, only one
                //reader's TryRead succeeds; the others simply go back to waiting.
                string messageString;
                while (reader.TryRead(out messageString))//yes, yes I know about 'out var messageString'...
                {
                    Console.WriteLine($"The listener just read {messageString}!");
                    await Task.Delay(25);//this simulates some work...
                }
            }
        }
    }
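
    //A minimal sketch, not part of the original example: assuming your target framework
    //supports C# 8 and ChannelReader<T>.ReadAllAsync (.NET Core 3.0 or later), the nested
    //WaitToReadAsync/TryRead loops in ListenToChannel can be written as one 'await foreach'.
    //'ReadAllAsyncSketch' and 'CountMessages' are names made up for illustration.
    public static class ReadAllAsyncSketch
    {
        public static async Task<int> CountMessages(ChannelReader<string> reader)
        {
            var count = 0;
            //the loop ends once the writer calls Complete() and the channel is drained
            await foreach (var message in reader.ReadAllAsync())
            {
                Console.WriteLine($"The listener just read {message}!");
                count++;
            }
            return count;//how many messages this reader consumed
        }
    }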
}
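
//A minimal sketch of the back pressure described in RunExample, not part of the original
//example. Assuming the default full mode (wait), TryWrite on a full bounded channel
//returns false instead of waiting, which makes the capacity limit easy to see.
//'BackPressureSketch' and 'FillSmallChannel' are names made up for illustration.
public static class BackPressureSketch
{
    public static bool[] FillSmallChannel()
    {
        var channel = Channel.CreateBounded<int>(2);//room for two messages
        var first = channel.Writer.TryWrite(1);//fits
        var second = channel.Writer.TryWrite(2);//fits, the channel is now full
        var third = channel.Writer.TryWrite(3);//rejected, WriteAsync would wait here instead
        channel.Reader.TryRead(out _);//a reader takes one message and frees a slot
        var fourth = channel.Writer.TryWrite(4);//fits again
        return new[] { first, second, third, fourth };
    }
}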