Skip to content

Instantly share code, notes, and snippets.

@bennage
Forked from micahasmith/RxTcpServer.cs
Created August 30, 2014 04:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bennage/c60ed468e99234d493b3 to your computer and use it in GitHub Desktop.
Save bennage/c60ed468e99234d493b3 to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Net.Sockets;
using System.Data;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using System.Reactive.Linq;
using System.Reactive.Subjects;
namespace RxWeb
{
class Program
{
static void Main(string[] args)
{
var server = Task.Factory.StartNew(() =>
{
try
{
//create a netactor on port 8081
NetActor a = new NetActor(8081);
//write out to the console when you get sent data
a.Incoming.Subscribe(i => Console.Write(i));
//create an observable from the console and bind it to the netactor
var o=Observable.Start<string>(()=>Console.ReadLine());
//publish from the console to the actor
o.Subscribe<string>(a.Outgoing.OnNext);
}
catch (Exception exc)
{
throw;
}
});
while (true)
{
;
}
}
}
/// <summary>
/// Binds to localhost. Pretty much 100% based on msdn code
/// </summary>
/// <seealso cref="http://msdn.microsoft.com/en-us/library/fx6588te.aspx"/>
public class NetActor
{
//this socket is us
Socket _Listener;
//this socket is them
Socket _Client;
//Incoming messages from the client
private ISubject<string> _Incoming {get; set;}
//allow the ability to subscribe to incoming messages
public IObservable<string> Incoming
{
get
{
return _Incoming.AsObservable();
}
}
//outgoing messages
public ISubject<string> Outgoing { get; private set; }
public NetActor(int port)
{
_Incoming = new Subject<string>();
Outgoing = new Subject<string>();
_Listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
_Listener.Bind(new IPEndPoint(IPAddress.Parse("127.0.0.1"), port));
_Listener.Listen(20);
_Listener.BeginAccept((o)=>BeginAccept(o),_Listener);
}
private void BeginAccept(IAsyncResult ar)
{
var listener = (Socket)ar.AsyncState;
//create a state that contains the ability to listen
var state=new StateObject()
{
workSocket=_Listener.EndAccept(ar)
};
//create the ability to push data out
Outgoing.Subscribe((s) => Send(state.workSocket, s));
//classic microsoft code, gotta love [object] state
state.workSocket.BeginReceive(state.buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(BeginRead), state);
//lets try to override this with Observable.FromAsyncPattern....
// worthless. this func returns just iasyncresult and not a tangible something... anything...
// like in the microsoft example docs. anger face
//var o = Observable.FromAsyncPattern<byte[], int, int, SocketFlags>(
// (byt, offset, size, flags, cb, st) => state.workSocket.BeginReceive(byt, offset, size, flags, cb, st),
// (a) => state.workSocket.EndReceive(a));
}
private void BeginRead(IAsyncResult ar)
{
try
{
var state = (StateObject)ar.AsyncState;
int bytes = state.workSocket.EndReceive(ar);
if (bytes > 0)
{
state.sb.Append(Encoding.ASCII.GetString(state.buffer, 0, bytes));
var content = state.sb.ToString();
_Incoming.OnNext(content);
if (content.IndexOf("<EOF>") != -1)
{
_Incoming.OnCompleted();
}
}
else
{
state.workSocket.BeginReceive(state.buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(BeginRead), state);
}
}
catch (Exception exc)
{
_Incoming.OnError(exc);
}
}
private void Send(Socket handler, String data)
{
// Convert the string data to byte data using ASCII encoding.
byte[] byteData = Encoding.ASCII.GetBytes(data);
// Begin sending the data to the remote device.
handler.BeginSend(byteData, 0, byteData.Length, 0,
new AsyncCallback(SendCallback), handler);
}
private void SendCallback(IAsyncResult ar)
{
try
{
// Retrieve the socket from the state object.
Socket handler = (Socket)ar.AsyncState;
// Complete sending the data to the remote device.
int bytesSent = handler.EndSend(ar);
handler.Shutdown(SocketShutdown.Both);
handler.Close();
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
}
}
// State object for reading client data asynchronously
private class StateObject
{
// Client socket.
public Socket workSocket = null;
// Size of receive buffer.
public const int BufferSize = 1024;
// Receive buffer.
public byte[] buffer = new byte[BufferSize];
// Received data string.
public StringBuilder sb = new StringBuilder();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment