Skip to content

Instantly share code, notes, and snippets.

@mrtank
Last active May 22, 2019 09:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mrtank/853bdb7957a2964dce4bb98ab06def8a to your computer and use it in GitHub Desktop.
Save mrtank/853bdb7957a2964dce4bb98ab06def8a to your computer and use it in GitHub Desktop.
namespace RxSocketTest.TestDoubles.GlobalStates
{
using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using RxSocket;
using TestDoubles;
internal class ConnectionController
{
// ReSharper disable once NotAccessedField.Local
private FakeTcpListener _mockTcpListener;
private CancellationToken _token;
private readonly Random _random = new Random();
private readonly Dictionary<ITcpClient, ITcpClient> _clientConnectionMapping = new Dictionary<ITcpClient, ITcpClient>();
private readonly Dictionary<ITcpClient, MemoryStream> _readyToRead = new Dictionary<ITcpClient, MemoryStream>();
private readonly Dictionary<ITcpClient, AutoResetEvent> _writeStartWaitHandleForClient = new Dictionary<ITcpClient, AutoResetEvent>();
public ConnectionController(FakeTcpListener mockTcpListener, CancellationToken token)
{
_mockTcpListener = mockTcpListener;
_token = token;
}
internal byte[] ReadBytesForClient(FakeNetworkStream x)
{
return _readyToRead[this[x.GetMyClient()]].ToArray();
}
internal void Close(FakeTcpClient fakeTcpClient)
{
var connectedClient = this[fakeTcpClient];
((FakeTcpClient)connectedClient).UnConnect();
_writeStartWaitHandleForClient[fakeTcpClient].Set();
_writeStartWaitHandleForClient[connectedClient].Set();
}
internal ITcpClient this[ITcpClient from]
{
get { return _clientConnectionMapping[from]; }
private set
{
if (_clientConnectionMapping.ContainsKey(from))
{
throw new InvalidOperationException("Cannot connect again");
}
_clientConnectionMapping.Add(from, value);
}
}
internal Task<int> ReadAsync(FakeTcpClient fakeTcpClient, ReadDTO readDTO)
{
return Task.Run(() => WaitUntilWriteHappen(fakeTcpClient), _token).ContinueWith(x => ResultAfterWriteHappen(fakeTcpClient, readDTO), _token);
}
private int SignalNextReadMayHappen(Task<int> continuedTask, object resetEvent)
{
((AutoResetEvent)resetEvent).Set();
return continuedTask.Result;
}
private int ResultAfterWriteHappen(FakeTcpClient fakeTcpClient, ReadDTO readDTO)
{
if (!fakeTcpClient.Living)
{
return 0;
}
return _readyToRead[fakeTcpClient].ReadAsync(readDTO.Buffer, readDTO.Offset, readDTO.Size, _token).ContinueWith(SignalNextReadMayHappen, _writeStartWaitHandleForClient[fakeTcpClient], _token).Result;
}
private void WaitUntilWriteHappen(FakeTcpClient fakeTcpClient)
{
while (_readyToRead[fakeTcpClient].Position >= _readyToRead[fakeTcpClient].Length)
{
_writeStartWaitHandleForClient[fakeTcpClient].WaitOne();
_token.ThrowIfCancellationRequested();
}
}
internal void Connect(FakeTcpClient lastCreatedClient, FakeTcpClient mockTcpClient)
{
this[mockTcpClient] = lastCreatedClient;
this[lastCreatedClient] = mockTcpClient;
var ip = new byte[4];
_random.NextBytes(ip);
lastCreatedClient.Client.RemoteEndPoint = new IPEndPoint(new IPAddress(ip), _random.Next(1, short.MaxValue));
mockTcpClient.Client.RemoteEndPoint = new IPEndPoint(new IPAddress(new byte[] { 127, 0, 0, 1 }), 11122);
mockTcpClient.Client.LocalEndPoint = lastCreatedClient.Client.RemoteEndPoint;
lastCreatedClient.Client.LocalEndPoint = mockTcpClient.Client.RemoteEndPoint;
_writeStartWaitHandleForClient[mockTcpClient] = new AutoResetEvent(false);
_writeStartWaitHandleForClient[lastCreatedClient] = new AutoResetEvent(false);
_token.Register(o => _writeStartWaitHandleForClient[(ITcpClient)o].Set(), mockTcpClient);
_token.Register(o => _writeStartWaitHandleForClient[(ITcpClient)o].Set(), lastCreatedClient);
_readyToRead[mockTcpClient] = new MemoryStream();
_readyToRead[lastCreatedClient] = new MemoryStream();
}
internal void Write(FakeTcpClient _mockTcpClient, ReadDTO readDTO)
{
long position = _readyToRead[this[_mockTcpClient]].Position;
_readyToRead[this[_mockTcpClient]].Position = _readyToRead[this[_mockTcpClient]].Length;
_readyToRead[this[_mockTcpClient]].Write(readDTO.Buffer, readDTO.Offset, readDTO.Size);
_readyToRead[this[_mockTcpClient]].Position = position;
_writeStartWaitHandleForClient[this[_mockTcpClient]].Set();
}
}
}
namespace RxSocketTest.TestDoubles
{
using System.IO;
using System.Threading.Tasks;
using GlobalStates;
using RxSocket;
internal class FakeNetworkStream: Stream, INetworkStream
{
private readonly FakeTcpClient _fakeTcpClient;
private readonly ConnectionController _connectionController;
private readonly MemoryStream _storedWrites;
internal int WriteCount { get; private set; }
public FakeNetworkStream(FakeTcpClient mockTcpClient, ConnectionController connectionController)
{
_fakeTcpClient = mockTcpClient;
_connectionController = connectionController;
WriteCount = 0;
_storedWrites = new MemoryStream();
}
public override void Close()
{
_fakeTcpClient.Close();
}
public override void Write(byte[] buffer, int offset, int size)
{
ReadDTO readDTO = new ReadDTO(buffer, offset, size);
_connectionController.Write(_fakeTcpClient, readDTO);
_fakeTcpClient.WriteCount++;
_storedWrites.Write(buffer, offset, size);
WriteCount++;
}
internal byte[] GetReadBytes()
{
return _storedWrites.ToArray();
}
public new Task<int> ReadAsync(byte[] buffer, int offset, int count)
{
return _connectionController.ReadAsync(_fakeTcpClient, new ReadDTO(buffer, offset, count));
}
internal ConnectionController GetConnectionController()
{
return _connectionController;
}
internal FakeTcpClient GetMyClient()
{
return _fakeTcpClient;
}
// todo: implement getters from _connectionController + logic + reassurance from a real NetworkStream's behaviour.
public override bool CanRead { get; }
public override bool CanSeek { get; }
public override bool CanWrite { get; }
public override long Length { get; }
public override long Position { get; set; }
public override void Flush()
{
throw new System.NotImplementedException();
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new System.NotImplementedException();
}
public override void SetLength(long value)
{
throw new System.NotImplementedException();
}
public override int Read(byte[] buffer, int offset, int count)
{
throw new System.NotImplementedException();
}
}
}
namespace RxSocketTest.TestDoubles
{
using GlobalStates;
using Moq;
using RxSocket;
public class FakeTcpClient : ITcpClient
{
private readonly FakeTcpListener _listener;
private readonly ConnectionController _connectionController;
private FakeNetworkStream _myStream;
public FakeTcpClient(FakeTcpListener listener)
{
_listener = listener;
Client = Mock.Of<ISocket>();
_connectionController = listener.GetConnectionController();
ConnectCount = 0;
WriteCount = 0;
}
public ISocket Client
{
get;
}
public bool Connected { get; private set; }
internal int ConnectCount { get; private set; }
internal int WriteCount { get; set; }
public bool Living { get; private set; }
public void Close()
{
Living = false;
_connectionController.Close(this);
}
internal void ConnectBack()
{
Connected = true;
Living = true;
_myStream = new FakeNetworkStream(this, _connectionController);
}
public void Connect(string address, int port)
{
Connected = true;
Living = true;
_myStream = new FakeNetworkStream(this, _connectionController);
_listener.Connect(this);
ConnectCount++;
}
internal void UnConnect()
{
Living = false;
}
public override string ToString()
{
return Client.LocalEndPoint.ToString();
}
public INetworkStream GetStream()
{
return _myStream;
}
}
}
namespace RxSocketTest.TestDoubles
{
using System;
using System.Reactive.Concurrency;
using System.Threading;
using System.Threading.Tasks;
using GlobalStates;
using RxSocket;
public class FakeTcpListener : ITcpListener
{
private CancellationToken _token;
private readonly IScheduler _scheduler;
readonly AutoResetEvent _waitForConnect = new AutoResetEvent(true);
readonly AutoResetEvent _waitForAccept = new AutoResetEvent(false);
private readonly ConnectionController _connectionController;
private TaskCompletionSource<ITcpClient> _source;
public FakeTcpListener(CancellationToken token, IScheduler scheduler)
{
_scheduler = scheduler;
_token = token;
_connectionController = new ConnectionController(this, token);
_token.Register(() => _waitForAccept.Set());
}
internal ConnectionController GetConnectionController()
{
return _connectionController;
}
internal void Connect(FakeTcpClient mockTcpClient)
{
_waitForAccept.WaitOne();
_token.ThrowIfCancellationRequested();
_scheduler.Sleep(TimeSpan.FromTicks(200L));
FakeTcpClient lastCreatedClient = new FakeTcpClient(this);
_connectionController.Connect(lastCreatedClient, mockTcpClient);
lastCreatedClient.ConnectBack();
_source.SetResult(lastCreatedClient);
}
public Task<ITcpClient> AcceptTcpClientAsync()
{
_waitForConnect.WaitOne();
_token.ThrowIfCancellationRequested();
_source = new TaskCompletionSource<ITcpClient>();
_waitForAccept.Set();
_token.Register(() => { _source.TrySetCanceled(); });
return _source.Task.ContinueWith((tsk, state) =>
{
((AutoResetEvent)state).Set();
return tsk.Result;
}, _waitForConnect, _token);
}
internal bool Living { get; private set; }
public void Start()
{
Living = true;
}
public void Stop()
{
Living = false;
}
}
}
namespace RxSocket
{
using System.Threading.Tasks;
public interface INetworkStream
{
void Write(byte[] buffer, int offset, int size);
void Close();
Task<int> ReadAsync(byte[] buffer, int offset, int count);
}
}
namespace RxSocket
{
using System.Net;
public interface ISocket
{
EndPoint RemoteEndPoint { get; set; }
EndPoint LocalEndPoint { get; set; }
}
}
namespace RxSocket
{
public interface ITcpClient
{
void Close();
bool Connected { get; }
ISocket Client { get; }
void Connect(string address, int port);
INetworkStream GetStream();
}
}
namespace RxSocket
{
using System.Net.Sockets;
using System.Threading.Tasks;
internal class MyNetworkStream: INetworkStream
{
private readonly NetworkStream _networkStream;
public MyNetworkStream(NetworkStream networkStream)
{
_networkStream = networkStream;
}
public void Close()
{
_networkStream.Close();
}
public Task<int> ReadAsync(byte[] buffer, int offset, int count)
{
return _networkStream.ReadAsync(buffer, offset, count);
}
public void Write(byte[] buffer, int offset, int size)
{
_networkStream.Write(buffer, offset, size);
}
}
}
namespace RxSocketTest.TestDoubles.GlobalStates
{
internal class ReadDTO
{
public ReadDTO(byte[] buffer, int offset, int size)
{
Buffer = buffer;
Offset = offset;
Size = size;
}
public byte[] Buffer { get; set; }
public int Offset { get; set; }
public int Size { get; set; }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment