Last active
December 11, 2017 01:50
-
-
Save khenidak/21f8de349a460ab90408 to your computer and use it in GitHub Desktop.
Composite Listener allows you to create multiple ICommunicationListener(s) per Service Fabric Replica (each can run its own communication stack)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
** This code is undergoing major updates (and fixes); please stay tuned.
** For more details see: http://henidak.com/2015/10/azure-service-fabric-multiple-communication-listeners-per-service/
*/
/// <summary>
/// Lifecycle states tracked for a communication listener.
/// The numeric values are spelled out explicitly; they are the same values the
/// members receive from their declaration order.
/// </summary>
public enum ICommunicationListenerStatus
{
    /// <summary>The listener is closed (also the default value, 0).</summary>
    Closed = 0,

    /// <summary>An open operation is in progress.</summary>
    Opening = 1,

    /// <summary>The open operation has completed.</summary>
    Opened = 2,

    /// <summary>A close operation is in progress.</summary>
    Closing = 3,

    /// <summary>An initialize operation is in progress.</summary>
    Initializing = 4,

    /// <summary>The initialize operation has completed.</summary>
    Initialized = 5,

    /// <summary>An abort operation is in progress.</summary>
    Aborting = 6,

    /// <summary>The abort operation has completed.</summary>
    Aborted = 7
}
/// <summary>
/// An <see cref="ICommunicationListener"/> that forwards every lifecycle call
/// (Initialize / OpenAsync / CloseAsync / Abort) to a set of named child
/// listeners and tracks a per-listener <see cref="ICommunicationListenerStatus"/>.
/// Listeners can be supplied up front or added/removed later through
/// <see cref="AddListenerAsync"/> and <see cref="RemoveListenerAsync"/>.
/// </summary>
public class CompositeCommunicationListener : ICommunicationListener
{
    private readonly Dictionary<string, ICommunicationListener> m_listeners = new Dictionary<string, ICommunicationListener>();
    private readonly Dictionary<string, ICommunicationListenerStatus> m_Statuses = new Dictionary<string, ICommunicationListenerStatus>();

    // Mutual exclusion for all lifecycle operations. A SemaphoreSlim(1, 1) is used
    // because it can be awaited (WaitAsync) from async methods without blocking a
    // thread, and waited synchronously (Wait) from Initialize/Abort.
    // It is always acquired BEFORE the try block so that the finally block only
    // releases a lock that was actually taken.
    private readonly SemaphoreSlim m_listnerLock = new SemaphoreSlim(1, 1);

    private ICommunicationListenerStatus m_ListenerStatus = ICommunicationListenerStatus.Closed;
    private ServiceInitializationParameters m_ServiceInitializationParameters;

    /// <summary>
    /// Installs the default <see cref="OnCreateListeningAddress"/> callback when the
    /// caller has not provided one. The default appends every address followed by ';'.
    /// </summary>
    private void EnsureFuncs()
    {
        if (null == OnCreateListeningAddress)
        {
            OnCreateListeningAddress = (listener, addresses) =>
            {
                StringBuilder sb = new StringBuilder();
                foreach (var address in addresses)
                {
                    sb.Append(address).Append(";");
                }
                return sb.ToString();
            };
        }
    }

    /// <summary>
    /// Verifies that no registered listener is null.
    /// A composite with zero listeners is deliberately allowed, because services may
    /// start empty and add listeners dynamically.
    /// </summary>
    /// <exception cref="InvalidOperationException">A registered listener is null.</exception>
    private void ValidateListeners()
    {
        if (m_listeners.ContainsValue(null))
        {
            throw new InvalidOperationException("can not have null listeners");
        }
    }

    /// <summary>
    /// Callback that combines the addresses returned by the child listeners into the
    /// single address string returned from <see cref="OpenAsync"/>. When left null, a
    /// default that joins the addresses with a trailing ';' is installed on first open.
    /// </summary>
    public Func<CompositeCommunicationListener, List<string>, string> OnCreateListeningAddress
    {
        get;
        set;
    }

    /// <summary>
    /// Snapshot (array copy) of the currently registered name/listener pairs.
    /// </summary>
    public KeyValuePair<string, ICommunicationListener>[] Listners
    {
        get { return m_listeners.ToArray(); }
    }

    /// <summary>
    /// Returns the tracked status of the listener registered under <paramref name="ListenerName"/>.
    /// </summary>
    /// <exception cref="InvalidOperationException">No listener is registered under that name.</exception>
    public ICommunicationListenerStatus GetListenerStatus(string ListenerName)
    {
        ICommunicationListenerStatus status;
        if (!m_Statuses.TryGetValue(ListenerName, out status))
        {
            throw new InvalidOperationException(string.Format("Listener with the name {0} does not exist", ListenerName));
        }
        return status;
    }

    /// <summary>
    /// Creates a composite with no listeners.
    /// </summary>
    public CompositeCommunicationListener() : this(null)
    {
    }

    /// <summary>
    /// Creates a composite pre-populated with <paramref name="listeners"/> (may be null).
    /// Every supplied listener starts in the <see cref="ICommunicationListenerStatus.Closed"/> status.
    /// </summary>
    public CompositeCommunicationListener(Dictionary<string, ICommunicationListener> listeners)
    {
        if (null != listeners)
        {
            foreach (var kvp in listeners)
            {
                m_listeners.Add(kvp.Key, kvp.Value);
                m_Statuses.Add(kvp.Key, ICommunicationListenerStatus.Closed);
            }
        }
    }

    /// <summary>
    /// Registers <paramref name="listener"/> under <paramref name="Name"/>, then
    /// initializes and opens it.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="Name"/> or <paramref name="listener"/> is null.</exception>
    /// <exception cref="InvalidOperationException">A listener with the same name is already registered.</exception>
    public async Task AddListenerAsync(string Name, ICommunicationListener listener)
    {
        if (null == listener)
        {
            throw new ArgumentNullException("listener");
        }
        if (null == Name)
        {
            throw new ArgumentNullException("Name");
        }

        await m_listnerLock.WaitAsync();
        try
        {
            // The duplicate check runs under the lock so two concurrent adds of the
            // same name cannot both pass it.
            if (m_listeners.ContainsKey(Name))
            {
                throw new InvalidOperationException(string.Format("Listener with the name {0} already exists", Name));
            }

            m_listeners.Add(Name, listener);
            m_Statuses.Add(Name, ICommunicationListenerStatus.Closed);

            _InitListner(Name, listener);
            await _OpenListener(Name, listener, CancellationToken.None);
        }
        finally
        {
            m_listnerLock.Release();
        }
    }

    /// <summary>
    /// Closes the listener registered under <paramref name="Name"/> and removes it
    /// from the composite.
    /// </summary>
    /// <exception cref="InvalidOperationException">No listener is registered under that name.</exception>
    public async Task RemoveListenerAsync(string Name)
    {
        await m_listnerLock.WaitAsync();
        try
        {
            ICommunicationListener listener;
            // Throw when the listener is MISSING (the check used to be inverted and
            // rejected exactly the listeners that could be removed).
            if (!m_listeners.TryGetValue(Name, out listener))
            {
                throw new InvalidOperationException(string.Format("Listener with the name {0} does not exists", Name));
            }

            await _CloseListener(Name, listener, CancellationToken.None);

            m_listeners.Remove(Name);
            m_Statuses.Remove(Name);
        }
        finally
        {
            m_listnerLock.Release();
        }
    }

    /// <summary>
    /// Aborts every registered listener that is not already aborting/aborted.
    /// Blocks until the lifecycle lock is available.
    /// </summary>
    public void Abort()
    {
        m_listnerLock.Wait();
        try
        {
            m_ListenerStatus = ICommunicationListenerStatus.Aborting;
            foreach (var kvp in m_listeners)
            {
                _AbortListener(kvp.Key, kvp.Value);
            }
            m_ListenerStatus = ICommunicationListenerStatus.Aborted;
        }
        finally
        {
            m_listnerLock.Release();
        }
    }

    /// <summary>
    /// Closes all registered listeners concurrently and waits for all of them.
    /// </summary>
    /// <param name="cancellationToken">
    /// Forwarded to every child listener; also cancels the wait for the lifecycle lock.
    /// </param>
    public async Task CloseAsync(CancellationToken cancellationToken)
    {
        await m_listnerLock.WaitAsync(cancellationToken);
        try
        {
            // Status is changed only once the lock is held, so it never reports
            // "Closing" while another operation is still running.
            m_ListenerStatus = ICommunicationListenerStatus.Closing;

            var closeTasks = new List<Task>(m_listeners.Count);
            foreach (var kvp in m_listeners)
            {
                closeTasks.Add(_CloseListener(kvp.Key, kvp.Value, cancellationToken));
            }
            await Task.WhenAll(closeTasks);

            m_ListenerStatus = ICommunicationListenerStatus.Closed;
        }
        finally
        {
            m_listnerLock.Release();
        }
    }

    /// <summary>
    /// Stores the initialization parameters and initializes every registered listener
    /// that is not already initializing/initialized. The stored parameters are reused
    /// for listeners added later through <see cref="AddListenerAsync"/>.
    /// </summary>
    public void Initialize(ServiceInitializationParameters serviceInitializationParameters)
    {
        m_listnerLock.Wait();
        try
        {
            m_ListenerStatus = ICommunicationListenerStatus.Initializing;
            m_ServiceInitializationParameters = serviceInitializationParameters;
            foreach (var kvp in m_listeners)
            {
                _InitListner(kvp.Key, kvp.Value);
            }
            m_ListenerStatus = ICommunicationListenerStatus.Initialized;
        }
        finally
        {
            m_listnerLock.Release();
        }
    }

    /// <summary>
    /// Opens all registered listeners concurrently and returns the combined address
    /// produced by <see cref="OnCreateListeningAddress"/>.
    /// </summary>
    /// <param name="cancellationToken">
    /// Forwarded to every child listener; also cancels the wait for the lifecycle lock.
    /// </param>
    /// <returns>The combined listening address string.</returns>
    /// <exception cref="InvalidOperationException">A registered listener is null.</exception>
    public async Task<string> OpenAsync(CancellationToken cancellationToken)
    {
        await m_listnerLock.WaitAsync(cancellationToken);
        try
        {
            // Validation runs under the lock so it sees a stable listener set.
            ValidateListeners();
            m_ListenerStatus = ICommunicationListenerStatus.Opening;

            var openTasks = new List<Task<string>>(m_listeners.Count);
            foreach (var kvp in m_listeners)
            {
                openTasks.Add(_OpenListener(kvp.Key, kvp.Value, cancellationToken));
            }

            // WhenAll yields the results in the same order as the tasks were supplied.
            string[] opened = await Task.WhenAll(openTasks);
            var addresses = new List<string>(opened);

            EnsureFuncs();
            m_ListenerStatus = ICommunicationListenerStatus.Opened;
            return OnCreateListeningAddress(this, addresses);
        }
        finally
        {
            m_listnerLock.Release();
        }
    }

    /// <summary>
    /// Initializes one listener with the stored initialization parameters unless its
    /// status is already Initializing or Initialized. Callers must hold the lifecycle lock.
    /// </summary>
    private void _InitListner(string ListenerName,
                              ICommunicationListener listener)
    {
        ICommunicationListenerStatus current = m_Statuses[ListenerName];
        if (current == ICommunicationListenerStatus.Initializing ||
            current == ICommunicationListenerStatus.Initialized)
        {
            return;
        }

        m_Statuses[ListenerName] = ICommunicationListenerStatus.Initializing;
        listener.Initialize(m_ServiceInitializationParameters);
        m_Statuses[ListenerName] = ICommunicationListenerStatus.Initialized;
    }

    /// <summary>
    /// Opens one listener and returns its address. Returns an empty string without
    /// touching the listener when its status is already Opening or Opened.
    /// Callers must hold the lifecycle lock.
    /// </summary>
    private async Task<string> _OpenListener(string ListenerName,
                                             ICommunicationListener listener,
                                             CancellationToken canceltoken)
    {
        ICommunicationListenerStatus current = m_Statuses[ListenerName];
        if (current == ICommunicationListenerStatus.Opening ||
            current == ICommunicationListenerStatus.Opened)
        {
            return "";
        }

        m_Statuses[ListenerName] = ICommunicationListenerStatus.Opening;
        var sAddress = await listener.OpenAsync(canceltoken);
        m_Statuses[ListenerName] = ICommunicationListenerStatus.Opened;
        return sAddress;
    }

    /// <summary>
    /// Closes one listener unless its status is already Closing or Closed.
    /// Callers must hold the lifecycle lock.
    /// </summary>
    private async Task _CloseListener(string ListenerName,
                                      ICommunicationListener listener,
                                      CancellationToken cancelToken)
    {
        ICommunicationListenerStatus current = m_Statuses[ListenerName];
        if (current == ICommunicationListenerStatus.Closing ||
            current == ICommunicationListenerStatus.Closed)
        {
            return;
        }

        m_Statuses[ListenerName] = ICommunicationListenerStatus.Closing;
        await listener.CloseAsync(cancelToken);
        m_Statuses[ListenerName] = ICommunicationListenerStatus.Closed;
    }

    /// <summary>
    /// Aborts one listener unless its status is already Aborting or Aborted.
    /// Callers must hold the lifecycle lock.
    /// </summary>
    private void _AbortListener(string ListenerName,
                                ICommunicationListener listener)
    {
        ICommunicationListenerStatus current = m_Statuses[ListenerName];
        // Skip both the in-progress and the finished state (the guard used to test
        // Aborted twice and never Aborting).
        if (current == ICommunicationListenerStatus.Aborting ||
            current == ICommunicationListenerStatus.Aborted)
        {
            return;
        }

        m_Statuses[ListenerName] = ICommunicationListenerStatus.Aborting;
        listener.Abort();
        m_Statuses[ListenerName] = ICommunicationListenerStatus.Aborted;
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment