Skip to content

Instantly share code, notes, and snippets.

@khenidak
Last active December 11, 2017 01:50
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save khenidak/21f8de349a460ab90408 to your computer and use it in GitHub Desktop.
Save khenidak/21f8de349a460ab90408 to your computer and use it in GitHub Desktop.
Composite Listener allows you to create multiple ICommunicationListener(s) per Service Fabric Replica (each can run its own communication stack)
/*
** This code is undergoing major updates (and fixes); please stay tuned.
** For more details, see: http://henidak.com/2015/10/azure-service-fabric-multiple-communication-listeners-per-service/
*/
/// <summary>
/// Lifecycle states tracked for a communication listener.
/// The numeric values are spelled out explicitly; they are the same values
/// the compiler assigns implicitly from the declaration order.
/// </summary>
public enum ICommunicationListenerStatus
{
    /// <summary>The listener is not open; this is also the status a listener starts with.</summary>
    Closed = 0,

    /// <summary>An open operation on the listener is in progress.</summary>
    Opening = 1,

    /// <summary>The open operation on the listener has completed.</summary>
    Opened = 2,

    /// <summary>A close operation on the listener is in progress.</summary>
    Closing = 3,

    /// <summary>Initialization of the listener is in progress.</summary>
    Initializing = 4,

    /// <summary>Initialization of the listener has completed.</summary>
    Initialized = 5,

    /// <summary>An abort of the listener is in progress.</summary>
    Aborting = 6,

    /// <summary>The abort of the listener has completed.</summary>
    Aborted = 7
}
/// <summary>
/// An <see cref="ICommunicationListener"/> that fans every lifecycle call
/// (Initialize / Open / Close / Abort) out to a set of named inner listeners
/// and tracks a per-listener <see cref="ICommunicationListenerStatus"/>.
/// Listeners can also be added or removed after construction.
/// </summary>
public class CompositeCommunicationListener : ICommunicationListener
{
    private readonly Dictionary<string, ICommunicationListener> m_listeners = new Dictionary<string, ICommunicationListener>();
    private readonly Dictionary<string, ICommunicationListenerStatus> m_Statuses = new Dictionary<string, ICommunicationListenerStatus>();

    // Serializes the lifecycle operations. A SemaphoreSlim is used (rather than an
    // AutoResetEvent) so that async methods can wait for it without blocking a thread.
    private readonly SemaphoreSlim m_listnerLock = new SemaphoreSlim(1, 1);

    private ICommunicationListenerStatus m_ListenerStatus = ICommunicationListenerStatus.Closed;
    private ServiceInitializationParameters m_ServiceInitializationParameters;

    /// <summary>
    /// Installs the default <see cref="OnCreateListeningAddress"/> callback when
    /// the caller did not supply one. The default appends every address followed
    /// by a ';' (so the result ends with a trailing ';').
    /// </summary>
    private void EnsureFuncs()
    {
        if (null != OnCreateListeningAddress)
        {
            return;
        }

        OnCreateListeningAddress = (listener, addresses) =>
        {
            StringBuilder sb = new StringBuilder();
            foreach (var address in addresses)
            {
                sb.Append(address).Append(';');
            }

            return sb.ToString();
        };
    }

    /// <summary>
    /// Verifies that no registered listener is null.
    /// </summary>
    /// <exception cref="InvalidOperationException">A registered listener is null.</exception>
    private void ValidateListeners()
    {
        // An empty listener set is deliberately allowed: services may start with
        // zero listeners and add them dynamically later.
        if (m_listeners.Values.Any(l => null == l))
        {
            throw new InvalidOperationException("can not have null listeners");
        }
    }

    /// <summary>
    /// Callback that combines the addresses returned by the inner listeners into
    /// the single address string returned from <see cref="OpenAsync"/>.
    /// When left null, a default that joins addresses with ';' is installed on open.
    /// </summary>
    public Func<CompositeCommunicationListener, List<string>, string> OnCreateListeningAddress
    {
        get;
        set;
    }

    /// <summary>
    /// A snapshot (copy) of the currently registered name/listener pairs.
    /// </summary>
    public KeyValuePair<string, ICommunicationListener>[] Listners
    {
        get { return m_listeners.ToArray(); }
    }

    /// <summary>
    /// Returns the tracked status of the listener registered under <paramref name="ListenerName"/>.
    /// </summary>
    /// <exception cref="InvalidOperationException">No listener is registered under that name.</exception>
    public ICommunicationListenerStatus GetListenerStatus(string ListenerName)
    {
        ICommunicationListenerStatus status;
        if (!m_Statuses.TryGetValue(ListenerName, out status))
        {
            throw new InvalidOperationException(string.Format("Listener with the name {0} does not exist", ListenerName));
        }

        return status;
    }

    /// <summary>
    /// Creates a composite with no listeners.
    /// </summary>
    public CompositeCommunicationListener() : this(null)
    {
    }

    /// <summary>
    /// Creates a composite pre-populated with <paramref name="listeners"/>
    /// (may be null for none). Every listener starts in the Closed status.
    /// </summary>
    public CompositeCommunicationListener(Dictionary<string, ICommunicationListener> listeners)
    {
        if (null == listeners)
        {
            return;
        }

        foreach (var kvp in listeners)
        {
            m_listeners.Add(kvp.Key, kvp.Value);
            m_Statuses.Add(kvp.Key, ICommunicationListenerStatus.Closed);
        }
    }

    /// <summary>
    /// Registers a new listener under <paramref name="Name"/>, then initializes and opens it.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="listener"/> is null.</exception>
    /// <exception cref="InvalidOperationException">A listener with that name already exists.</exception>
    public async Task AddListenerAsync(string Name, ICommunicationListener listener)
    {
        if (null == listener)
        {
            throw new ArgumentNullException("listener");
        }

        // Acquire the lock BEFORE the try block so the finally only ever releases
        // a lock this call actually holds.
        await m_listnerLock.WaitAsync();
        try
        {
            // The duplicate check is done under the lock so two concurrent adds
            // of the same name cannot both pass it.
            if (m_listeners.ContainsKey(Name))
            {
                throw new InvalidOperationException(string.Format("Listener with the name {0} already exists", Name));
            }

            m_listeners.Add(Name, listener);
            m_Statuses.Add(Name, ICommunicationListenerStatus.Closed);

            _InitListner(Name, listener);
            await _OpenListener(Name, listener, CancellationToken.None);
        }
        finally
        {
            m_listnerLock.Release();
        }
    }

    /// <summary>
    /// Closes the listener registered under <paramref name="Name"/> and removes it.
    /// </summary>
    /// <exception cref="InvalidOperationException">No listener is registered under that name.</exception>
    public async Task RemoveListenerAsync(string Name)
    {
        await m_listnerLock.WaitAsync();
        try
        {
            ICommunicationListener listener;
            if (!m_listeners.TryGetValue(Name, out listener))
            {
                throw new InvalidOperationException(string.Format("Listener with the name {0} does not exists", Name));
            }

            await _CloseListener(Name, listener, CancellationToken.None);

            m_listeners.Remove(Name);
            m_Statuses.Remove(Name);
        }
        finally
        {
            m_listnerLock.Release();
        }
    }

    /// <summary>
    /// Aborts every registered listener that is not already aborting or aborted.
    /// </summary>
    public void Abort()
    {
        m_listnerLock.Wait();
        try
        {
            m_ListenerStatus = ICommunicationListenerStatus.Aborting;
            foreach (var kvp in m_listeners)
            {
                _AbortListener(kvp.Key, kvp.Value);
            }

            m_ListenerStatus = ICommunicationListenerStatus.Aborted;
        }
        finally
        {
            m_listnerLock.Release();
        }
    }

    /// <summary>
    /// Closes all registered listeners concurrently and waits for all of them.
    /// </summary>
    /// <param name="cancellationToken">
    /// Passed to every inner listener; also cancels the wait for the internal lock.
    /// </param>
    public async Task CloseAsync(CancellationToken cancellationToken)
    {
        await m_listnerLock.WaitAsync(cancellationToken);
        try
        {
            m_ListenerStatus = ICommunicationListenerStatus.Closing;

            var tasks = new List<Task>();
            foreach (var kvp in m_listeners)
            {
                tasks.Add(_CloseListener(kvp.Key, kvp.Value, cancellationToken));
            }

            await Task.WhenAll(tasks);
            m_ListenerStatus = ICommunicationListenerStatus.Closed;
        }
        finally
        {
            m_listnerLock.Release();
        }
    }

    /// <summary>
    /// Stores the initialization parameters and initializes every registered
    /// listener that has not been initialized yet.
    /// </summary>
    public void Initialize(ServiceInitializationParameters serviceInitializationParameters)
    {
        m_listnerLock.Wait();
        try
        {
            m_ServiceInitializationParameters = serviceInitializationParameters;
            foreach (var kvp in m_listeners)
            {
                _InitListner(kvp.Key, kvp.Value);
            }
        }
        finally
        {
            m_listnerLock.Release();
        }
    }

    /// <summary>
    /// Opens all registered listeners concurrently and returns the combined
    /// address produced by <see cref="OnCreateListeningAddress"/>.
    /// </summary>
    /// <param name="cancellationToken">
    /// Passed to every inner listener; also cancels the wait for the internal lock.
    /// </param>
    /// <exception cref="InvalidOperationException">A registered listener is null.</exception>
    public async Task<string> OpenAsync(CancellationToken cancellationToken)
    {
        await m_listnerLock.WaitAsync(cancellationToken);
        try
        {
            // Validate under the lock so the set cannot change between the check and the open.
            ValidateListeners();
            m_ListenerStatus = ICommunicationListenerStatus.Opening;

            var tasks = new List<Task<string>>();
            foreach (var kvp in m_listeners)
            {
                tasks.Add(_OpenListener(kvp.Key, kvp.Value, cancellationToken));
            }

            // WhenAll returns the results in the same order as the tasks.
            string[] opened = await Task.WhenAll(tasks);
            m_ListenerStatus = ICommunicationListenerStatus.Opened;

            EnsureFuncs();
            return OnCreateListeningAddress(this, new List<string>(opened));
        }
        finally
        {
            m_listnerLock.Release();
        }
    }

    /// <summary>
    /// Initializes one listener unless it is already initializing or initialized.
    /// </summary>
    private void _InitListner(string ListenerName,
                              ICommunicationListener listener)
    {
        var current = m_Statuses[ListenerName];
        if (current == ICommunicationListenerStatus.Initializing ||
            current == ICommunicationListenerStatus.Initialized)
        {
            return;
        }

        m_Statuses[ListenerName] = ICommunicationListenerStatus.Initializing;
        listener.Initialize(m_ServiceInitializationParameters);
        m_Statuses[ListenerName] = ICommunicationListenerStatus.Initialized;
    }

    /// <summary>
    /// Opens one listener and returns its address, or an empty string when the
    /// listener is already opening or opened.
    /// </summary>
    private async Task<string> _OpenListener(string ListenerName,
                                             ICommunicationListener listener,
                                             CancellationToken canceltoken)
    {
        var current = m_Statuses[ListenerName];
        if (current == ICommunicationListenerStatus.Opening ||
            current == ICommunicationListenerStatus.Opened)
        {
            return "";
        }

        m_Statuses[ListenerName] = ICommunicationListenerStatus.Opening;
        var sAddress = await listener.OpenAsync(canceltoken);
        m_Statuses[ListenerName] = ICommunicationListenerStatus.Opened;
        return sAddress;
    }

    /// <summary>
    /// Closes one listener unless it is already closing or closed.
    /// </summary>
    private async Task _CloseListener(string ListenerName,
                                      ICommunicationListener listener,
                                      CancellationToken cancelToken)
    {
        var current = m_Statuses[ListenerName];
        if (current == ICommunicationListenerStatus.Closing ||
            current == ICommunicationListenerStatus.Closed)
        {
            return;
        }

        m_Statuses[ListenerName] = ICommunicationListenerStatus.Closing;
        await listener.CloseAsync(cancelToken);
        m_Statuses[ListenerName] = ICommunicationListenerStatus.Closed;
    }

    /// <summary>
    /// Aborts one listener unless it is already aborting or aborted.
    /// </summary>
    private void _AbortListener(string ListenerName,
                                ICommunicationListener listener)
    {
        var current = m_Statuses[ListenerName];
        if (current == ICommunicationListenerStatus.Aborting ||
            current == ICommunicationListenerStatus.Aborted)
        {
            return;
        }

        m_Statuses[ListenerName] = ICommunicationListenerStatus.Aborting;
        listener.Abort();
        m_Statuses[ListenerName] = ICommunicationListenerStatus.Aborted;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment