Skip to content

Instantly share code, notes, and snippets.

@sachinsu
Created February 14, 2020 04:58
Show Gist options
  • Save sachinsu/6fcbc36e6e5cc58c7b5ba9007e276afc to your computer and use it in GitHub Desktop.
Save sachinsu/6fcbc36e6e5cc58c7b5ba9007e276afc to your computer and use it in GitHub Desktop.
Using System.Threading.Channels to implement a high-performance producer-consumer pattern
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;
using System.Diagnostics;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace channels
{
/// <summary>
/// Producer-consumer pipeline built on a bounded <see cref="Channel{T}"/> of strings.
/// Producers enqueue messages with <see cref="Send"/>; <see cref="SetupChannel"/> starts a pool of
/// background workers that hand every message to <c>IWebService.GetPage</c>.
/// </summary>
public class EventConsumer
{
    private readonly System.Threading.Channels.Channel<string> channel;
    private readonly int workerCount;
    private readonly IWebService _service;
    private readonly ILogger _logger;

    /// <summary>Creates the consumer and its bounded channel.</summary>
    /// <param name="logger">Logger used for per-message diagnostics.</param>
    /// <param name="service">Service whose <c>GetPage</c> is called for every message.</param>
    /// <param name="maxMessageCount">Capacity of the bounded channel.</param>
    /// <param name="workerCount">Number of concurrent workers started by <see cref="SetupChannel"/>; must be at least 1.</param>
    /// <exception cref="ArgumentNullException"><paramref name="logger"/> or <paramref name="service"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="workerCount"/> is less than 1.</exception>
    public EventConsumer(ILogger<EventConsumer> logger, IWebService service, int maxMessageCount, int workerCount)
    {
        if (workerCount < 1)
        {
            // Without at least one reader the channel would fill up and every producer would stall.
            throw new ArgumentOutOfRangeException(nameof(workerCount), workerCount, "At least one worker is required.");
        }

        this.workerCount = workerCount;
        this._service = service ?? throw new ArgumentNullException(nameof(service));
        this._logger = logger ?? throw new ArgumentNullException(nameof(logger));
        this.channel = Channel.CreateBounded<string>(maxMessageCount);
    }

    // Using ValueTask: https://devblogs.microsoft.com/dotnet/understanding-the-whys-whats-and-whens-of-valuetask/
    /// <summary>
    /// Starts <c>workerCount</c> workers and completes once the channel has been marked complete
    /// (see <see cref="Complete"/>) and every worker has drained it.
    /// </summary>
    public async ValueTask SetupChannel()
    {
        var tasks = new List<Task>(this.workerCount);
        // Start at 0 so that exactly workerCount workers are created.
        for (var count = 0; count < this.workerCount; count++)
        {
            // ProcessChannelMessage returns Task, so this binds to Task.Run(Func<Task>) and the
            // resulting task tracks the whole worker loop rather than just its start.
            tasks.Add(Task.Run(() => this.ProcessChannelMessage(this.channel.Reader)));
        }

        // Await the workers first so that an unexpected worker fault surfaces instead of being
        // hidden behind a channel that never drains.
        await Task.WhenAll(tasks);
        await this.channel.Reader.Completion;
    }

    /// <summary>
    /// Enqueues <paramref name="request"/> for processing through the channel writer's <c>WriteAsync</c>.
    /// </summary>
    /// <param name="request">Message handed to the workers.</param>
    public async ValueTask Send(string request)
    {
        await this.channel.Writer.WriteAsync(request);
    }

    /// <summary>
    /// Marks the channel as complete: no further messages are accepted, the workers finish the
    /// messages already queued, and <see cref="SetupChannel"/> then returns. Safe to call more than once.
    /// </summary>
    public void Complete()
    {
        this.channel.Writer.TryComplete();
    }

    /// <summary>Worker loop: reads messages until the channel is completed and empty.</summary>
    /// <param name="reader">Reader side of the shared channel.</param>
    private async Task ProcessChannelMessage(ChannelReader<string> reader)
    {
        // WaitToReadAsync returns false once the channel is completed and drained.
        while (await reader.WaitToReadAsync())
        {
            // Several workers may be woken for the same data; TryRead hands each message to exactly one of them.
            while (reader.TryRead(out string messageString))
            {
                var requestTime = DateTime.Now;
                // Message templates keep message content out of the format string.
                this._logger.LogDebug("The listener just read {Message} at {RequestTime}!", messageString, requestTime);
                try
                {
                    await this._service.GetPage(messageString).ConfigureAwait(false);
                }
                catch (Exception ex)
                {
                    // One failing message must not take the worker down; log it and keep consuming.
                    this._logger.LogError(ex, "The listener failed to process {Message}!", messageString);
                    continue;
                }

                var responseTime = DateTime.Now;
                this._logger.LogDebug("The listener done {Message} at {ResponseTime}!", messageString, responseTime);
            }
        }
    }
}
}
using System;
using System.Threading.Channels;
using System.Threading.Tasks;
using System.Diagnostics;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace channels {
/// <summary>
/// Synthetic load source: raises <see cref="DataChanged"/> a requested number of times.
/// </summary>
public class EventGenerator {
    /// <summary>Event payload carrying the identifier of the generated item.</summary>
    public struct StringArgs {
        internal string Id { get; }

        internal StringArgs(string id) => Id = id;
    }

    /// <summary>Raised once per generated item by <see cref="GenerateLoad"/>.</summary>
    public event EventHandler<StringArgs> DataChanged;

    /// <summary>
    /// Raises <see cref="DataChanged"/> for the identifiers "0" through
    /// <paramref name="loadFactor"/> - 1, in ascending order. Does nothing when
    /// <paramref name="loadFactor"/> is zero or negative.
    /// </summary>
    /// <param name="loadFactor">Number of events to raise.</param>
    public void GenerateLoad(int loadFactor) {
        int sequence = 0;
        while (sequence < loadFactor) {
            // Re-read the delegate on every pass so subscriptions changed mid-run are honoured.
            EventHandler<StringArgs> subscribers = DataChanged;
            if (subscribers != null) {
                subscribers(this, new StringArgs(sequence.ToString()));
            }
            sequence++;
        }
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment