Last active
March 23, 2016 20:51
-
-
Save lukasz-pyrzyk/4d9f8e4fc8f591814ea4 to your computer and use it in GitHub Desktop.
StartSynchronously
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[..] | |
namespace XGain | |
{ | |
public class XGainServer : IServer | |
{ | |
/// <summary>
/// Start notification event. NOTE(review): presumably raised by RaiseOnStartEvent,
/// which is defined outside the visible part of this file - confirm.
/// </summary>
public event EventHandler<StartArgs> OnStart; | |
/// <summary>
/// New-message notification event. NOTE(review): presumably raised by RaiseOnNewMessageEvent
/// after a connection has been processed - the helper is not visible here, confirm.
/// </summary>
public event EventHandler<MessageArgs> OnNewMessage; | |
/// <summary>
/// Error notification event. NOTE(review): presumably raised by RaiseOnError, which the
/// accept loops call from their catch blocks - the helper is not visible here, confirm.
/// </summary>
public event EventHandler<ErrorArgs> OnError; | |
// Factory invoked once per connection to obtain the processor that handles it.
private readonly Func<IProcessor<MessageArgs>> _requestProcessorResolver; | |
// Listener created in the constructor from the supplied address and port; started by the Start* methods.
private readonly TcpListener _listener; | |
// Worker counter: set to 1 by StartSynchronously and adjusted with Interlocked operations by StartParallel.
private int _workers = 0; | |
/// <summary>
/// Creates a server that will listen on the given address and port.
/// The listener is only constructed here; it is started by the Start* methods.
/// </summary>
/// <param name="ipAddress">Local address the listener is bound to.</param>
/// <param name="port">Local port the listener is bound to.</param>
/// <param name="requestProcessorResolver">Factory invoked once per connection to obtain its processor.</param>
/// <exception cref="ArgumentNullException"><paramref name="requestProcessorResolver"/> is null.</exception>
public XGainServer(IPAddress ipAddress, int port, Func<IProcessor<MessageArgs>> requestProcessorResolver)
{
    // Fail at construction time instead of with a NullReferenceException on the first connection.
    if (requestProcessorResolver == null)
    {
        throw new ArgumentNullException("requestProcessorResolver");
    }

    _requestProcessorResolver = requestProcessorResolver;
    _listener = new TcpListener(ipAddress, port);
}
/// <summary>
/// Starts the listener and handles incoming connections one at a time:
/// each accepted socket is fully processed before the next accept is issued.
/// </summary>
/// <param name="token">
/// Token that ends the accept loop. Cancelling it also stops the listener so that an
/// accept that is already pending is released instead of waiting for the next client.
/// </param>
/// <returns>A task that runs until cancellation is requested.</returns>
/// <exception cref="OperationCanceledException">Cancellation was requested through <paramref name="token"/>.</exception>
public async Task StartSynchronously(CancellationToken token)
{
    _listener.Start();
    RaiseOnStartEvent(ProcessingType.Synchronously);
    _workers = 1;

    // AcceptSocketAsync has no cancellation support; stopping the listener is what
    // unblocks a pending accept once the token is cancelled.
    using (token.Register(() => _listener.Stop()))
    {
        while (true)
        {
            token.ThrowIfCancellationRequested();

            Socket socket;
            try
            {
                socket = await _listener.AcceptSocketAsync();
            }
            catch (Exception ex)
            {
                // An accept that fails because cancellation stopped the listener is not an error;
                // the check at the top of the loop reports the cancellation.
                if (!token.IsCancellationRequested)
                {
                    RaiseOnError(ex);
                }
                continue;
            }

            try
            {
                ISocket request = new XGainSocket(socket);
                ProcessSocketConnection(request);
            }
            catch (Exception ex)
            {
                // A failing connection must not take the accept loop down.
                RaiseOnError(ex);
            }
        }
    }
}
/// <summary>
/// Starts the listener and handles incoming connections concurrently, with at most
/// <paramref name="maxDegreeOfParallelism"/> connections being processed at the same time.
/// </summary>
/// <param name="token">
/// Token that ends the accept loop. Cancelling it also stops the listener so that an
/// accept that is already pending is released. Connections already handed to a worker
/// are left to finish.
/// </param>
/// <param name="maxDegreeOfParallelism">
/// Upper bound on concurrently processed connections; defaults to <see cref="Environment.ProcessorCount"/>.
/// </param>
/// <returns>A task that runs until cancellation is requested.</returns>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="maxDegreeOfParallelism"/> is less than 1.</exception>
/// <exception cref="OperationCanceledException">Cancellation was requested through <paramref name="token"/>.</exception>
public async Task StartParallel(CancellationToken token, int? maxDegreeOfParallelism = null)
{
    int maximumConcurrencyLevel = maxDegreeOfParallelism ?? Environment.ProcessorCount;
    if (maximumConcurrencyLevel < 1)
    {
        // A non-positive limit could never admit a worker and would stall the loop forever.
        throw new ArgumentOutOfRangeException("maxDegreeOfParallelism", "The degree of parallelism must be at least 1.");
    }

    _listener.Start();
    RaiseOnStartEvent(ProcessingType.Parallel);

    // One slot per allowed worker. Deliberately not disposed: workers may still release
    // their slot after this method has exited through cancellation.
    SemaphoreSlim slots = new SemaphoreSlim(maximumConcurrencyLevel, maximumConcurrencyLevel);

    // AcceptSocketAsync has no cancellation support; stopping the listener is what
    // unblocks a pending accept once the token is cancelled.
    using (token.Register(() => _listener.Stop()))
    {
        while (true)
        {
            token.ThrowIfCancellationRequested();

            // Asynchronously wait for a free slot instead of blocking the calling thread.
            await slots.WaitAsync(token);

            Socket socket;
            try
            {
                socket = await _listener.AcceptSocketAsync();
            }
            catch (Exception ex)
            {
                // No worker was started for this slot, so hand it back.
                slots.Release();

                // An accept that fails because cancellation stopped the listener is not an error;
                // the check at the top of the loop reports the cancellation.
                if (!token.IsCancellationRequested)
                {
                    RaiseOnError(ex);
                }
                continue;
            }

            Interlocked.Increment(ref _workers);

            // Not awaited on purpose: the loop must return to accepting while this connection
            // is processed. The worker reports its own failures, so the task never faults.
            Task worker = Task.Run(() =>
            {
                try
                {
                    ISocket request = new XGainSocket(socket);
                    ProcessSocketConnection(request);
                }
                catch (Exception ex)
                {
                    RaiseOnError(ex);
                }
                finally
                {
                    // Always give the slot back, even when processing failed.
                    Interlocked.Decrement(ref _workers);
                    slots.Release();
                }
            });
        }
    }
}
/// <summary>
/// Handles a single accepted connection: resolves a processor, lets it process the socket
/// into a fresh <c>MessageArgs</c>, then calls <c>RaiseOnNewMessageEvent</c> with the result.
/// </summary>
/// <param name="socket">The accepted connection to process.</param>
private void ProcessSocketConnection(ISocket socket)
{
    var message = new MessageArgs();

    // A processor is resolved for every connection rather than being shared.
    _requestProcessorResolver().ProcessSocketConnection(socket, message);

    RaiseOnNewMessageEvent(socket, message);
}
[..] | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment