Skip to content

Instantly share code, notes, and snippets.

@tgnm
Last active August 4, 2018 22:21
Show Gist options
  • Save tgnm/bdc7c94499a6b9b7ef46b335f386ea6f to your computer and use it in GitHub Desktop.
Save tgnm/bdc7c94499a6b9b7ef46b335f386ea6f to your computer and use it in GitHub Desktop.
System.Threading.Channels - MessageBus implementation
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace LWT
{
public class MessageBus
{
private class Subscribers<TTopic>
{
private readonly List<Action<TTopic>> _subscribers = new List<Action<TTopic>>();
public void Broadcast(TTopic message)
{
_subscribers.ForEach(subscriber => subscriber(message));
}
public void Subscribe(Action<TTopic> subscriber)
{
_subscribers.Add(subscriber);
}
}
public class Publisher<TTopic>
{
private readonly ChannelWriter<TTopic> _channel;
internal Publisher(ChannelWriter<TTopic> channel)
{
_channel = channel;
}
public async Task Publish(TTopic message)
{
await _channel.WriteAsync(message);
}
}
private readonly Dictionary<Type, object> _channels = new Dictionary<Type, object>();
private readonly Dictionary<Type, object> _subscribers = new Dictionary<Type, object>();
public Publisher<TTopic> CreatePublisher<TTopic>()
{
var channel = CreateTopicIfNecessary<TTopic>(typeof(TTopic));
return new Publisher<TTopic>(channel);
}
public void Subscribe<TTopic>(Action<TTopic> subscriber)
{
var topic = typeof(TTopic);
var topicChannel = CreateTopicIfNecessary<TTopic>(topic);
lock (topicChannel)
{
var subscribers = (Subscribers<TTopic>) _subscribers[topic];
subscribers.Subscribe(subscriber);
}
}
private async Task RunPublishLoop<TTopic>(
ChannelReader<TTopic> topicChannel,
Subscribers<TTopic> subscribers)
{
while (await topicChannel.WaitToReadAsync())
{
while (topicChannel.TryRead(out var message))
{
lock (topicChannel)
{
subscribers.Broadcast(message);
}
}
}
}
private Channel<TTopic> CreateTopicIfNecessary<TTopic>(Type topic)
{
lock (_channels)
{
if (_channels.TryGetValue(topic, out var channel))
{
return (Channel<TTopic>) channel;
}
var topicChannel = Channel.CreateUnbounded<TTopic>();
_channels.TryAdd(typeof(TTopic), topicChannel);
var subscribers = new Subscribers<TTopic>();
_subscribers.Add(topic, subscribers);
Task.Run(() => RunPublishLoop(topicChannel.Reader, subscribers));
return topicChannel;
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment