Skip to content

Instantly share code, notes, and snippets.

@aarondandy
Created January 1, 2019 23:16
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aarondandy/a5465ebaf06b5a863dd276a471fa6862 to your computer and use it in GitHub Desktop.
Save aarondandy/a5465ebaf06b5a863dd276a471fa6862 to your computer and use it in GitHub Desktop.
Simple Sample for System.Threading.Channels
// Requires the System.Threading.Channels NuGet package.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace ConsoleApp29
{
    /// <summary>
    /// Sample pipeline built on System.Threading.Channels: one producer reads a text
    /// file in batches, four competing consumers write the batches to separate output
    /// files, and a single logger task prints progress messages to the console.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Runs the producer, the consumers and the logger, then prints the elapsed time.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        /// <returns>A task that completes when the whole pipeline has finished.</returns>
        static async Task Main(string[] args)
        {
            // Upper bound on batches waiting between the producer and the consumers.
            // Bounding the channel makes the producer wait when the consumers fall
            // behind, instead of buffering the whole input file in memory.
            const int maxPendingBatches = 16;

            // Exactly one task (writeLogMessages) reads the log channel.
            var logChannel = Channel.CreateUnbounded<string>(
                new UnboundedChannelOptions { SingleReader = true });
            var dataChannel = Channel.CreateBounded<string[]>(
                new BoundedChannelOptions(maxPendingBatches));

            var logTask = writeLogMessages();
            var stopwatch = Stopwatch.StartNew();
            try
            {
                await Task.WhenAll(
                    writeBatches(), writeBatches(), writeBatches(), writeBatches(),
                    readFile());
                stopwatch.Stop();
            }
            finally
            {
                // Always close the log channel and wait for the logger so queued
                // messages are printed even when the pipeline failed.
                logChannel.Writer.Complete();
                await logTask;
            }

            Console.WriteLine($"All done: {stopwatch.Elapsed}");

            // Producer: reads the input file line by line and publishes fixed-size batches.
            async Task readFile()
            {
                const int batchSize = 4096 * 4;
                var buffer = new List<string>(batchSize);
                try
                {
                    using (var textStream = new StreamReader(@"X:\hugefile.txt", Encoding.UTF8))
                    {
                        string line;
                        while ((line = await textStream.ReadLineAsync().ConfigureAwait(false)) != null)
                        {
                            buffer.Add(line);
                            if (buffer.Count >= batchSize)
                            {
                                await submitBatch();
                            }
                        }

                        // Flush the final, partially filled batch.
                        if (buffer.Count != 0)
                        {
                            await submitBatch();
                        }
                    }
                }
                finally
                {
                    // Signal the consumers that no more data is coming. TryComplete is
                    // used because a failed consumer may already have completed the channel.
                    dataChannel.Writer.TryComplete();
                }

                // Copies the buffered lines into an array, publishes it and logs the event.
                async Task submitBatch()
                {
                    var batch = buffer.ToArray();
                    buffer.Clear();
                    await dataChannel.Writer.WriteAsync(batch);
                    await logChannel.Writer.WriteAsync($"Read in batch of {batch.Length} lines.");
                }
            }

            // Consumer: drains batches from the data channel into its own output file.
            async Task writeBatches()
            {
                // pretend you need some kind of competing consumer setup for <reason>
                var fileName = $"{Guid.NewGuid().ToString("d")}.txt";
                try
                {
                    using (var textStream = File.CreateText(fileName))
                    {
                        await logChannel.Writer.WriteAsync($"Prepared file: {fileName}");
                        while (await dataChannel.Reader.WaitToReadAsync().ConfigureAwait(false))
                        {
                            while (dataChannel.Reader.TryRead(out var batch))
                            {
                                foreach (var line in batch)
                                {
                                    await textStream.WriteLineAsync(line);
                                }

                                await Task.Delay(200); // just pretend writes are slower
                                await logChannel.Writer.WriteAsync($"Written batch of {batch.Length} lines.");
                            }
                        }
                    }
                }
                catch (Exception ex)
                {
                    // Close the data channel so the producer cannot wait forever on a
                    // full bounded channel after consumers have stopped reading.
                    dataChannel.Writer.TryComplete(ex);
                    throw;
                }
            }

            // Logger: the only console writer while the pipeline runs, so progress
            // messages from all tasks are printed one at a time.
            async Task writeLogMessages()
            {
                while (await logChannel.Reader.WaitToReadAsync().ConfigureAwait(false))
                {
                    while (logChannel.Reader.TryRead(out var message))
                    {
                        Console.WriteLine(message);
                    }
                }
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment