Skip to content

Instantly share code, notes, and snippets.

@micahasmith
Created May 7, 2012 03:19
Show Gist options
  • Save micahasmith/2625670 to your computer and use it in GitHub Desktop.
Save micahasmith/2625670 to your computer and use it in GitHub Desktop.
rx c# tcp server
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Net.Sockets;
using System.Data;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using System.Reactive.Linq;
using System.Reactive.Subjects;
namespace RxWeb
{
/// <summary>
/// Console host for <see cref="NetActor"/>: prints whatever the TCP client sends and
/// forwards one line typed at the console to the client.
/// </summary>
class Program
{
    /// <summary>
    /// Starts a <see cref="NetActor"/> on port 8081, wires it to the console and then
    /// blocks so the process stays alive for the socket callbacks.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        var server = Task.Factory.StartNew(() =>
        {
            //create a netactor on port 8081
            NetActor a = new NetActor(8081);
            //write out to the console when you get sent data; report stream errors
            //instead of letting them be rethrown on the socket callback thread
            a.Incoming.Subscribe(
                i => Console.Write(i),
                exc => Console.Error.WriteLine(exc));
            //create an observable from the console (reads a single line) and bind it to the netactor;
            //Console.ReadLine() returns null at end of input, which is not a message to send
            var o = Observable.Start<string>(() => Console.ReadLine())
                .Where(line => line != null);
            //publish from the console to the actor
            o.Subscribe<string>(a.Outgoing.OnNext);
        });

        try
        {
            //observe the start-up task so failures (e.g. the port is already in use) are reported
            server.Wait();
        }
        catch (AggregateException exc)
        {
            Console.Error.WriteLine(exc.Flatten().InnerException);
            return;
        }

        //keep the process alive without spinning a CPU core
        Thread.Sleep(Timeout.Infinite);
    }
}
/// <summary>
/// Single-client TCP endpoint bound to 127.0.0.1 that exposes the connection as Rx streams:
/// text received from the client is published on <see cref="Incoming"/>, and strings pushed
/// into <see cref="Outgoing"/> are sent to the client. Pretty much 100% based on msdn code.
/// </summary>
/// <remarks>
/// Only one connection is accepted (a single accept is issued). As in the MSDN sample, the
/// client socket is shut down and closed once the first send completes. All text is ASCII.
/// </remarks>
/// <seealso href="http://msdn.microsoft.com/en-us/library/fx6588te.aspx"/>
public class NetActor : IDisposable
{
    // Marker after which the incoming stream is completed.
    private const string EndOfMessageMarker = "<EOF>";

    //this socket is us
    private readonly Socket _Listener;
    //this socket is them (null until a client has been accepted)
    private Socket _Client;
    //Incoming messages from the client
    private readonly ISubject<string> _Incoming;
    //subscription that forwards Outgoing messages to the accepted client
    private IDisposable _OutgoingSubscription;

    /// <summary>
    /// Text received from the client, one notification per received chunk. Completes when
    /// the end-of-message marker has been received or the connection is closed.
    /// </summary>
    public IObservable<string> Incoming
    {
        get
        {
            return _Incoming.AsObservable();
        }
    }

    /// <summary>
    /// Outgoing messages. Values pushed here before a client has connected have no
    /// socket subscriber and are therefore not sent.
    /// </summary>
    public ISubject<string> Outgoing { get; private set; }

    /// <summary>
    /// Binds a listening socket to 127.0.0.1 on <paramref name="port"/> and starts
    /// accepting one client asynchronously.
    /// </summary>
    /// <param name="port">TCP port to listen on.</param>
    /// <exception cref="SocketException">The socket could not be bound or put into the listening state.</exception>
    public NetActor(int port)
    {
        _Incoming = new Subject<string>();
        Outgoing = new Subject<string>();
        _Listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        try
        {
            _Listener.Bind(new IPEndPoint(IPAddress.Loopback, port));
            _Listener.Listen(20);
            _Listener.BeginAccept(new AsyncCallback(AcceptCallback), _Listener);
        }
        catch
        {
            // Do not leak the listening socket when start-up fails.
            _Listener.Close();
            throw;
        }
    }

    /// <summary>
    /// Stops forwarding outgoing messages and closes the client and listening sockets.
    /// </summary>
    public void Dispose()
    {
        if (_OutgoingSubscription != null)
        {
            _OutgoingSubscription.Dispose();
        }
        if (_Client != null)
        {
            _Client.Close();
        }
        _Listener.Close();
    }

    /// <summary>
    /// Completes the pending accept, hooks <see cref="Outgoing"/> up to the new client
    /// socket and starts the first asynchronous receive.
    /// </summary>
    /// <param name="ar">Async result whose state is the listening socket.</param>
    private void AcceptCallback(IAsyncResult ar)
    {
        var listener = (Socket)ar.AsyncState;
        Socket client;
        try
        {
            client = listener.EndAccept(ar);
        }
        catch (ObjectDisposedException)
        {
            // The listener was closed while the accept was pending; nothing to do.
            return;
        }
        catch (SocketException exc)
        {
            _Incoming.OnError(exc);
            return;
        }

        _Client = client;
        //create a state that contains the ability to listen
        var state = new StateObject()
        {
            workSocket = client
        };
        //create the ability to push data out
        _OutgoingSubscription = Outgoing.Subscribe((s) => Send(state.workSocket, s));
        try
        {
            //classic microsoft code, gotta love [object] state
            client.BeginReceive(state.buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(ReceiveCallback), state);
        }
        catch (Exception exc)
        {
            _Incoming.OnError(exc);
        }
    }

    /// <summary>
    /// Completes a receive: publishes the newly received text, then either completes the
    /// incoming stream (end-of-message marker seen, or connection closed) or starts the
    /// next receive.
    /// </summary>
    /// <param name="ar">Async result whose state is the connection's <see cref="StateObject"/>.</param>
    private void ReceiveCallback(IAsyncResult ar)
    {
        var state = (StateObject)ar.AsyncState;
        try
        {
            int bytes = state.workSocket.EndReceive(ar);
            if (bytes <= 0)
            {
                // A zero-byte read means the peer closed the connection. Re-arming the
                // receive here would just complete with zero bytes again, forever.
                _Incoming.OnCompleted();
                return;
            }

            var chunk = Encoding.ASCII.GetString(state.buffer, 0, bytes);
            state.sb.Append(chunk);
            // Publish only the new text; sb is kept solely to spot a marker that is
            // split across two reads.
            _Incoming.OnNext(chunk);

            if (state.sb.ToString().IndexOf(EndOfMessageMarker, StringComparison.Ordinal) != -1)
            {
                _Incoming.OnCompleted();
                return;
            }

            // Keep just enough of the tail to detect a marker spanning reads, so the
            // buffer stays bounded on long-lived connections.
            int keep = EndOfMessageMarker.Length - 1;
            if (state.sb.Length > keep)
            {
                state.sb.Remove(0, state.sb.Length - keep);
            }

            // More data may follow: keep reading.
            state.workSocket.BeginReceive(state.buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(ReceiveCallback), state);
        }
        catch (ObjectDisposedException)
        {
            // The socket was closed locally (after a send completed, or by Dispose)
            // while a receive was pending: treat it as the end of the stream.
            _Incoming.OnCompleted();
        }
        catch (Exception exc)
        {
            _Incoming.OnError(exc);
        }
    }

    /// <summary>
    /// Starts an asynchronous send of <paramref name="data"/> (ASCII-encoded) on
    /// <paramref name="handler"/>. A null message is ignored; failures to start the
    /// send are written to the console, like failures in <see cref="SendCallback"/>.
    /// </summary>
    /// <param name="handler">Client socket to send on.</param>
    /// <param name="data">Text to send.</param>
    private void Send(Socket handler, String data)
    {
        if (data == null)
        {
            // Encoding.GetBytes rejects null; there is nothing to send.
            return;
        }

        // Convert the string data to byte data using ASCII encoding.
        byte[] byteData = Encoding.ASCII.GetBytes(data);
        try
        {
            // Begin sending the data to the remote device.
            handler.BeginSend(byteData, 0, byteData.Length, 0,
                new AsyncCallback(SendCallback), handler);
        }
        catch (ObjectDisposedException e)
        {
            // The socket is closed after the first completed send.
            Console.WriteLine(e.ToString());
        }
        catch (SocketException e)
        {
            Console.WriteLine(e.ToString());
        }
    }

    /// <summary>
    /// Completes a send, then shuts down and closes the client socket.
    /// </summary>
    /// <param name="ar">Async result whose state is the client socket.</param>
    private void SendCallback(IAsyncResult ar)
    {
        // Retrieve the socket from the state object.
        Socket handler = (Socket)ar.AsyncState;
        try
        {
            // Complete sending the data to the remote device.
            handler.EndSend(ar);
            handler.Shutdown(SocketShutdown.Both);
        }
        catch (Exception e)
        {
            Console.WriteLine(e.ToString());
        }
        finally
        {
            // Always release the socket, even when the send or shutdown failed.
            handler.Close();
        }
    }

    // State object for reading client data asynchronously
    private class StateObject
    {
        // Client socket.
        public Socket workSocket = null;
        // Size of receive buffer.
        public const int BufferSize = 1024;
        // Receive buffer.
        public byte[] buffer = new byte[BufferSize];
        // Received text retained for end-of-message detection.
        public StringBuilder sb = new StringBuilder();
    }
}
}
@mika76
Copy link

mika76 commented Sep 11, 2016

Interesting, I did it using the rx event handling methods. But I found that exceptions and dropped sockets were not handled well. How does this do with exceptions and dropped sockets?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment