Skip to content

Instantly share code, notes, and snippets.

@lukasz-pyrzyk
Last active March 23, 2016 20:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lukasz-pyrzyk/4d9f8e4fc8f591814ea4 to your computer and use it in GitHub Desktop.
Save lukasz-pyrzyk/4d9f8e4fc8f591814ea4 to your computer and use it in GitHub Desktop.
StartSynchronously
[..]
namespace XGain
{
public class XGainServer : IServer
{
// Event declared with StartArgs; presumably raised by RaiseOnStartEvent when a Start* method begins listening (raiser not visible here - confirm).
public event EventHandler<StartArgs> OnStart;
// Event declared with MessageArgs; presumably raised by RaiseOnNewMessageEvent after a connection has been processed (raiser not visible here - confirm).
public event EventHandler<MessageArgs> OnNewMessage;
// Event declared with ErrorArgs; presumably raised by RaiseOnError for exceptions caught in the accept loops (raiser not visible here - confirm).
public event EventHandler<ErrorArgs> OnError;
// Factory invoked once per accepted connection to obtain the processor that handles it.
private readonly Func<IProcessor<MessageArgs>> _requestProcessorResolver;
// Listener bound to the address and port supplied to the constructor.
private readonly TcpListener _listener;
// Worker counter: set to 1 by StartSynchronously and adjusted through Interlocked by StartParallel.
private int _workers = 0;
/// <summary>
/// Creates a server that will listen on the given address and port.
/// The listener is only created here; it is started by one of the Start* methods.
/// </summary>
/// <param name="ipAddress">Local address the listener binds to.</param>
/// <param name="port">Local port the listener binds to.</param>
/// <param name="requestProcessorResolver">
/// Factory invoked once per accepted connection to obtain the processor that handles it.
/// </param>
/// <exception cref="ArgumentNullException">
/// <paramref name="requestProcessorResolver"/> is null.
/// </exception>
public XGainServer(IPAddress ipAddress, int port, Func<IProcessor<MessageArgs>> requestProcessorResolver)
{
    // Fail fast: a null resolver would otherwise only surface later as a
    // NullReferenceException on the first accepted connection.
    if (requestProcessorResolver == null)
    {
        throw new ArgumentNullException("requestProcessorResolver");
    }

    _requestProcessorResolver = requestProcessorResolver;
    _listener = new TcpListener(ipAddress, port);
}
/// <summary>
/// Starts the listener and handles incoming connections one at a time:
/// each accepted socket is fully processed before the next one is accepted.
/// </summary>
/// <param name="token">
/// Token that ends the accept loop. When it is cancelled the listener is stopped,
/// which also aborts an accept that is currently pending.
/// </param>
/// <returns>A task that only completes by faulting or being cancelled; the loop has no normal exit.</returns>
/// <exception cref="OperationCanceledException"><paramref name="token"/> was cancelled.</exception>
public async Task StartSynchronously(CancellationToken token)
{
    _listener.Start();
    RaiseOnStartEvent(ProcessingType.Synchronously);
    _workers = 1;

    // AcceptSocketAsync does not observe the token, so without this registration a
    // cancellation would only be noticed after the next client happened to connect.
    // Stopping the listener makes the pending accept fail and releases the port.
    using (token.Register(() => _listener.Stop()))
    {
        while (true)
        {
            token.ThrowIfCancellationRequested();
            try
            {
                Socket socket = await _listener.AcceptSocketAsync();
                ISocket request = new XGainSocket(socket);
                ProcessSocketConnection(request);
            }
            catch (Exception ex)
            {
                // A failure caused by stopping the listener on cancellation is expected
                // and is not reported; the next loop iteration throws the cancellation.
                if (!token.IsCancellationRequested)
                {
                    RaiseOnError(ex);
                }
            }
        }
    }
}
/// <summary>
/// Starts the listener and handles incoming connections concurrently, with at most
/// <paramref name="maxDegreeOfParallelism"/> connections being processed at the same time.
/// </summary>
/// <param name="token">
/// Token that ends the accept loop. When it is cancelled the listener is stopped,
/// which also aborts an accept that is currently pending. Connections that are already
/// being processed are left to finish.
/// </param>
/// <param name="maxDegreeOfParallelism">
/// Maximum number of connections processed concurrently; defaults to
/// <see cref="Environment.ProcessorCount"/> when null.
/// </param>
/// <returns>A task that only completes by faulting or being cancelled; the loop has no normal exit.</returns>
/// <exception cref="ArgumentOutOfRangeException">The effective concurrency limit is less than 1.</exception>
/// <exception cref="OperationCanceledException"><paramref name="token"/> was cancelled.</exception>
/// <remarks>
/// Connections are processed on thread-pool threads, so error and message notifications
/// triggered from processing may be raised concurrently.
/// </remarks>
public async Task StartParallel(CancellationToken token, int? maxDegreeOfParallelism = null)
{
    int maximumConcurrencyLevel = maxDegreeOfParallelism ?? Environment.ProcessorCount;
    if (maximumConcurrencyLevel < 1)
    {
        // A non-positive limit could never admit a worker and would stall the loop forever.
        throw new ArgumentOutOfRangeException(
            "maxDegreeOfParallelism",
            maximumConcurrencyLevel,
            "The maximum degree of parallelism must be at least 1.");
    }

    _listener.Start();
    RaiseOnStartEvent(ProcessingType.Parallel);

    // One slot per allowed concurrent connection. Deliberately not disposed here:
    // workers that are still running when the loop exits release their slot afterwards.
    SemaphoreSlim slots = new SemaphoreSlim(maximumConcurrencyLevel, maximumConcurrencyLevel);

    // AcceptSocketAsync does not observe the token; stopping the listener on
    // cancellation aborts a pending accept and releases the port.
    using (token.Register(() => _listener.Stop()))
    {
        while (true)
        {
            token.ThrowIfCancellationRequested();

            // Wait asynchronously for a free slot instead of blocking a thread.
            await slots.WaitAsync(token);

            Socket socket;
            try
            {
                socket = await _listener.AcceptSocketAsync();
            }
            catch (Exception ex)
            {
                // No worker was started for this slot, so hand it back.
                slots.Release();

                // A failure caused by stopping the listener on cancellation is expected.
                if (!token.IsCancellationRequested)
                {
                    RaiseOnError(ex);
                }

                continue;
            }

            Interlocked.Increment(ref _workers);

            // Not awaited on purpose: awaiting here would serialize the connections.
            // The worker reports its own failures, so the task never faults unobserved
            // unless an error handler itself throws.
            Task.Run(() =>
            {
                try
                {
                    ISocket request = new XGainSocket(socket);
                    ProcessSocketConnection(request);
                }
                catch (Exception ex)
                {
                    RaiseOnError(ex);
                }
                finally
                {
                    // Always give the slot back, even when processing failed.
                    Interlocked.Decrement(ref _workers);
                    slots.Release();
                }
            });
        }
    }
}
/// <summary>
/// Handles a single accepted connection: creates a fresh <c>MessageArgs</c>, obtains a
/// processor from the resolver, lets it process the socket into those arguments, and
/// then passes the socket and arguments to <c>RaiseOnNewMessageEvent</c>.
/// </summary>
/// <param name="socket">The accepted connection to process.</param>
private void ProcessSocketConnection(ISocket socket)
{
    var messageArgs = new MessageArgs();

    // A processor is resolved for every connection rather than being reused.
    var handler = _requestProcessorResolver();
    handler.ProcessSocketConnection(socket, messageArgs);

    RaiseOnNewMessageEvent(socket, messageArgs);
}
[..]
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment