Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
非同期 TCP 通信。
/* -*- encoding: utf-8; -*- */
using System;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
namespace test.asynctcp.client {
/// <summary>
///
/// </summary>
class AsyncTcpClient {
/// <summary>
///
/// </summary>
public AsyncTcpClient() {
this.local_host_ = Dns.GetHostEntry( Dns.GetHostName() );
//this.local_host_ = Dns.GetHostEntry( "localhost" );
IPAddress local_address = null;
foreach ( IPAddress address in this.local_host_.AddressList ) {
if ( address.AddressFamily == AddressFamily.InterNetwork ) {
local_address = address;
break;
}
}
if ( local_address == null )
throw new ApplicationException( "IPv4 用の IP アドレスが見つかりませんでした。" );
this.local_endpoint_ = new IPEndPoint( local_address, 63980 );
//new Random().Next( 49152, 65535 );
}
/// <summary>
///
/// </summary>
/// <param name="args"></param>
public void run(string[] args) {
TcpClient client = new TcpClient( this.local_endpoint_ );
IPEndPoint listen_endpoint = new IPEndPoint( IPAddress.Parse( "172.16.1.105" ), this.local_endpoint_.Port - 1 );
TimeSpan timeout = TimeSpan.FromMilliseconds( 100 );
try {
/*
* サーバーに接続します。
*/
if ( this.connect( client, listen_endpoint, timeout ) )
Console.WriteLine( "シグナルが補足されました。" );
else
throw new ApplicationException( "サーバーに接続できなかったので終了します。" );
Thread.Sleep( 1000 );
/*
* 通信。
*/
NetworkStream stream = client.GetStream();
while ( true ) {
string greeting = string.Empty;
string response = string.Empty;
Console.Write( "bob > " );
response = this.read( stream, timeout );
Console.WriteLine( response );
if ( response.StartsWith( BYE ) )
break;
Thread.Sleep( 1000 );
Console.Write( "alice < " );
if ( response.EndsWith( "How are you?" ) ) {
Random r = new Random();
int index = r.Next( CONDITIONS.Length );
string condition = CONDITIONS[index];
greeting = string.Format( "{0} Thanks. How are you?", condition );
} else {
// 其れ以外なら、これ以上伝えることは無いので、over. と伝えることにします。
greeting = "over.";
}
Console.WriteLine( greeting );
this.write( stream, greeting, timeout );
}
} catch ( Exception e ) {
Console.WriteLine();
Console.WriteLine( e );
} finally {
client.Close();
}
}
/// <summary>
///
/// </summary>
/// <param name="client"></param>
/// <param name="listen_endpoint"></param>
/// <param name="timeout"></param>
/// <returns></returns>
bool connect(TcpClient client, IPEndPoint listen_endpoint, TimeSpan timeout) {
int trial_count = 0;
ManualResetEvent connected = new ManualResetEvent( false );
IAsyncResult connecting = client.BeginConnect( listen_endpoint.Address,
listen_endpoint.Port,
connectCompleted,
connected );
try {
while ( !connecting.IsCompleted ) {
if ( trial_count >= MAX_TRIALS ) {
Console.WriteLine( "タイムアウトしました。" );
return false;
}
Console.WriteLine( "{0} 回目", trial_count + 1 );
if ( connected.WaitOne( timeout ) )
continue;
if ( connecting.AsyncWaitHandle.WaitOne( 0 ) )
break;
++trial_count;
}
client.EndConnect( connecting );
} catch ( SocketException se ) {
Console.WriteLine( "connect: {0}",se.Message );
}
return client.Client.Connected;
}
/// <summary>
///
/// </summary>
/// <param name="stream"></param>
/// <param name="timeout"></param>
/// <returns></returns>
string read(NetworkStream stream, TimeSpan timeout) {
byte[] temp_buffer = new byte[1024];
int amount_of_bytes_read = 0;
int output_trial_count = 0;
bool signal_captured = false;
ManualResetEvent readed = new ManualResetEvent( false );
IAsyncResult byte_reading = stream.BeginRead( temp_buffer,
0,
temp_buffer.Length,
readCompleted,
readed );
do {
if ( output_trial_count >= MAX_TRIALS )
throw new ApplicationException( "タイムアウトしました。" );
if ( signal_captured = readed.WaitOne( timeout ) )
break;
if ( byte_reading.AsyncWaitHandle.WaitOne( 0 ) )
continue;
++output_trial_count;
} while ( !byte_reading.IsCompleted );
amount_of_bytes_read = stream.EndRead( byte_reading );
return this.transfer_encoding_.GetString( temp_buffer, 0, amount_of_bytes_read );
}
/// <summary>
///
/// </summary>
/// <param name="stream"></param>
/// <param name="text"></param>
/// <param name="timeout"></param>
void write(NetworkStream stream, string text, TimeSpan timeout) {
int input_trial_count = 0;
byte[] sent_bytes = this.transfer_encoding_.GetBytes( text );
bool signal_captured = false;
ManualResetEvent wrote = new ManualResetEvent( false );
IAsyncResult byte_writing = stream.BeginWrite( sent_bytes,
0,
sent_bytes.Length,
writeCompleted,
wrote );
do {
if ( input_trial_count >= MAX_TRIALS ) {
Console.WriteLine( "タイムアウトしました。" );
break;
}
if ( signal_captured = wrote.WaitOne( timeout ) )
break;
if ( byte_writing.AsyncWaitHandle.WaitOne( 0 ) )
continue;
++input_trial_count;
} while ( !byte_writing.IsCompleted );
stream.EndWrite( byte_writing );
}
/// <summary>
///
/// </summary>
/// <param name="ar"></param>
private void connectCompleted(IAsyncResult ar) {
ManualResetEvent signal = ar.AsyncState as ManualResetEvent;
if ( ar.IsCompleted && ar.CompletedSynchronously )
signal.Set();
}
/// <summary>
///
/// </summary>
/// <param name="ar"></param>
void readCompleted(IAsyncResult ar) {
ManualResetEvent signal = ar.AsyncState as ManualResetEvent;
if ( ar.IsCompleted && ar.CompletedSynchronously )
signal.Set();
}
/// <summary>
///
/// </summary>
/// <param name="ar"></param>
void writeCompleted(IAsyncResult ar) {
ManualResetEvent signal = ar.AsyncState as ManualResetEvent;
if ( ar.IsCompleted && ar.CompletedSynchronously )
signal.Set();
}
/// <summary>
///
/// </summary>
/// <param name="args"></param>
static void Main(string[] args) {
AsyncTcpClient progn = new AsyncTcpClient();
progn.run( args );
}
/// <summary>
///
/// </summary>
private Encoding transfer_encoding_ = Encoding.UTF8;
/// <summary>
///
/// </summary>
private IPHostEntry local_host_;
/// <summary>
///
/// </summary>
private IPEndPoint local_endpoint_;
/// <summary>
///
/// </summary>
private static readonly int MAX_TRIALS = 32;
/// <summary>
///
/// </summary>
private static readonly string[] CONDITIONS = new string [] { "Not bad.", "Good!", "Pretty Good!", "Great!" };
/// <summary>
///
/// </summary>
private static readonly string BYE = "bye.";
}
}
/* -*- encoding: utf-8; -*- */
using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
namespace test.asynctcp.server {
/// <summary>
///
/// </summary>
class AsyncTcpServer {
/// <summary>
///
/// </summary>
public AsyncTcpServer() {
this.local_endpoint_ = new IPEndPoint( IPAddress.Any, 63979 );
}
/// <summary>
///
/// </summary>
/// <param name="args"></param>
public void run(string[] args) {
TimeSpan waiting_time = TimeSpan.FromMilliseconds( 100 );
TcpListener listener = null;
//Queue<Thread> thread_queue = new Queue<Thread>();
try {
listener = new TcpListener( this.local_endpoint_ );
Console.WriteLine( "TCP リスナーを開始します。" );
do {
//Thread that = null;
if ( !listener.Server.IsBound )
listener.Start();
if ( this.request_timeouted_.WaitOne( waiting_time ) ) {
Console.WriteLine( "クライアントの受信接続がありませんでした。" );
break;
}
if ( listener.Pending() ) {
Console.WriteLine( "----------------------------------------------------------------------" );
Console.WriteLine( "保留中の接続要求があります。" );
//if ( thread_queue.Count <= 10 ) {
//that = new Thread( acceptedPart );
//that.Start( listener );
//thread_queue.Enqueue( that );
ThreadPool.QueueUserWorkItem( acceptedPart, listener );
//}
Thread.Sleep( waiting_time );
} else
Thread.Sleep( waiting_time );
//if ( thread_queue.Count > 0 ) {
// Thread temp_thread = thread_queue.Peek();
// if ( temp_thread.ThreadState == ThreadState.Stopped ) {
// temp_thread = thread_queue.Dequeue();
// Console.WriteLine( "thread {0:x} を取り出しました。", temp_thread.GetHashCode() );
// temp_thread = null;
// }
//}
if ( this.done_.WaitOne( 0 ) ) {
listener.Stop();
this.done_.Reset();
}
} while ( true );
} catch ( Exception e ) {
Console.WriteLine( e );
} finally {
if ( listener != null ) {
listener.Stop();
Console.WriteLine( "TCP リスナーを止めました。" );
}
}
}
/// <summary>
///
/// </summary>
/// <param name="arg"></param>
void acceptedPart(object arg) {
TcpListener listener = arg as TcpListener;
IAsyncResult acceptance = null;
ManualResetEvent accepted = new ManualResetEvent( false );
TimeSpan waiting_time = TimeSpan.FromMilliseconds( 100 );
bool signal_captured = false;
int loop_count = 0;
Console.WriteLine( "接続要求待機に入ります。" );
try {
acceptance = listener.BeginAcceptTcpClient( completed, accepted );
while ( !acceptance.IsCompleted ) {
if ( loop_count >= MAX_TRIALS ) {
Console.WriteLine( "接続要求待機中にタイムアウトしました。" );
this.request_timeouted_.Set();
break;
}
if ( signal_captured = accepted.WaitOne( waiting_time ) )
break;
if ( acceptance.AsyncWaitHandle.WaitOne( 0 ) )
break;
}
} catch ( SocketException se ) {
Console.WriteLine( "接続要求待機中にエラーが発生しました: {0}", se.Message );
} catch ( ObjectDisposedException ) {
Console.WriteLine( "接続要求待機をキャンセルします。" );
}
if ( !acceptance.IsCompleted ) {
Console.WriteLine( "クライアントからの接続要求がありませんでした。" );
return;
}
Console.WriteLine( "クライアントからの接続要求を受け入れました。" );
Thread.Sleep( 1000 );
TcpClient client = null;
NetworkStream stream = null;
try {
client = listener.EndAcceptTcpClient( acceptance );
try {
stream = client.GetStream();
string greeting = "Hi, How are you?";
string response = string.Empty;
while ( true ) {
Console.WriteLine( "alice < {0}", greeting );
this.write( stream, greeting, waiting_time );
if ( greeting.StartsWith( "bye" ) )
break;
Thread.Sleep( 300 );
response = this.read( stream, waiting_time );
Console.WriteLine( "bob > {0}", response );
if ( response.EndsWith( "How are you?" ) )
greeting = CONDITIONS[new Random().Next( CONDITIONS.Length )];
else if ( response.StartsWith( "over" ) )
greeting = BYE;
else
greeting = BYE;
}
} catch ( SocketException se ) {
Console.WriteLine( se.Message );
}
} finally {
stream.Close();
client.Close();
}
this.done_.Set();
Console.WriteLine( "接続要求待機を抜けます。" );
}
/// <summary>
///
/// </summary>
/// <param name="stream"></param>
/// <param name="timeout"></param>
/// <returns></returns>
string read(NetworkStream stream, TimeSpan timeout) {
byte[] temp_buffer = new byte[1024];
int amount_of_bytes_read = 0;
int output_trial_count = 0;
bool signal_captured = false;
ManualResetEvent readed = new ManualResetEvent( false );
IAsyncResult byte_reading = stream.BeginRead( temp_buffer, 0, temp_buffer.Length, completed, readed );
do {
if ( output_trial_count >= MAX_TRIALS )
throw new ApplicationException( "タイムアウトしました。" );
if ( signal_captured = readed.WaitOne( timeout ) )
break;
if ( byte_reading.AsyncWaitHandle.WaitOne( 0 ) )
continue;
++output_trial_count;
} while ( !byte_reading.IsCompleted );
//if ( byte_reading.IsCompleted && !byte_reading.CompletedSynchronously )
// throw new ApplicationException( "サーバーが切断しました。" );
amount_of_bytes_read = stream.EndRead( byte_reading );
return this.transfer_encoding_.GetString( temp_buffer, 0, amount_of_bytes_read );
}
/// <summary>
///
/// </summary>
/// <param name="stream"></param>
/// <param name="text"></param>
/// <param name="timeout"></param>
void write(NetworkStream stream, string text, TimeSpan timeout) {
int input_trial_count = 0;
byte[] sent_bytes = this.transfer_encoding_.GetBytes( text );
bool signal_captured = false;
ManualResetEvent wrote = new ManualResetEvent( false );
IAsyncResult byte_writing = stream.BeginWrite( sent_bytes, 0, sent_bytes.Length, completed, wrote );
do {
if ( input_trial_count >= MAX_TRIALS ) {
Console.WriteLine( "タイムアウトしました。" );
break;
}
if ( signal_captured = wrote.WaitOne( timeout ) )
break;
if ( byte_writing.AsyncWaitHandle.WaitOne( 0 ) )
continue;
++input_trial_count;
} while ( !byte_writing.IsCompleted );
//if ( byte_writing.IsCompleted && !byte_writing.CompletedSynchronously)
// throw new ApplicationException( "サーバーが切断しました。" );
stream.EndWrite( byte_writing );
}
/// <summary>
///
/// </summary>
/// <param name="ar"></param>
void completed(IAsyncResult ar) {
ManualResetEvent signal = ar.AsyncState as ManualResetEvent;
if ( ar.IsCompleted && ar.CompletedSynchronously )
signal.Set();
}
/// <summary>
///
/// </summary>
/// <param name="args"></param>
static void Main(string[] args) {
AsyncTcpServer progn = new AsyncTcpServer();
progn.run( args );
}
/// <summary>
/// 送信用エンコーディング。
/// </summary>
private Encoding transfer_encoding_ = Encoding.UTF8;
/// <summary>
/// ローカルエンドポイント。
/// </summary>
private IPEndPoint local_endpoint_;
/// <summary>
///
/// </summary>
private ManualResetEvent request_timeouted_ = new ManualResetEvent( false );
/// <summary>
///
/// </summary>
private ManualResetEvent done_ = new ManualResetEvent( false );
/// <summary>
///
/// </summary>
private static readonly int MAX_TRIALS = 32;
/// <summary>
///
/// </summary>
private static readonly string[] CONDITIONS = new string[] { "Pretty Good!", "Great!", "Not bad.", "Good!" };
/// <summary>
///
/// </summary>
private static readonly string BYE = "bye.";
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.