Skip to content

Instantly share code, notes, and snippets.

@nayato
Created March 1, 2015 07:36
Show Gist options
  • Save nayato/d7d9b82d9adbc8272f3c to your computer and use it in GitHub Desktop.
Save nayato/d7d9b82d9adbc8272f3c to your computer and use it in GitHub Desktop.
Demonstrates loss of messages when receiving in the Helios client. Press any key while the client program is running to see how many messages are still pending receipt from the server. Press Enter in the server program to see the number of processed messages.
namespace Helios.Samples.LossClient
{
using System;
using System.Diagnostics;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Helios.Net.Bootstrap;
using Helios.Topology;
/// <summary>
/// Load-test client: sends a fixed number of identical payloads to a TCP server on
/// loopback port 1337 (the default port of the echo server sample in this file) and
/// counts the receive callbacks that come back, so stalled or lost replies show up as
/// a "pending reads" count that never reaches zero.
/// </summary>
class Program
{
    /// <summary>
    /// Starts the sender and a key-press monitor on background tasks, then blocks until
    /// the receive callback has fired once per sent message and prints the elapsed time.
    /// </summary>
    static void Main()
    {
        const int MessageCount = 1000000;
        Console.WriteLine("We're going to write a ton of data to the console. {0} iterations.", MessageCount);
        Console.WriteLine("Going!");

        // Counts down once per receive callback; shared between threads, so it is only
        // touched through Interlocked.
        long pendingReads = MessageCount;

        // Signalled by the receive callback when the countdown hits zero. A plain captured
        // bool polled from the main thread has no visibility guarantee across threads;
        // the event gives a proper hand-off and removes the polling latency from the timing.
        // Intentionally not disposed: it lives for the whole process and late callbacks
        // must never observe a disposed instance.
        var allReceived = new ManualResetEventSlim(false);
        var stopwatch = new Stopwatch();

        // Sender: connect, queue every message, then wait for the send queue to drain.
        Task.Run(() =>
        {
            var remote = Node.Loopback(1337);
            var client =
                new ClientBootstrap().SetTransport(TransportType.Tcp)
                    .Build().NewConnection(Node.Any(), remote);

            // NOTE(review): this assumes one receive callback per message sent — confirm
            // against Helios' framing behaviour; fewer callbacks would leave the count above zero.
            client.OnConnection += (address, channel) => client.BeginReceive((data, channel1) =>
            {
                // Use the value returned by the atomic decrement rather than re-reading the
                // shared long, which is not an atomic read on 32-bit runtimes.
                long remaining = Interlocked.Decrement(ref pendingReads);
                if (remaining == 0)
                {
                    Console.WriteLine(remaining);
                    allReceived.Set();
                }
            });
            client.Open();

            var bytes = Encoding.UTF8.GetBytes("THIS IS OUR TEST PAYLOAD");
            stopwatch.Start();
            for (var i = 0; i < MessageCount; i++)
            {
                client.Send(bytes, 0, bytes.Length, remote);
            }

            Console.WriteLine("Done queuing messages... waiting for queue to drain");
            while (client.MessagesInSendQueue > 0)
            {
                Thread.Sleep(10);
            }
            Console.WriteLine("Took {0} seconds to complete sending", stopwatch.Elapsed.TotalSeconds);
        });

        // Monitor: any key press prints how many receive callbacks are still outstanding.
        Task.Run(() =>
        {
            while (true)
            {
                Console.ReadKey();
                Console.WriteLine("Pending reads: " + Interlocked.Read(ref pendingReads));
            }
        });

        allReceived.Wait();

        // Stop the clock before writing to the console so console I/O is not measured.
        stopwatch.Stop();
        Console.WriteLine("Done, press any key to exit");
        Console.WriteLine("Took {0} seconds to complete", stopwatch.Elapsed.TotalSeconds);
        Console.ReadLine();
    }
}
}
namespace Helios.Samples.TcpReactorServer
{
using System;
using System.Net;
using System.Threading;
using Helios.Net;
using Helios.Reactor.Bootstrap;
using Helios.Topology;
/// <summary>
/// TCP echo server sample: accepts connections, echoes every received buffer back to
/// the sender, and counts how many receive callbacks it has handled.
/// </summary>
class Program
{
    /// <summary>Port used when no port is given on the command line.</summary>
    const int DefaultPort = 1337;

    /// <summary>Number of receive callbacks handled so far; updated via Interlocked.</summary>
    static int processedRequests;

    /// <summary>
    /// Writes a timestamped diagnostic line for <paramref name="node"/>.
    /// Compiled out — together with the evaluation of its arguments at every call site —
    /// unless the HELIOS_SAMPLE_VERBOSE symbol is defined, so the per-message
    /// string formatting does not distort throughput measurements by default.
    /// </summary>
    /// <param name="node">Node whose host and port prefix the message.</param>
    /// <param name="message">Text to print.</param>
    [System.Diagnostics.Conditional("HELIOS_SAMPLE_VERBOSE")]
    static void ServerPrint(INode node, string message)
    {
        Console.WriteLine("[{0}] {1}:{2}: {3}", DateTime.UtcNow, node.Host, node.Port, message);
    }

    /// <summary>
    /// Starts the echo server and then prints the processed-request counter each time
    /// Enter is pressed.
    /// </summary>
    /// <param name="args">Optional single argument: the TCP port to listen on.</param>
    static void Main(string[] args)
    {
        int port = DefaultPort;
        if (args.Length >= 1)
        {
            // Reject malformed or out-of-range ports with a message instead of crashing
            // with an unhandled FormatException/OverflowException.
            bool parsed = int.TryParse(args[0], out port);
            if (!parsed || port < IPEndPoint.MinPort || port > IPEndPoint.MaxPort)
            {
                Console.Error.WriteLine(
                    "Invalid port '{0}'. Expected an integer between {1} and {2}.",
                    args[0], IPEndPoint.MinPort, IPEndPoint.MaxPort);
                Environment.ExitCode = 1;
                return;
            }
        }

        var ip = IPAddress.Any;
        Console.WriteLine("Starting echo server...");
        Console.WriteLine("Will begin listening for requests on {0}:{1}", ip, port);

        var bootstrapper =
            new ServerBootstrap()
                .WorkerThreads(1)
                .SetTransport(TransportType.Tcp)
                .Build();
        var reactor = bootstrapper.NewReactor(NodeBuilder.BuildNode().Host(ip).WithPort(port));

        reactor.OnConnection += (node, channel) =>
        {
            ServerPrint(node,
                string.Format("Accepting connection from... {0}:{1}", node.Host, node.Port));
            channel.BeginReceive(ReceiveData);
        };
        reactor.OnDisconnection += (reason, address) =>
        {
            ServerPrint(address.RemoteHost,
                string.Format("Closed connection to... {0}:{1} [Reason:{2}]", address.RemoteHost.Host, address.RemoteHost.Port, reason.Type));
        };
        reactor.Start();

        // Print the running total every time the operator presses Enter.
        while (true)
        {
            if (Console.ReadLine() == null)
            {
                // Standard input is closed or redirected to an empty stream: ReadLine would
                // return null immediately forever, so park this thread instead of spinning
                // and flooding the console. The process is then stopped externally.
                Thread.Sleep(Timeout.Infinite);
            }
            Console.WriteLine(processedRequests);
        }
    }

    /// <summary>
    /// Receive callback: counts the request and sends the same buffer back to the peer.
    /// </summary>
    /// <param name="data">The data that was received.</param>
    /// <param name="connection">Connection the data arrived on; the reply is sent over it.</param>
    public static void ReceiveData(NetworkData data, IConnection connection)
    {
        Interlocked.Increment(ref processedRequests);
        ServerPrint(connection.RemoteHost, string.Format("recieved {0} bytes", data.Length));
        connection.Send(new NetworkData
        {
            Buffer = data.Buffer,
            Length = data.Length,
            RemoteHost = connection.RemoteHost
        });
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment