Skip to content

Instantly share code, notes, and snippets.

@tugberkugurlu
Forked from benfoster/gist:4416655
Created December 31, 2012 01:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tugberkugurlu/4416777 to your computer and use it in GitHub Desktop.
Save tugberkugurlu/4416777 to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace TDFDemo
{
/// <summary>
/// Console demo: registers two handlers on a <see cref="Bus"/>, publishes ten messages,
/// removes the first handler and then publishes one final message.
/// </summary>
class Program
{
    /// <summary>
    /// Process entry point. Blocks until the asynchronous demo has finished.
    /// </summary>
    /// <param name="args">Command line arguments (unused).</param>
    static void Main(string[] args)
    {
        // Run() returns a Task instead of being async void, so the entry point can wait
        // for it and any exception thrown by the demo surfaces here instead of being lost.
        new Program().Run().GetAwaiter().GetResult();
    }

    /// <summary>
    /// Executes the demo scenario against a fresh <see cref="Bus"/>.
    /// </summary>
    /// <returns>A task that completes once the user has pressed Enter.</returns>
    private async Task Run()
    {
        var bus = new Bus();

        // Subscribe to Message type.
        // Handler 1 simulates slow work: it waits two seconds before printing.
        // NOTE(review): if this lambda binds to a Subscribe overload taking Action<Message>
        // it is compiled as an async void delegate, so the bus cannot observe its
        // completion or its exceptions - confirm that this is intended.
        var subscription1 = bus.Subscribe<Message>(async m =>
        {
            await Task.Delay(2000);
            Console.WriteLine("{0} Handler 1: {1}.", m.TimeStamp, m.Content);
        });

        // Handler 2 prints immediately. Its subscription id is kept for symmetry with
        // handler 1 but is never used to unsubscribe.
        var subscription2 = bus.Subscribe<Message>(m =>
            Console.WriteLine("{0} Handler 2: {1}.", m.TimeStamp, m.Content));

        // Send messages
        for (int i = 0; i < 10; i++)
        {
            await bus.SendAsync(new Message("Message " + i));
        }

        // Unsubscribe handler
        bus.Unsubscribe(subscription1);

        // Send a final message
        await bus.SendAsync(new Message("Final Message"));

        // This is printed once the sends have completed; it does not wait for the
        // handlers, so handler output may still appear afterwards.
        Console.WriteLine("Finished processing.");

        // Keep the console open until the user presses Enter.
        Console.ReadLine();
    }
}
/// <summary>
/// A piece of text stamped with the UTC time at which the instance was constructed.
/// Both properties can only be assigned from inside the class.
/// </summary>
public class Message
{
    /// <summary>
    /// Creates a message carrying <paramref name="content"/> and records the current UTC time.
    /// </summary>
    /// <param name="content">The text of the message; stored as given, without validation.</param>
    public Message(string content)
    {
        this.TimeStamp = DateTime.UtcNow;
        this.Content = content;
    }

    /// <summary>The text supplied to the constructor.</summary>
    public string Content { get; private set; }

    /// <summary>The value of <see cref="DateTime.UtcNow"/> captured by the constructor.</summary>
    public DateTime TimeStamp { get; private set; }
}
/// <summary>
/// A minimal in-process message bus. Every message sent is pushed into a single
/// <see cref="BroadcastBlock{T}"/>; each subscription is an <see cref="ActionBlock{T}"/>
/// linked to it with a filter on the message's runtime type.
/// </summary>
public class Bus
{
    // Single entry point for all messages. The cloning function returns the same
    // instance, so every subscriber sees the very object that was sent.
    private readonly BroadcastBlock<object> broadcast =
        new BroadcastBlock<object>(message => message);

    // Subscription id -> the link returned by LinkTo; disposing the link detaches the handler.
    private readonly ConcurrentDictionary<Guid, IDisposable> subscriptions
        = new ConcurrentDictionary<Guid, IDisposable>();

    /// <summary>
    /// Sends <paramref name="message"/> to the bus without cancellation support.
    /// </summary>
    /// <typeparam name="TMessage">Compile-time type of the message.</typeparam>
    /// <param name="message">The message to publish.</param>
    /// <returns>The task produced by offering the message to the broadcast block.</returns>
    public Task SendAsync<TMessage>(TMessage message)
    {
        return SendAsync<TMessage>(message, CancellationToken.None);
    }

    /// <summary>
    /// Sends <paramref name="message"/> to the bus.
    /// </summary>
    /// <typeparam name="TMessage">Compile-time type of the message.</typeparam>
    /// <param name="message">The message to publish.</param>
    /// <param name="cancellationToken">Token passed through to the underlying send.</param>
    /// <returns>The task produced by offering the message to the broadcast block.</returns>
    public Task SendAsync<TMessage>(TMessage message, CancellationToken cancellationToken)
    {
        // The broadcast block carries object, so convert explicitly to its element type.
        return broadcast.SendAsync((object)message, cancellationToken);
    }

    /// <summary>
    /// Registers a synchronous handler for messages whose runtime type is
    /// <typeparamref name="TMessage"/>.
    /// </summary>
    /// <typeparam name="TMessage">The message type to receive.</typeparam>
    /// <param name="handlerAction">Callback invoked for each matching message.</param>
    /// <returns>An id that can be passed to <see cref="Unsubscribe"/>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="handlerAction"/> is null.</exception>
    public Guid Subscribe<TMessage>(Action<TMessage> handlerAction)
    {
        if (handlerAction == null)
        {
            throw new ArgumentNullException("handlerAction");
        }

        Action<object> body = message => handlerAction((TMessage)message);
        return LinkHandler<TMessage>(new ActionBlock<object>(body));
    }

    /// <summary>
    /// Registers an asynchronous handler for messages whose runtime type is
    /// <typeparamref name="TMessage"/>. The task returned by the handler is handed to the
    /// <see cref="ActionBlock{T}"/>, so the block - not an async void delegate - owns
    /// the handler's completion and exceptions.
    /// </summary>
    /// <typeparam name="TMessage">The message type to receive.</typeparam>
    /// <param name="handlerAction">Task-returning callback invoked for each matching message.</param>
    /// <returns>An id that can be passed to <see cref="Unsubscribe"/>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="handlerAction"/> is null.</exception>
    public Guid Subscribe<TMessage>(Func<TMessage, Task> handlerAction)
    {
        if (handlerAction == null)
        {
            throw new ArgumentNullException("handlerAction");
        }

        // Typed as Func<object, Task> so the Task-aware ActionBlock constructor is chosen.
        Func<object, Task> body = message => handlerAction((TMessage)message);
        return LinkHandler<TMessage>(new ActionBlock<object>(body));
    }

    /// <summary>
    /// Detaches the handler registered under <paramref name="subscriptionId"/>.
    /// Unknown or already removed ids are ignored. Only the link is disposed; the
    /// handler's block itself is not completed here.
    /// </summary>
    /// <param name="subscriptionId">The id returned by a Subscribe call.</param>
    public void Unsubscribe(Guid subscriptionId)
    {
        IDisposable subscription;
        if (subscriptions.TryRemove(subscriptionId, out subscription))
        {
            subscription.Dispose();
        }
    }

    // Links a handler block to the broadcast block, letting through only messages that
    // are TMessage at runtime, and records the resulting link.
    private Guid LinkHandler<TMessage>(ActionBlock<object> handler)
    {
        var subscription = broadcast.LinkTo(handler,
            new DataflowLinkOptions { PropagateCompletion = true },
            message => message is TMessage);
        return AddSubscription(subscription);
    }

    // Stores the link under a freshly generated id and returns that id.
    private Guid AddSubscription(IDisposable subscription)
    {
        var subscriptionId = Guid.NewGuid();
        subscriptions.TryAdd(subscriptionId, subscription);
        return subscriptionId;
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment