@AlgorithmsAreCool
Created May 30, 2018 20:02
Channels Example
using System;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace ChannelsAreCool
{
    //Disclaimer: I didn't actually run this code so it might not quite work.
    //Feel free to complain or ask questions and I'll fix it.
    public static class Example
    {
        public static async Task RunExample()
        {
            const int maxMessagesToBuffer = 100;
            var channel = Channel.CreateBounded<string>(maxMessagesToBuffer);
            //Bounded channels are important if the consumer/reader is slower than the
            //producer. You don't want your app to keep buffering until you explode with
            //an OutOfMemoryException in production... or use .CreateUnbounded if you don't care
            var reader = channel.Reader;
            var writer = channel.Writer;
            //You would typically need a dedicated thread to await and process
            //messages from the channel, but we can use Task.Run to 'borrow' a thread
            //from the thread pool for the same purpose. The thread pool will compensate for it
            var worker1 = Task.Run(() => ListenToChannel(channel.Reader));
            //You can uncomment these to have multiple readers listening to the
            //same channel. It works and it is safe.
            //var worker2 = Task.Run(() => ListenToChannel(channel.Reader));
            //var worker3 = Task.Run(() => ListenToChannel(channel.Reader));
            //var worker4 = Task.Run(() => ListenToChannel(channel.Reader));
            //This will try to write lots of messages to the channel,
            //but since the reader is slower (because of the Task.Delay),
            //WriteAsync will wait asynchronously until there is space in the channel.
            //This technique is called Back Pressure and can help keep
            //a writer from getting too far ahead.
            //NOTE: Writes to unbounded channels never wait
            for (int i = 0; i < 1000; i++)
                await writer.WriteAsync($"Message # {i}");
            //ALWAYS do this to wake up the readers and tell them you are done.
            //If you don't, they will stay awaiting 'WaitToReadAsync()' forever
            writer.Complete();
            //[Optional]
            //This will wait until the readers have read all remaining messages in the
            //channel. This is optional, but it can be important.
            await reader.Completion;
            //NOTE: You can also 'await Task.WhenAll(worker1, worker2, worker3, worker4);' to
            //wait for the workers to have completed processing, not just draining the channel
        }
        private static async Task ListenToChannel(ChannelReader<string> reader)
        {
            //Because async methods use a state machine to handle awaits,
            //it is safe to await in an infinite loop. Thank you C# compiler gods!
            while (await reader.WaitToReadAsync())//if this returns false the channel is completed
            {
                //As a note, if there are multiple readers but only one message, only one
                //reader's TryRead succeeds, so a message is never handed to two readers.
                string messageString;
                while (reader.TryRead(out messageString))//yes, yes I know about 'out var messageString'...
                {
                    Console.WriteLine($"The listener just read {messageString}!");
                    await Task.Delay(25);//this simulates some work...
                }
            }
        }
    }
}
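The comments in the example mention running several readers and awaiting them with Task.WhenAll. Here is a minimal sketch of that variant. Like the gist itself, treat it as illustrative rather than battle-tested. The names MultiReaderSketch, CountMessages and RunAsync are made up for this sketch, as are the capacity of 10 and the 1 ms delay.

```csharp
using System;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class MultiReaderSketch
{
    // Hypothetical worker: drains the channel and returns how many
    // messages this particular worker handled.
    private static async Task<int> CountMessages(ChannelReader<string> reader)
    {
        int handled = 0;
        while (await reader.WaitToReadAsync()) // false once the channel is completed and empty
        {
            while (reader.TryRead(out string message))
            {
                handled++;
                await Task.Delay(1); // simulated work
            }
        }
        return handled;
    }

    // Starts workerCount readers on one channel, writes messageCount messages,
    // and returns the total number of messages handled across all workers.
    public static async Task<int> RunAsync(int messageCount, int workerCount)
    {
        var channel = Channel.CreateBounded<string>(10); // capacity chosen arbitrarily

        Task<int>[] workers = Enumerable.Range(0, workerCount)
            .Select(_ => Task.Run(() => CountMessages(channel.Reader)))
            .ToArray();

        for (int i = 0; i < messageCount; i++)
            await channel.Writer.WriteAsync($"Message # {i}");

        // Without this the workers would await WaitToReadAsync forever.
        channel.Writer.Complete();

        // Waits for the workers to finish processing, not just for the channel to drain.
        int[] counts = await Task.WhenAll(workers);
        return counts.Sum();
    }
}
```

Each message is delivered to exactly one worker, so the per-worker counts should add up to messageCount. How the messages are split between workers is not deterministic.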
@dhcgn

dhcgn commented Jun 1, 2018

Why is there a TryWrite(T)? In what case does it return false?

@AlgorithmsAreCool
Author

@dhcgn,
Sorry for the delay, I didn't see this comment

TryWrite will synchronously attempt to write to the channel. If the channel's internal buffer is full (because the consumer side isn't keeping up), TryWrite will return false. Using TryWrite can help you squeeze even more performance out of the channel, since there is less threading overhead.
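
A small sketch of that behaviour, assuming a bounded channel created with the default full mode (wait). The class name TryWriteSketch and the capacity of 2 are just for illustration.

```csharp
using System;
using System.Threading.Channels;

public static class TryWriteSketch
{
    public static (bool first, bool second, bool third, bool afterRead) Run()
    {
        // Capacity 2, default options: a full channel rejects TryWrite.
        var channel = Channel.CreateBounded<int>(2);

        bool first = channel.Writer.TryWrite(1);   // buffer has room
        bool second = channel.Writer.TryWrite(2);  // buffer is now full
        bool third = channel.Writer.TryWrite(3);   // no space left, so this fails

        channel.Reader.TryRead(out _);             // consumer frees one slot
        bool afterRead = channel.Writer.TryWrite(3);

        return (first, second, third, afterRead);
    }
}
```

TryWrite also returns false once the writer has been completed, so a false result does not always mean "full".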

Say, for example, you got an array of new inputs: you could just loop on TryWrite and incur no await overhead until the channel is full. Then you can either await WaitToWriteAsync() or manually back off some other way.
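
Roughly, that loop could look like the sketch below. BatchWriteSketch and WriteAllAsync are hypothetical names, not part of the Channels API. Only TryWrite and WaitToWriteAsync come from the library.

```csharp
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class BatchWriteSketch
{
    // Hypothetical helper: writes every item in order.
    // Returns false if the channel was completed before all items were written.
    public static async Task<bool> WriteAllAsync<T>(ChannelWriter<T> writer, IReadOnlyList<T> items)
    {
        int next = 0;
        while (next < items.Count)
        {
            // Fast path: keep writing synchronously while there is room.
            while (next < items.Count && writer.TryWrite(items[next]))
                next++;

            if (next == items.Count)
                return true;

            // Slow path: wait for space. WaitToWriteAsync yields false
            // when the channel is completed and will never accept more writes.
            if (!await writer.WaitToWriteAsync())
                return false;
        }
        return true;
    }
}
```

The await only happens when TryWrite fails, so a consumer that keeps up means the batch goes through without ever awaiting.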

@manigandham

Here's a link to the overview from the source branch by Stephen Toub:

https://github.com/stephentoub/corefxlab/blob/master/src/System.Threading.Tasks.Channels/README.md

@AlgorithmsAreCool
Author

@manigandham, the document you linked is not really accurate anymore.

The API has changed since that document was last updated.

@st0le

st0le commented Jul 2, 2018

This is a dumb question but why should I use Channels over Dataflow?

@daveyostcom

Here's a link to the overview from the source branch by Stephen Toub:
https://github.com/stephentoub/corefxlab/blob/master/src/System.Threading.Tasks.Channels/README.md
Not there. Any idea where it is now?

@AlgorithmsAreCool
Author

AlgorithmsAreCool commented Jul 27, 2019

@daveyostcom,
The readme might be gone now, I think :/
The source code for channels has been promoted into Corefx here

Also, probably the best current overview of Channels is this blog post:
https://ndportmann.com/system-threading-channels/

@daveyostcom

The first dotnetcore that has Channels is 3.0 (currently Preview7). I can't find it on NuGet. Where have people been getting it?

@AlgorithmsAreCool
Author

@daveyostcom,

dotnet core 3.0 is available here
https://dotnet.microsoft.com/download/dotnet-core/3.0

Channels is available here as a NuGet package
https://www.nuget.org/packages/System.Threading.Channels

However, I think Channels will continue to be a standalone NuGet package and not be directly included in core 3.0 (see below for details).

Looking at the core 3.0 API diffs, I don't see Channels appear in the core API. However, it is mentioned in the standalone packages section in preview 4.

Similarly, Channels shows up in the "platform extensions" section of the API docs for 3.0 and does not show up in the core docs.

@daveyostcom

Thanks! I'm using Rider. I was confused because when I used the Rider Nuget interface to search for Channel, the library didn't come up in the results. This is because my project was using another library that has a Channel.

Btw, it's there in the preview5 core 3.0 API diffs.
