Created
January 1, 2019 23:16
-
-
Save aarondandy/a5465ebaf06b5a863dd276a471fa6862 to your computer and use it in GitHub Desktop.
Simple Sample for System.Threading.Channels
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Install the System.Threading.Channels NuGet package
using System; | |
using System.Collections.Generic; | |
using System.Diagnostics; | |
using System.IO; | |
using System.Text; | |
using System.Threading.Channels; | |
using System.Threading.Tasks; | |
namespace ConsoleApp29
{
    /// <summary>
    /// Sample pipeline built on System.Threading.Channels: one producer reads a text
    /// file in batches of lines, several competing consumers write those batches to
    /// their own output files, and a single consumer prints all log messages.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Runs the pipeline and prints the elapsed time once every stage has finished.
        /// </summary>
        /// <param name="args">
        /// Optional. <c>args[0]</c> is the path of the file to read; when omitted the
        /// original sample path <c>X:\hugefile.txt</c> is used.
        /// </param>
        static async Task Main(string[] args)
        {
            // Number of competing consumers draining the data channel.
            const int writerCount = 4;

            var inputPath = args?.Length > 0 ? args[0] : @"X:\hugefile.txt";

            // Both channels are unbounded, so writing to them never waits for a reader.
            var logChannel = Channel.CreateUnbounded<string>();
            var dataChannel = Channel.CreateUnbounded<string[]>();

            var logTask = WriteLogMessagesAsync();
            var stopwatch = Stopwatch.StartNew();

            try
            {
                // Start the consumers first, then the producer (same order as a
                // WhenAll(writer, writer, writer, writer, reader) argument list).
                var pipeline = new List<Task>(writerCount + 1);
                for (var i = 0; i < writerCount; i++)
                {
                    pipeline.Add(WriteBatchesAsync());
                }

                pipeline.Add(ReadFileAsync());

                await Task.WhenAll(pipeline);
            }
            finally
            {
                stopwatch.Stop();

                // Always complete and drain the log channel, even when a stage failed;
                // otherwise queued messages could be dropped and the log consumer
                // would be left waiting on a channel that is never completed.
                logChannel.Writer.Complete();
                await logTask;
            }

            Console.WriteLine($"All done: {stopwatch.Elapsed}");

            // Producer: reads the input file line by line and publishes fixed-size batches.
            async Task ReadFileAsync()
            {
                const int batchSize = 4096 * 4;
                var buffer = new List<string>(batchSize);

                try
                {
                    using (var textStream = new StreamReader(inputPath, Encoding.UTF8))
                    {
                        string line;
                        while ((line = await textStream.ReadLineAsync().ConfigureAwait(false)) != null)
                        {
                            buffer.Add(line);
                            if (buffer.Count >= batchSize)
                            {
                                await SubmitBatchAsync();
                            }
                        }

                        // Flush the final, partially filled batch.
                        if (buffer.Count != 0)
                        {
                            await SubmitBatchAsync();
                        }
                    }
                }
                finally
                {
                    // Completing the writer (on success or failure) lets the consumers'
                    // WaitToReadAsync loops end once the channel has been drained.
                    dataChannel.Writer.Complete();
                }

                // Copies the buffered lines into an array, publishes it and resets the buffer.
                async Task SubmitBatchAsync()
                {
                    var batch = buffer.ToArray();
                    buffer.Clear();
                    await dataChannel.Writer.WriteAsync(batch);
                    await logChannel.Writer.WriteAsync($"Read in batch of {batch.Length} lines.");
                }
            }

            // Consumer: writes every batch it manages to take to its own GUID-named file.
            async Task WriteBatchesAsync()
            {
                // pretend you need some kind of competing consumer setup for <reason>
                var fileName = $"{Guid.NewGuid().ToString("d")}.txt";
                using (var textStream = File.CreateText(fileName))
                {
                    await logChannel.Writer.WriteAsync($"Prepared file: {fileName}");

                    while (await dataChannel.Reader.WaitToReadAsync().ConfigureAwait(false))
                    {
                        while (dataChannel.Reader.TryRead(out var batch))
                        {
                            foreach (var line in batch)
                            {
                                await textStream.WriteLineAsync(line);
                            }

                            await Task.Delay(200); // just pretend writes are slower
                            await logChannel.Writer.WriteAsync($"Written batch of {batch.Length} lines.");
                        }
                    }
                }
            }

            // Single consumer of the log channel: every stage funnels its messages here
            // so that console output is produced from one place only.
            async Task WriteLogMessagesAsync()
            {
                while (await logChannel.Reader.WaitToReadAsync().ConfigureAwait(false))
                {
                    while (logChannel.Reader.TryRead(out var message))
                    {
                        Console.WriteLine(message);
                    }
                }
            }
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment