Skip to content

Instantly share code, notes, and snippets.

@mariodivece
Created March 25, 2014 19:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mariodivece/9769798 to your computer and use it in GitHub Desktop.
Save mariodivece/9769798 to your computer and use it in GitHub Desktop.
A (WIP) generic server using sockets
/// <summary>
/// Author: Mario A. Di Vece - mario@unosquare.com
/// Version 1.0b
/// Generic Server toolkit for quick implementation of socket communications.
/// Unosquare, LLC (c) 2013 - MIT License
/// You are free to use this code. Please give me some credit if you use it.
/// </summary>
namespace Unosquare.Labs.Sockets
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
/// <summary>
/// Identifies the kind of event a log entry describes.
/// The value is passed to the <c>WriteLogHandler</c> callbacks alongside the log text.
/// </summary>
public enum LogEntryCategory
{
    /// <summary>The server's watchdog thread has begun running.</summary>
    ServerWatchdogStarted = 0,

    /// <summary>The server has started listening on its port.</summary>
    ServerStarted = 1,

    /// <summary>The server has been asked to stop and is shutting down.</summary>
    ServerStopping = 2,

    /// <summary>The server has finished stopping.</summary>
    ServerStopped = 3,

    /// <summary>A connection was added to the server's active connections.</summary>
    ConnectionAdded = 4,

    /// <summary>A connection exceeded the idle timeout.</summary>
    ConnectionIdled = 5,

    /// <summary>A connection was removed from the server's active connections.</summary>
    ConnectionRemoved = 6,

    /// <summary>Data was sent over a connection.</summary>
    ConnectionSend = 7,

    /// <summary>Category for data received over a connection.</summary>
    ConnectionReceive = 8
}
/// <summary>
/// Represents a socket connection: a <see cref="System.Net.Sockets.Socket"/> paired with a
/// background receive thread, byte counters and CRLF-delimited UTF-8 text message framing.
/// </summary>
/// <typeparam name="T">Type of the caller-defined <see cref="StateTag"/> value.</typeparam>
public class SocketConnection<T>
{
    /// <summary>
    /// Interval, in milliseconds, between polls of the socket and of pending responses.
    /// </summary>
    private const int CheckIntervalMs = 10;

    /// <summary>
    /// Sequence that terminates one text message.
    /// </summary>
    private const string MessageDelimiter = "\r\n";

    /// <summary>
    /// Accumulates decoded text that has not yet been terminated by <see cref="MessageDelimiter"/>.
    /// Only touched by the receive thread.
    /// </summary>
    private readonly StringBuilder MessageBuilder = new StringBuilder();

    /// <summary>
    /// Cached copy of the remote end point, so it stays readable after the socket is closed.
    /// </summary>
    protected IPEndPoint m_RemoteEndPoint;

    /// <summary>
    /// Stores the socket, captures its remote end point and stamps the start/activity times.
    /// </summary>
    /// <param name="socket">The socket to wrap.</param>
    private void Initialize(Socket socket)
    {
        this.Socket = socket;
        // Capture the end point while the socket is still open; after Close() it can no longer be read.
        this.m_RemoteEndPoint = ReadRemoteEndPoint(socket);
        StartTime = DateTime.Now;
        LastActivity = DateTime.Now;
    }

    /// <summary>
    /// Reads the remote end point of a socket without throwing.
    /// </summary>
    /// <param name="socket">The socket to inspect; may be null.</param>
    /// <returns>A copy of the remote IP end point, or null when it is unavailable.</returns>
    private static IPEndPoint ReadRemoteEndPoint(Socket socket)
    {
        if (socket == null) return null;
        try
        {
            var endpoint = socket.RemoteEndPoint as IPEndPoint;
            return endpoint == null ? null : new IPEndPoint(endpoint.Address, endpoint.Port);
        }
        catch (SocketException)
        {
            return null;
        }
        catch (ObjectDisposedException)
        {
            return null;
        }
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="SocketConnection{T}"/> class
    /// around an already accepted socket.
    /// </summary>
    /// <param name="acceptedSocket">The accepted socket.</param>
    public SocketConnection(Socket acceptedSocket)
    {
        this.Initialize(acceptedSocket);
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="SocketConnection{T}"/> class
    /// by opening a TCP connection to the given host.
    /// </summary>
    /// <param name="hostname">The hostname.</param>
    /// <param name="port">The port.</param>
    public SocketConnection(string hostname, int port)
    {
        var tcpClient = new TcpClient();
        try
        {
            tcpClient.Connect(hostname, port);
        }
        catch
        {
            // Do not leak the underlying socket when the connection attempt fails.
            tcpClient.Close();
            throw;
        }
        this.Initialize(tcpClient.Client);
    }

    /// <summary>
    /// Gets or sets the callback invoked with every raw chunk of received bytes.
    /// </summary>
    public Action<SocketConnection<T>, byte[]> DataReceivedHandler { get; set; }

    /// <summary>
    /// Gets or sets the callback invoked with every complete CRLF-terminated text message
    /// (the terminator is not included).
    /// </summary>
    public Action<SocketConnection<T>, string> MessageReceivedHandler { get; set; }

    /// <summary>
    /// Gets or sets the callback invoked for send/receive log entries.
    /// </summary>
    public Action<SocketConnection<T>, LogEntryCategory, string> WriteLogHandler { get; set; }

    /// <summary>
    /// Gets or sets the number of bytes sent.
    /// </summary>
    public long BytesSent { get; protected set; }

    /// <summary>
    /// Gets or sets the number of bytes received.
    /// </summary>
    public long BytesReceived { get; protected set; }

    /// <summary>
    /// Gets or sets the time this connection was created or its receive thread started.
    /// </summary>
    public DateTime StartTime { get; protected set; }

    /// <summary>
    /// Gets or sets the time of the last send or receive.
    /// </summary>
    public DateTime LastActivity { get; protected set; }

    /// <summary>
    /// Gets or sets the state tag. Useful to track protocol state per connection.
    /// </summary>
    public T StateTag { get; set; }

    /// <summary>
    /// Gets or sets the underlying socket.
    /// </summary>
    public Socket Socket { get; protected set; }

    /// <summary>
    /// Gets a value indicating whether the connection has a socket that reports itself connected.
    /// </summary>
    /// <value>
    /// <c>true</c> if a socket is present and connected; otherwise, <c>false</c>.
    /// </value>
    public bool IsAlive
    {
        get
        {
            var socket = Socket;
            return socket != null && socket.Connected;
        }
    }

    /// <summary>
    /// Starts the background thread that receives data and dispatches it to the handlers.
    /// </summary>
    public void Connect()
    {
        var clientThread = new Thread(new ThreadStart(this.Process));
        clientThread.IsBackground = true;
        clientThread.Name = "Client: " + this;
        clientThread.Start();
    }

    /// <summary>
    /// Reads the bytes currently available on the socket.
    /// </summary>
    /// <returns>A buffer holding exactly the bytes that were read.</returns>
    private byte[] Receive()
    {
        LastActivity = DateTime.Now;
        var buffer = new byte[Socket.Available];
        // Receive may return fewer bytes than requested; trust its return value, not the buffer size.
        var received = Socket.Receive(buffer);
        if (received < buffer.Length)
            Array.Resize(ref buffer, received);
        BytesReceived += received;
        var log = WriteLogHandler;
        if (log != null)
            log(this, LogEntryCategory.ConnectionReceive, this + " < RX: " + received + " bytes");
        return buffer;
    }

    /// <summary>
    /// Receive loop: polls the socket, forwards raw data to <see cref="DataReceivedHandler"/>,
    /// assembles text messages for <see cref="MessageReceivedHandler"/>, and closes the socket on exit.
    /// </summary>
    private void Process()
    {
        StartTime = DateTime.Now;
        LastActivity = DateTime.Now;
        MessageBuilder.Clear();
        // A stateful decoder keeps multi-byte UTF-8 sequences intact when they straddle two reads.
        var decoder = Encoding.UTF8.GetDecoder();
        try
        {
            while (IsAlive)
            {
                // give it sometime to let the buffer get some data and give the CPU a break
                Thread.Sleep(CheckIntervalMs);
                if (Socket.Available == 0)
                {
                    // Readable with nothing to read means the remote side closed the connection.
                    if (Socket.Poll(0, SelectMode.SelectRead) && Socket.Available == 0)
                        break;
                    continue;
                }

                var buffer = Receive();
                if (buffer.Length == 0) continue;

                // Copy the delegate first: another thread may replace the handler at any time.
                var dataHandler = DataReceivedHandler;
                if (dataHandler != null)
                    dataHandler(this, buffer);

                if (MessageReceivedHandler == null)
                {
                    // Text is not being consumed; drop any partial character along with this chunk.
                    decoder.Reset();
                    continue;
                }

                var chars = new char[decoder.GetCharCount(buffer, 0, buffer.Length)];
                var charCount = decoder.GetChars(buffer, 0, buffer.Length, chars, 0);
                MessageBuilder.Append(chars, 0, charCount);
                // A delimiter can only have been completed if this chunk carried a line feed.
                if (Array.IndexOf(chars, '\n', 0, charCount) >= 0)
                    DispatchCompleteMessages();
            }
        }
        catch (Exception)
        {
            // Best effort: a socket failure or a throwing handler ends the connection quietly.
        }
        finally
        {
            var socket = Socket;
            if (socket != null)
                socket.Close();
        }
    }

    /// <summary>
    /// Hands every complete message in the pending text to <see cref="MessageReceivedHandler"/>
    /// and keeps the unterminated remainder for the next read.
    /// </summary>
    private void DispatchCompleteMessages()
    {
        var pending = MessageBuilder.ToString();
        var start = 0;
        var end = pending.IndexOf(MessageDelimiter, start, StringComparison.Ordinal);
        while (end >= 0)
        {
            var message = pending.Substring(start, end - start);
            start = end + MessageDelimiter.Length;
            // Re-read per message so a handler swapped mid-chunk takes effect immediately.
            var handler = MessageReceivedHandler;
            if (handler != null)
                handler(this, message);
            end = pending.IndexOf(MessageDelimiter, start, StringComparison.Ordinal);
        }

        MessageBuilder.Clear();
        if (start < pending.Length)
            MessageBuilder.Append(pending, start, pending.Length - start);
    }

    /// <summary>
    /// Sends the specified buffer.
    /// </summary>
    /// <param name="buffer">The bytes to send.</param>
    /// <returns><c>true</c> when the send completed; <c>false</c> when anything failed.</returns>
    public bool Send(byte[] buffer)
    {
        try
        {
            LastActivity = DateTime.Now;
            this.Socket.Send(buffer);
            this.BytesSent += buffer.LongLength;
            var log = WriteLogHandler;
            if (log != null)
                log(this, LogEntryCategory.ConnectionSend, this + " > TX: " + buffer.LongLength + " bytes");
            return true;
        }
        catch (Exception)
        {
            // Deliberate: failures are reported through the return value.
            return false;
        }
    }

    /// <summary>
    /// Encodes and sends the specified message.
    /// </summary>
    /// <param name="message">The message.</param>
    /// <param name="encoding">The encoding used to turn the message into bytes.</param>
    /// <returns><c>true</c> when the send completed; otherwise <c>false</c>.</returns>
    public bool Send(string message, Encoding encoding)
    {
        var bytes = encoding.GetBytes(message);
        return this.Send(bytes);
    }

    /// <summary>
    /// Sends the specified message encoded as UTF-8.
    /// </summary>
    /// <param name="message">The message.</param>
    /// <returns><c>true</c> when the send completed; otherwise <c>false</c>.</returns>
    public bool Send(string message) { return this.Send(message, Encoding.UTF8); }

    /// <summary>
    /// Sends the message followed by a CRLF terminator.
    /// </summary>
    /// <param name="message">The message.</param>
    /// <param name="encoding">The encoding.</param>
    /// <returns><c>true</c> when the send completed; otherwise <c>false</c>.</returns>
    public bool SendLine(string message, Encoding encoding)
    {
        return this.Send(message + MessageDelimiter, encoding);
    }

    /// <summary>
    /// Sends the message followed by a CRLF terminator, encoded as UTF-8.
    /// </summary>
    /// <param name="message">The message.</param>
    /// <returns><c>true</c> when the send completed; otherwise <c>false</c>.</returns>
    public bool SendLine(string message) { return this.SendLine(message, Encoding.UTF8); }

    /// <summary>
    /// Sends a request line and blocks until the next text message arrives or the connection dies.
    /// While waiting, <see cref="MessageReceivedHandler"/> is temporarily replaced; it is restored on exit.
    /// The receive thread must be running (see <see cref="Connect"/>) for a response to be observed.
    /// </summary>
    /// <param name="request">The request.</param>
    /// <param name="encoding">The encoding.</param>
    /// <returns>The first message received after the request, or an empty string on failure.</returns>
    public string GetResponse(string request, Encoding encoding)
    {
        var response = string.Empty;
        var received = false;
        var gate = new object();
        var originalMessageReceived = this.MessageReceivedHandler;
        try
        {
            this.MessageReceivedHandler = (s, m) =>
            {
                lock (gate)
                {
                    // Keep only the first message; later ones must not overwrite the response.
                    if (received) return;
                    response = m;
                    received = true;
                    Monitor.PulseAll(gate);
                }
            };
            // once the handler is set, send the request
            if (SendLine(request, encoding) == false) return string.Empty;
            lock (gate)
            {
                // The timed wait lets the loop notice a dead connection even if no message ever arrives.
                while (!received && this.IsAlive)
                    Monitor.Wait(gate, CheckIntervalMs);
            }
        }
        catch (Exception)
        {
            // Best effort: any failure yields whatever response was captured (usually empty).
        }
        finally
        {
            this.MessageReceivedHandler = originalMessageReceived;
        }
        return response;
    }

    /// <summary>
    /// Sends a request line as UTF-8 and blocks until a response message arrives.
    /// </summary>
    /// <param name="request">The request.</param>
    /// <returns>The first message received after the request, or an empty string on failure.</returns>
    public string GetResponse(string request) { return this.GetResponse(request, Encoding.UTF8); }

    /// <summary>
    /// Closes the socket, allowing up to one second for queued data to be sent. Never throws.
    /// </summary>
    public void Disconnect()
    {
        var socket = Socket;
        if (socket == null) return;
        try
        {
            socket.Close(1);
        }
        catch (Exception)
        {
            // Closing is best effort.
        }
    }

    /// <summary>
    /// Gets the remote end point.
    /// </summary>
    /// <value>
    /// The remote end point, or null when it was never available.
    /// </value>
    public IPEndPoint RemoteEndPoint
    {
        get
        {
            if (this.m_RemoteEndPoint == null)
                this.m_RemoteEndPoint = ReadRemoteEndPoint(this.Socket);
            return this.m_RemoteEndPoint;
        }
    }

    /// <summary>
    /// Returns the remote end point as text, or a placeholder when it is unknown.
    /// </summary>
    /// <returns>
    /// A <see cref="System.String" /> that represents this instance.
    /// </returns>
    public override string ToString()
    {
        var endpoint = RemoteEndPoint;
        return endpoint == null ? "[unknown endpoint]" : endpoint.ToString();
    }
}
/// <summary>
/// A thread-safe, unordered collection of <see cref="SocketConnection{T}"/> instances.
/// It declares no members of its own; everything is inherited from <see cref="ConcurrentBag{T}"/>.
/// Note that a bag's <c>TryTake</c> removes whichever element it chooses, so a specific
/// connection cannot be removed directly.
/// </summary>
/// <typeparam name="T">State-tag type of the connections held.</typeparam>
public class ClientConnectionCollection<T> : ConcurrentBag<SocketConnection<T>>
{
}
/// <summary>
/// A general-purpose client-handling server based on sockets.
/// The generic argument T is used for tracking the state in the StateTag property of
/// <see cref="SocketConnection{T}"/> with the same generic type.
/// </summary>
/// <typeparam name="T">State-tag type carried by every connection.</typeparam>
public class GenericServer<T>
{
    #region Private Fields
    /// <summary>Interval, in milliseconds, between watchdog sweeps and shutdown polls.</summary>
    private const int CheckIntervalMs = 10;
    private Thread ListenerThread;
    private Thread WatchdogThread;
    /// <summary>
    /// Active connections, keyed by reference. A dictionary is used (the value is unused)
    /// because the watchdog must be able to remove one specific connection.
    /// </summary>
    private readonly ConcurrentDictionary<SocketConnection<T>, byte> m_ActiveConnections = new ConcurrentDictionary<SocketConnection<T>, byte>();
    private TcpListener Listener;
    #endregion

    #region Handlers
    /// <summary>Gets or sets the callback invoked with raw bytes received from any client.</summary>
    public Action<SocketConnection<T>, byte[]> DataReceivedHandler { get; set; }
    /// <summary>Gets or sets the callback invoked with each complete text message from any client.</summary>
    public Action<SocketConnection<T>, string> MessageReceivedHandler { get; set; }
    /// <summary>Gets or sets the callback invoked after a connection is added.</summary>
    public Action<SocketConnection<T>> ClientConnectedHandler { get; set; }
    /// <summary>Gets or sets the callback invoked after the watchdog removes a connection.</summary>
    public Action<SocketConnection<T>> ClientDisconnectedHandler { get; set; }
    /// <summary>Gets or sets the callback invoked for every log entry; the connection argument is null for server-level entries.</summary>
    public Action<SocketConnection<T>, LogEntryCategory, string> WriteLogHandler { get; set; }
    #endregion

    #region Properties
    /// <summary>Gets or sets the idle timeout in milliseconds; 0 disables idle disconnection.</summary>
    public int IdleTimeoutMilliseconds { get; protected set; }
    /// <summary>Gets or sets a value indicating whether the listener is accepting connections.</summary>
    public bool IsRunning { get; protected set; }
    /// <summary>Gets or sets the TCP port the server listens on.</summary>
    public int Port { get; protected set; }
    /// <summary>Gets a snapshot of the currently active connections.</summary>
    public IEnumerable<SocketConnection<T>> ActiveConnections
    {
        get
        {
            return this.m_ActiveConnections.Keys.ToArray();
        }
    }
    #endregion

    #region Constructors and Initializers
    /// <summary>
    /// Initializes a new instance of the <see cref="GenericServer{T}"/> class and starts its watchdog.
    /// Use idle timeout ms = 0 to never boot clients off after a period of inactivity.
    /// </summary>
    /// <param name="port">The port.</param>
    /// <param name="idleTimeoutMs">The idle timeout ms.</param>
    public GenericServer(int port, int idleTimeoutMs)
    {
        this.IdleTimeoutMilliseconds = idleTimeoutMs;
        this.Port = port;
        StartWatchdog();
    }

    /// <summary>
    /// Starts the background watchdog thread that removes dead or idle connections.
    /// </summary>
    private void StartWatchdog()
    {
        this.WatchdogThread = new Thread(new ThreadStart(this.RunWatchdog));
        this.WatchdogThread.IsBackground = true;
        this.WatchdogThread.Name = "Watchdog";
        this.WatchdogThread.Priority = ThreadPriority.BelowNormal;
        this.WatchdogThread.Start();
    }

    /// <summary>
    /// Watchdog loop: sweeps all active connections every <see cref="CheckIntervalMs"/> milliseconds.
    /// </summary>
    private void RunWatchdog()
    {
        OnWriteLog(null, LogEntryCategory.ServerWatchdogStarted, "Watchdog Started.");
        while (true)
        {
            // Keys is a snapshot, so removing entries while iterating is safe.
            foreach (var client in m_ActiveConnections.Keys)
            {
                try
                {
                    RemoveIfDeadOrIdle(client);
                }
                catch (Exception)
                {
                    // A throwing user callback must not terminate the watchdog for every other client.
                }
            }
            Thread.Sleep(CheckIntervalMs);
        }
    }

    /// <summary>
    /// Removes and disconnects the given client when it is no longer alive or has idled too long.
    /// </summary>
    /// <param name="client">The connection to examine.</param>
    private void RemoveIfDeadOrIdle(SocketConnection<T> client)
    {
        var idleTime = DateTime.Now.Subtract(client.LastActivity).TotalMilliseconds;
        var hasTimedOut = idleTime >= IdleTimeoutMilliseconds && IdleTimeoutMilliseconds > 0;
        if (client.IsAlive && !hasTimedOut) return;

        // Remove exactly this client; if it is already gone there is nothing left to do.
        byte unused;
        if (!this.m_ActiveConnections.TryRemove(client, out unused)) return;

        try
        {
            if (hasTimedOut)
                OnWriteLog(client, LogEntryCategory.ConnectionIdled, "Watchdog detected client " + client + " idled for " + (idleTime / 1000).ToString("0.00") + "s. Disconnecting . . .");
        }
        finally
        {
            client.Disconnect(); // make sure...
        }
        OnWriteLog(client, LogEntryCategory.ConnectionRemoved, "Connection Removed: " + client + "; Total Clients: " + m_ActiveConnections.Count);
        this.OnClientDisconnected(client);
    }

    /// <summary>
    /// Creates and starts the TCP listener, marks the server as running and prepares the accept thread.
    /// </summary>
    /// <returns>The accept thread, not yet started.</returns>
    private Thread InitializeListener()
    {
        var listener = new TcpListener(new IPEndPoint(IPAddress.Any, this.Port));
        listener.Start();
        this.Listener = listener;
        // Set synchronously so Start()/Stop() observe a consistent state without waiting for the thread.
        this.IsRunning = true;
        this.ListenerThread = new Thread(() =>
        {
            // Stop once the server stops or a later Start() has replaced this listener.
            while (IsRunning && ReferenceEquals(listener, this.Listener))
            {
                Socket socket;
                try
                {
                    socket = listener.AcceptSocket();
                }
                catch (Exception)
                {
                    // Expected when Stop() closes the listener; pause so persistent errors cannot spin the CPU.
                    Thread.Sleep(CheckIntervalMs);
                    continue;
                }

                if (!IsRunning)
                {
                    // Accepted while shutting down: refuse instead of registering a client nobody will stop.
                    try { socket.Close(); } catch (Exception) { }
                    break;
                }
                AcceptConnection(socket);
            }
        });
        this.ListenerThread.Name = "Connection Listener";
        this.ListenerThread.IsBackground = true;
        return this.ListenerThread;
    }

    /// <summary>
    /// Wraps an accepted socket in a connection, starts it and registers it. Never throws.
    /// </summary>
    /// <param name="socket">The accepted socket.</param>
    private void AcceptConnection(Socket socket)
    {
        var registered = false;
        try
        {
            var connection = new SocketConnection<T>(socket);
            AttachHandlers(connection);
            connection.Connect();
            this.m_ActiveConnections.TryAdd(connection, 0);
            registered = true;
            OnWriteLog(connection, LogEntryCategory.ConnectionAdded, "Connection Added: " + connection + "; Total Clients: " + m_ActiveConnections.Count);
            this.OnClientConnected(connection);
        }
        catch (Exception)
        {
            // A socket that never made it into the collection would otherwise never be closed.
            if (!registered)
            {
                try { socket.Close(); } catch (Exception) { }
            }
        }
    }

    /// <summary>
    /// Routes a connection's data, message and log callbacks through this server.
    /// </summary>
    /// <param name="connection">The connection to wire up.</param>
    private void AttachHandlers(SocketConnection<T> connection)
    {
        connection.DataReceivedHandler = OnClientDataReceived;
        connection.MessageReceivedHandler = OnClientMessageReceived;
        connection.WriteLogHandler = OnWriteLog;
    }
    #endregion

    #region Public Methods
    /// <summary>
    /// Opens an outgoing connection and manages it like any accepted client connection.
    /// </summary>
    /// <param name="hostname">The hostname.</param>
    /// <param name="port">The port.</param>
    /// <returns>The new, started connection.</returns>
    public SocketConnection<T> AddConnection(string hostname, int port)
    {
        var connection = new SocketConnection<T>(hostname, port);
        AttachHandlers(connection);
        this.m_ActiveConnections.TryAdd(connection, 0);
        OnWriteLog(connection, LogEntryCategory.ConnectionAdded, "Connection Added: " + connection + "; Total Clients: " + m_ActiveConnections.Count);
        this.OnClientConnected(connection);
        connection.Connect();
        return connection;
    }

    /// <summary>
    /// Starts listening for client connections. Does nothing when already running.
    /// </summary>
    public void Start()
    {
        if (IsRunning) return;
        InitializeListener().Start();
        OnWriteLog(null, LogEntryCategory.ServerStarted, "Server Started on Port: " + this.Port);
    }

    /// <summary>
    /// Stops listening, disconnects every client and blocks until the watchdog has removed them all.
    /// Does nothing when not running.
    /// </summary>
    public void Stop()
    {
        if (IsRunning == false) return;
        OnWriteLog(null, LogEntryCategory.ServerStopping, "Server Stopping . . .");
        this.IsRunning = false;
        this.Listener.Stop();
        // wait for all active connections to close
        while (m_ActiveConnections.Count > 0)
        {
            // Repeat the disconnect each pass so a client registered during shutdown is not waited on forever.
            foreach (var client in m_ActiveConnections.Keys)
                client.Disconnect();
            Thread.Sleep(CheckIntervalMs);
        }
        OnWriteLog(null, LogEntryCategory.ServerStopped, "Server Stopped.");
    }

    /// <summary>
    /// Sends the buffer to every active connection.
    /// </summary>
    /// <param name="buffer">The buffer.</param>
    public void Broadcast(byte[] buffer)
    {
        foreach (var item in this.m_ActiveConnections.Keys)
        {
            item.Send(buffer);
        }
    }

    /// <summary>
    /// Sends the message to every active connection using the given encoding.
    /// </summary>
    /// <param name="message">The message.</param>
    /// <param name="encoding">The encoding.</param>
    public void Broadcast(string message, Encoding encoding)
    {
        foreach (var item in this.m_ActiveConnections.Keys)
        {
            item.Send(message, encoding);
        }
    }

    /// <summary>
    /// Sends the message to every active connection as UTF-8.
    /// </summary>
    /// <param name="message">The message.</param>
    public void Broadcast(string message)
    {
        this.Broadcast(message, Encoding.UTF8);
    }
    #endregion

    #region Callback Handlers
    /// <summary>
    /// Prefixes the message with the current time and forwards it to <see cref="WriteLogHandler"/>.
    /// In DEBUG builds the entry is also written to the debug output and the console.
    /// </summary>
    /// <param name="connection">The connection, or null for server-level entries.</param>
    /// <param name="category">The category.</param>
    /// <param name="message">The message.</param>
    private void OnWriteLog(SocketConnection<T> connection, LogEntryCategory category, string message)
    {
        var logMessage = DateTime.Now.ToLongTimeString() + " " + message;
        #if DEBUG
        System.Diagnostics.Debug.WriteLine(logMessage);
        Console.WriteLine(logMessage);
        #endif
        // Copy the delegate first: the property may be changed from another thread.
        var handler = WriteLogHandler;
        if (handler == null) return;
        handler(connection, category, logMessage);
    }

    /// <summary>
    /// Forwards raw client data to <see cref="DataReceivedHandler"/>.
    /// </summary>
    /// <param name="client">The client.</param>
    /// <param name="buffer">The buffer.</param>
    private void OnClientDataReceived(SocketConnection<T> client, byte[] buffer)
    {
        var handler = this.DataReceivedHandler;
        if (handler == null) return;
        handler(client, buffer);
    }

    /// <summary>
    /// Forwards a new connection to <see cref="ClientConnectedHandler"/>.
    /// </summary>
    /// <param name="client">The client.</param>
    private void OnClientConnected(SocketConnection<T> client)
    {
        var handler = this.ClientConnectedHandler;
        if (handler == null) return;
        handler(client);
    }

    /// <summary>
    /// Forwards a removed connection to <see cref="ClientDisconnectedHandler"/>.
    /// </summary>
    /// <param name="client">The client.</param>
    private void OnClientDisconnected(SocketConnection<T> client)
    {
        var handler = this.ClientDisconnectedHandler;
        if (handler == null) return;
        handler(client);
    }

    /// <summary>
    /// Forwards a client text message to <see cref="MessageReceivedHandler"/>.
    /// </summary>
    /// <param name="client">The client.</param>
    /// <param name="message">The message.</param>
    private void OnClientMessageReceived(SocketConnection<T> client, string message)
    {
        var handler = this.MessageReceivedHandler;
        if (handler == null) return;
        handler(client, message);
    }
    #endregion
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment