Created May 7, 2012 03:19
rx c# tcp server
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Net.Sockets;
using System.Data;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using System.Reactive.Linq;
using System.Reactive.Subjects;
namespace RxWeb
class Program
static void Main(string[] args)
var server = Task.Factory.StartNew(() =>
//create a netactor on port 8081
NetActor a = new NetActor(8081);
//write out to the console when you get sent data
a.Incoming.Subscribe(i => Console.Write(i));
//create an observable from the console and bind it to the netactor
var o=Observable.Start<string>(()=>Console.ReadLine());
//publish from the console to the actor
catch (Exception exc)
while (true)
/// <summary>
/// Binds to localhost. Pretty much 100% based on msdn code
/// </summary>
/// <seealso cref=""/>
public class NetActor
//this socket is us
Socket _Listener;
//this socket is them
Socket _Client;
//Incoming messages from the client
private ISubject<string> _Incoming {get; set;}
//allow the ability to subscribe to incoming messages
public IObservable<string> Incoming
return _Incoming.AsObservable();
//outgoing messages
public ISubject<string> Outgoing { get; private set; }
public NetActor(int port)
_Incoming = new Subject<string>();
Outgoing = new Subject<string>();
_Listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
_Listener.Bind(new IPEndPoint(IPAddress.Parse(""), port));
private void BeginAccept(IAsyncResult ar)
var listener = (Socket)ar.AsyncState;
//create a state that contains the ability to listen
var state=new StateObject()
//create the ability to push data out
Outgoing.Subscribe((s) => Send(state.workSocket, s));
//classic microsoft code, gotta love [object] state
state.workSocket.BeginReceive(state.buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(BeginRead), state);
//lets try to override this with Observable.FromAsyncPattern....
// worthless. this func returns just iasyncresult and not a tangible something... anything...
// like in the microsoft example docs. anger face
//var o = Observable.FromAsyncPattern<byte[], int, int, SocketFlags>(
// (byt, offset, size, flags, cb, st) => state.workSocket.BeginReceive(byt, offset, size, flags, cb, st),
// (a) => state.workSocket.EndReceive(a));
private void BeginRead(IAsyncResult ar)
var state = (StateObject)ar.AsyncState;
int bytes = state.workSocket.EndReceive(ar);
if (bytes > 0)
{, 0, bytes));
var content =;
if (content.IndexOf("<EOF>") != -1)
state.workSocket.BeginReceive(state.buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(BeginRead), state);
catch (Exception exc)
private void Send(Socket handler, String data)
// Convert the string data to byte data using ASCII encoding.
byte[] byteData = Encoding.ASCII.GetBytes(data);
// Begin sending the data to the remote device.
handler.BeginSend(byteData, 0, byteData.Length, 0,
new AsyncCallback(SendCallback), handler);
private void SendCallback(IAsyncResult ar)
// Retrieve the socket from the state object.
Socket handler = (Socket)ar.AsyncState;
// Complete sending the data to the remote device.
int bytesSent = handler.EndSend(ar);
catch (Exception e)
// State object for reading client data asynchronously
private class StateObject
// Client socket.
public Socket workSocket = null;
// Size of receive buffer.
public const int BufferSize = 1024;
// Receive buffer.
public byte[] buffer = new byte[BufferSize];
// Received data string.
public StringBuilder sb = new StringBuilder();
Interesting, I did it using the rx event handling methods. But I found that exceptions and dropped sockets were not handled well. How does this do with exceptions and dropped sockets?

