Last active
August 4, 2018 22:21
-
-
Save tgnm/bdc7c94499a6b9b7ef46b335f386ea6f to your computer and use it in GitHub Desktop.
System.Threading.Channels - MessageBus implementation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Concurrent; | |
using System.Collections.Generic; | |
using System.Threading.Channels; | |
using System.Threading.Tasks; | |
namespace LWT | |
{ | |
public class MessageBus | |
{ | |
private class Subscribers<TTopic> | |
{ | |
private readonly List<Action<TTopic>> _subscribers = new List<Action<TTopic>>(); | |
public void Broadcast(TTopic message) | |
{ | |
_subscribers.ForEach(subscriber => subscriber(message)); | |
} | |
public void Subscribe(Action<TTopic> subscriber) | |
{ | |
_subscribers.Add(subscriber); | |
} | |
} | |
public class Publisher<TTopic> | |
{ | |
private readonly ChannelWriter<TTopic> _channel; | |
internal Publisher(ChannelWriter<TTopic> channel) | |
{ | |
_channel = channel; | |
} | |
public async Task Publish(TTopic message) | |
{ | |
await _channel.WriteAsync(message); | |
} | |
} | |
private readonly Dictionary<Type, object> _channels = new Dictionary<Type, object>(); | |
private readonly Dictionary<Type, object> _subscribers = new Dictionary<Type, object>(); | |
public Publisher<TTopic> CreatePublisher<TTopic>() | |
{ | |
var channel = CreateTopicIfNecessary<TTopic>(typeof(TTopic)); | |
return new Publisher<TTopic>(channel); | |
} | |
public void Subscribe<TTopic>(Action<TTopic> subscriber) | |
{ | |
var topic = typeof(TTopic); | |
var topicChannel = CreateTopicIfNecessary<TTopic>(topic); | |
lock (topicChannel) | |
{ | |
var subscribers = (Subscribers<TTopic>) _subscribers[topic]; | |
subscribers.Subscribe(subscriber); | |
} | |
} | |
private async Task RunPublishLoop<TTopic>( | |
ChannelReader<TTopic> topicChannel, | |
Subscribers<TTopic> subscribers) | |
{ | |
while (await topicChannel.WaitToReadAsync()) | |
{ | |
while (topicChannel.TryRead(out var message)) | |
{ | |
lock (topicChannel) | |
{ | |
subscribers.Broadcast(message); | |
} | |
} | |
} | |
} | |
private Channel<TTopic> CreateTopicIfNecessary<TTopic>(Type topic) | |
{ | |
lock (_channels) | |
{ | |
if (_channels.TryGetValue(topic, out var channel)) | |
{ | |
return (Channel<TTopic>) channel; | |
} | |
var topicChannel = Channel.CreateUnbounded<TTopic>(); | |
_channels.TryAdd(typeof(TTopic), topicChannel); | |
var subscribers = new Subscribers<TTopic>(); | |
_subscribers.Add(topic, subscribers); | |
Task.Run(() => RunPublishLoop(topicChannel.Reader, subscribers)); | |
return topicChannel; | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment