Skip to content

Instantly share code, notes, and snippets.

@mookid8000
Created April 20, 2016 07:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mookid8000/accd90b15dc23f8221c4a3bfcd89cf17 to your computer and use it in GitHub Desktop.
Save mookid8000/accd90b15dc23f8221c4a3bfcd89cf17 to your computer and use it in GitHub Desktop.
Subscriber caching extension for Rebus2
/// <summary>
/// Configuration extension that wraps the registered <see cref="ISubscriptionStorage"/>
/// in a decorator which caches subscriber lookups in the transaction context of the
/// ambient <see cref="MessageContext"/>.
/// </summary>
public static class SubscriberCacheExtensions
{
    /// <summary>
    /// Decorates the configured <see cref="ISubscriptionStorage"/> so that repeated
    /// subscriber lookups for the same topic, made while a message context is present,
    /// are served from a cache kept in that context's transaction context.
    /// </summary>
    /// <param name="configurer">The options configurer to register the decorator with.</param>
    /// <exception cref="System.ArgumentNullException"><paramref name="configurer"/> is null.</exception>
    public static void EnableSubscriberCache(this OptionsConfigurer configurer)
    {
        if (configurer == null)
        {
            throw new System.ArgumentNullException(nameof(configurer));
        }

        configurer.Decorate<ISubscriptionStorage>(c =>
        {
            var subscriptionStorage = c.Get<ISubscriptionStorage>();
            return new CachingSubscriptionStorage(subscriptionStorage);
        });
    }

    /// <summary>
    /// <see cref="ISubscriptionStorage"/> decorator that forwards every call to the wrapped
    /// storage and, when <see cref="MessageContext.Current"/> is available, remembers the
    /// result of subscriber lookups per topic.
    /// </summary>
    sealed class CachingSubscriptionStorage : ISubscriptionStorage
    {
        // Key under which the per-context cache is stored in the transaction context.
        const string CacheKey = "cached-subscribers";

        readonly ISubscriptionStorage _subscriptionStorage;

        /// <summary>
        /// Creates the decorator around <paramref name="subscriptionStorage"/>.
        /// </summary>
        /// <exception cref="System.ArgumentNullException"><paramref name="subscriptionStorage"/> is null.</exception>
        public CachingSubscriptionStorage(ISubscriptionStorage subscriptionStorage)
        {
            if (subscriptionStorage == null)
            {
                throw new System.ArgumentNullException(nameof(subscriptionStorage));
            }

            _subscriptionStorage = subscriptionStorage;
        }

        /// <summary>
        /// Returns the subscriber addresses for <paramref name="topic"/>. Without a current
        /// message context the call goes straight to the wrapped storage; otherwise the
        /// context's cache is consulted first and populated on a miss.
        /// </summary>
        public async Task<string[]> GetSubscriberAddresses(string topic)
        {
            // Resolve the cache once, before awaiting, so the code after the await does
            // not depend on the ambient context still being observable.
            var cache = GetCacheOrNull();

            if (cache == null)
            {
                return await _subscriptionStorage.GetSubscriberAddresses(topic);
            }

            string[] subscribers;

            if (cache.TryGetValue(topic, out subscribers))
            {
                return subscribers;
            }

            subscribers = await _subscriptionStorage.GetSubscriberAddresses(topic);
            cache[topic] = subscribers;
            return subscribers;
        }

        /// <summary>
        /// Registers <paramref name="subscriberAddress"/> for <paramref name="topic"/> in the
        /// wrapped storage and evicts the topic from the current context's cache (if any).
        /// </summary>
        public async Task RegisterSubscriber(string topic, string subscriberAddress)
        {
            var cache = GetCacheOrNull();

            Evict(cache, topic);

            try
            {
                await _subscriptionStorage.RegisterSubscriber(topic, subscriberAddress);
            }
            finally
            {
                // Evict again once the write has finished (or failed part-way): a lookup
                // that ran while the write was in flight may have re-cached the old list.
                Evict(cache, topic);
            }
        }

        /// <summary>
        /// Removes <paramref name="subscriberAddress"/> from <paramref name="topic"/> in the
        /// wrapped storage and evicts the topic from the current context's cache (if any).
        /// </summary>
        public async Task UnregisterSubscriber(string topic, string subscriberAddress)
        {
            var cache = GetCacheOrNull();

            Evict(cache, topic);

            try
            {
                await _subscriptionStorage.UnregisterSubscriber(topic, subscriberAddress);
            }
            finally
            {
                // Same reasoning as in RegisterSubscriber: drop anything cached while the
                // write was still in progress.
                Evict(cache, topic);
            }
        }

        /// <summary>
        /// Mirrors the <see cref="ISubscriptionStorage.IsCentralized"/> value of the wrapped storage.
        /// </summary>
        public bool IsCentralized => _subscriptionStorage.IsCentralized;

        /// <summary>
        /// Gets (creating it on first use) the topic-to-subscribers cache stored in the current
        /// message context's transaction context, or null when there is no current message context.
        /// </summary>
        static ConcurrentDictionary<string, string[]> GetCacheOrNull()
        {
            var messageContext = MessageContext.Current;

            if (messageContext == null)
            {
                return null;
            }

            return messageContext.TransactionContext
                .GetOrAdd(CacheKey, () => new ConcurrentDictionary<string, string[]>());
        }

        /// <summary>
        /// Removes <paramref name="topic"/> from <paramref name="cache"/>; does nothing when
        /// <paramref name="cache"/> is null (i.e. there was no message context).
        /// </summary>
        static void Evict(ConcurrentDictionary<string, string[]> cache, string topic)
        {
            if (cache == null)
            {
                return;
            }

            string[] removed;
            cache.TryRemove(topic, out removed);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment