-
-
Save tugberkugurlu/4416777 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Concurrent; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using System.Threading.Tasks.Dataflow; | |
namespace TDFDemo | |
{ | |
/// <summary>
/// Console demo: registers two handlers on a <see cref="Bus"/>, publishes ten
/// messages, removes the first handler and then publishes one final message.
/// </summary>
class Program
{
    /// <summary>
    /// Process entry point. Blocks until the asynchronous demo has completed.
    /// </summary>
    /// <param name="args">Command line arguments (unused).</param>
    static void Main(string[] args)
    {
        // Block on the returned Task so the process cannot exit while the demo is
        // still running and so any exception surfaces here instead of being lost
        // (which is what happens with an async void method).
        new Program().RunAsync().GetAwaiter().GetResult();
    }

    /// <summary>
    /// Runs the demo scenario against a fresh <see cref="Bus"/>.
    /// </summary>
    /// <returns>A task that completes once the user has pressed Enter.</returns>
    private async Task RunAsync()
    {
        var bus = new Bus();

        // Subscribe to Message type.
        // NOTE(review): this lambda is async; whether the bus awaits it depends on
        // which Subscribe overload it binds to. Bound to an Action<Message>
        // parameter it compiles as async void and runs fire-and-forget.
        var subscription1 = bus.Subscribe<Message>(async m =>
        {
            await Task.Delay(2000);
            Console.WriteLine("{0} Handler 1: {1}.", m.TimeStamp, m.Content);
        });

        // The second subscription is never removed, so its id is not kept.
        bus.Subscribe<Message>(m =>
            Console.WriteLine("{0} Handler 2: {1}.", m.TimeStamp, m.Content));

        // Send messages
        for (int i = 0; i < 10; i++)
        {
            await bus.SendAsync(new Message("Message " + i));
        }

        // Unsubscribe handler
        bus.Unsubscribe(subscription1);

        // Send a final message
        await bus.SendAsync(new Message("Final Message"));

        // Printed once every send has been awaited; handler 1 delays two seconds
        // per message, so its output may still appear after this line.
        Console.WriteLine("Finished processing.");
        Console.ReadLine();
    }
}
/// <summary>
/// Immutable text message stamped with the UTC time at which it was created.
/// </summary>
public class Message
{
    private readonly DateTime timeStamp;
    private readonly string content;

    /// <summary>
    /// Creates a message carrying <paramref name="content"/> and records the
    /// current UTC time as its timestamp.
    /// </summary>
    /// <param name="content">Text payload; stored as given, without validation.</param>
    public Message(string content)
    {
        this.content = content;
        this.timeStamp = DateTime.UtcNow;
    }

    /// <summary>UTC instant captured when the message was constructed.</summary>
    public DateTime TimeStamp
    {
        get { return this.timeStamp; }
    }

    /// <summary>Text payload supplied to the constructor.</summary>
    public string Content
    {
        get { return this.content; }
    }
}
/// <summary>
/// Minimal in-process message bus built on TPL Dataflow. Every message is pushed
/// into a single <see cref="BroadcastBlock{T}"/>; each subscription is a filtered
/// link from that block to its own <see cref="ActionBlock{T}"/>.
/// </summary>
public class Bus
{
    // Messages are passed through as-is (the cloning function returns the same
    // instance), so all handlers observe the same object.
    private readonly BroadcastBlock<object> broadcast =
        new BroadcastBlock<object>(message => message);

    // Maps a subscription id to the link whose disposal detaches the handler.
    private readonly ConcurrentDictionary<Guid, IDisposable> subscriptions
        = new ConcurrentDictionary<Guid, IDisposable>();

    /// <summary>
    /// Publishes <paramref name="message"/> without cancellation support.
    /// </summary>
    /// <typeparam name="TMessage">Type of the message being published.</typeparam>
    /// <param name="message">The message to publish.</param>
    /// <returns>The task returned by the underlying dataflow send.</returns>
    public Task SendAsync<TMessage>(TMessage message)
    {
        return SendAsync<TMessage>(message, CancellationToken.None);
    }

    /// <summary>
    /// Publishes <paramref name="message"/> to the broadcast block.
    /// </summary>
    /// <typeparam name="TMessage">Type of the message being published.</typeparam>
    /// <param name="message">The message to publish.</param>
    /// <param name="cancellationToken">Token forwarded to the dataflow send.</param>
    /// <returns>
    /// The task returned by the underlying dataflow send, exposed as a plain
    /// <see cref="Task"/>.
    /// </returns>
    public Task SendAsync<TMessage>(TMessage message, CancellationToken cancellationToken)
    {
        return broadcast.SendAsync(message, cancellationToken);
    }

    /// <summary>
    /// Registers a synchronous handler for messages of type
    /// <typeparamref name="TMessage"/>.
    /// </summary>
    /// <typeparam name="TMessage">Message type the handler accepts.</typeparam>
    /// <param name="handlerAction">Callback invoked for every matching message.</param>
    /// <returns>An id that can be passed to <see cref="Unsubscribe"/>.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="handlerAction"/> is null.
    /// </exception>
    public Guid Subscribe<TMessage>(Action<TMessage> handlerAction)
    {
        // Fail at registration time rather than when the first message arrives.
        if (handlerAction == null)
        {
            throw new ArgumentNullException("handlerAction");
        }

        Action<object> body = message => handlerAction((TMessage)message);
        return LinkHandler<TMessage>(new ActionBlock<object>(body));
    }

    /// <summary>
    /// Registers an asynchronous handler for messages of type
    /// <typeparamref name="TMessage"/>. The task returned by the handler is handed
    /// to the action block, so an async lambda is awaited instead of being
    /// compiled as fire-and-forget <c>async void</c>.
    /// </summary>
    /// <typeparam name="TMessage">Message type the handler accepts.</typeparam>
    /// <param name="handlerAction">Asynchronous callback invoked for every matching message.</param>
    /// <returns>An id that can be passed to <see cref="Unsubscribe"/>.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="handlerAction"/> is null.
    /// </exception>
    public Guid Subscribe<TMessage>(Func<TMessage, Task> handlerAction)
    {
        if (handlerAction == null)
        {
            throw new ArgumentNullException("handlerAction");
        }

        // Typed explicitly so the Func<T, Task> constructor of ActionBlock is used.
        Func<object, Task> body = message => handlerAction((TMessage)message);
        return LinkHandler<TMessage>(new ActionBlock<object>(body));
    }

    /// <summary>
    /// Detaches the handler registered under <paramref name="subscriptionId"/>.
    /// Unknown ids are ignored.
    /// </summary>
    /// <param name="subscriptionId">Id previously returned by a Subscribe call.</param>
    public void Unsubscribe(Guid subscriptionId)
    {
        IDisposable subscription;
        if (subscriptions.TryRemove(subscriptionId, out subscription))
        {
            // Disposing the link stops further messages from reaching the handler.
            subscription.Dispose();
        }
    }

    // Links the handler block to the broadcast block, letting through only
    // messages that are TMessage, and records the link under a new id.
    private Guid LinkHandler<TMessage>(ActionBlock<object> handler)
    {
        var subscription = broadcast.LinkTo(handler,
            new DataflowLinkOptions { PropagateCompletion = true },
            message => message is TMessage);
        return AddSubscription(subscription);
    }

    // Stores the link under a freshly generated id and returns that id.
    private Guid AddSubscription(IDisposable subscription)
    {
        var subscriptionId = Guid.NewGuid();
        subscriptions.TryAdd(subscriptionId, subscription);
        return subscriptionId;
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment