@svet93
Created August 4, 2017 13:47
NetworkStream async read with timeout
/*
I needed a persistent TCP connection with asynchronous reads, so that no thread is blocked for the duration of
every read. NetworkStream's ReadAsync, however, completely ignores the CancellationToken, so the token could not
be used as a timeout.
On StackOverflow a lot of people suggested closing the stream on timeout, but that results in loss of data, and
closing the client altogether was not an option in my scenario. Therefore I wrote this two-level read: a background
task on the thread pool (Task.Run) reads the stream all the time and writes the bytes to a queue. Whenever the
application layer has to read, it simply takes what is in the queue; if the queue is empty, it waits on a semaphore
for up to 15 seconds (the timeout). When the stream reader enqueues data it calls Release(), which wakes the waiting
caller so it can check the queue once again. If the queue is still empty at that point, the wait ended because of
the timeout.
Note: I have not tested this with multiple readers, as my scenario does not need them.
*/
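// Requires System, System.Collections.Generic, System.IO, System.Net.Sockets, System.Threading and System.Threading.Tasks.
// Assumed: the owning TcpClient, initialized elsewhere in the app; ReadAsyncWithTimeout checks its Connected property.
private TcpClient m_TcpClient;
// initialized elsewhere in the app from a TcpClient.GetStream()
private NetworkStream m_TcpStream;
private CancellationToken _shutdownToken;
// Shared by the background reader and ReadAsyncWithTimeout. Queue<T> is not thread-safe, so access must be synchronized.
private readonly Queue<byte> bigBuffer = new Queue<byte>();
private readonly SemaphoreSlim _signal = new SemaphoreSlim(0, 1);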
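// This is called in a Task.Run()
private async Task beginReadingStream()
{
    byte[] buffer = new byte[1024];
    // Closing the stream on shutdown makes a pending ReadAsync fail instead of waiting forever.
    using (_shutdownToken.Register(() => m_TcpStream.Close()))
    {
        while (!_shutdownToken.IsCancellationRequested)
        {
            try
            {
                int bytesReceived = 0;
                if (m_TcpStream.CanRead)
                {
                    bytesReceived = await m_TcpStream.ReadAsync(buffer, 0, buffer.Length).ConfigureAwait(false);
                }
                if (bytesReceived > 0)
                {
                    // Queue<T> is not thread-safe, so guard it against the concurrent consumer.
                    lock (bigBuffer)
                    {
                        for (int i = 0; i < bytesReceived; i++)
                        {
                            bigBuffer.Enqueue(buffer[i]);
                        }
                        // The semaphore's maximum count is 1, so release only when no signal is pending;
                        // otherwise Release() throws SemaphoreFullException.
                        if (_signal.CurrentCount == 0)
                        {
                            _signal.Release();
                        }
                    }
                }
                else
                {
                    // The stream is not readable, or ReadAsync returned 0 because the peer closed the connection.
                    // Wait a little bit so we don't keep a thread busy during re-creation.
                    await Task.Delay(3000, _shutdownToken).ConfigureAwait(false);
                }
            }
            catch (OperationCanceledException) when (_shutdownToken.IsCancellationRequested)
            {
                // Shutdown was requested during the delay; leave the loop without logging an error.
                break;
            }
            catch (Exception e)
            {
                LoggingService.Log(e);
            }
        }
    }
}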
public async Task<int> ReadAsyncWithTimeout(byte[] buffer, int offset, int count)
{
    int bytesToBeRead = 0;
    if (!m_TcpClient.Connected)
    {
        throw new ObjectDisposedException(nameof(m_TcpClient), "Socket is not connected");
    }
    // Queue<T> is not thread-safe, so guard it against the background reader.
    lock (bigBuffer)
    {
        if (bigBuffer.Count > 0)
        {
            bytesToBeRead = Math.Min(bigBuffer.Count, count);
            // Copy into the caller's buffer starting at offset.
            for (int i = 0; i < bytesToBeRead; i++)
            {
                buffer[offset + i] = bigBuffer.Dequeue();
            }
        }
    }
    if (bytesToBeRead > 0)
    {
        // Clear the semaphore in case of a race where the writer just signaled and this call read the data without waiting
        if (_signal.CurrentCount > 0)
            await _signal.WaitAsync(BabelfishConst.TCPIP_READ_TIME_OUT_IN_MS, _shutdownToken).ConfigureAwait(false);
        return bytesToBeRead;
    }
    // Nothing in the queue: wait up to the timeout for data from the writer
    await _signal.WaitAsync(15000, _shutdownToken).ConfigureAwait(false);
    // Read again in case the semaphore was signaled by an Enqueue
    lock (bigBuffer)
    {
        if (bigBuffer.Count > 0)
        {
            bytesToBeRead = Math.Min(bigBuffer.Count, count);
            for (int i = 0; i < bytesToBeRead; i++)
            {
                buffer[offset + i] = bigBuffer.Dequeue();
            }
        }
    }
    if (bytesToBeRead > 0)
    {
        return bytesToBeRead;
    }
    // This is because the synchronous NetworkStream Read() method throws this exception when it times out
    throw new IOException("Timed out waiting for data from the stream");
}
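
// Usage sketch, not part of the original code: one possible way to wire the two methods above together.
// The names _readerTask, StartReader and ReadChunkAsync are hypothetical, and the sketch assumes that
// m_TcpClient, m_TcpStream and _shutdownToken have already been initialized elsewhere in the class.
private Task _readerTask;

public void StartReader()
{
    // Start the background pump once; Task.Run unwraps the Task returned by the async method.
    _readerTask = Task.Run(() => beginReadingStream());
}

public async Task<byte[]> ReadChunkAsync(int maxBytes)
{
    byte[] chunk = new byte[maxBytes];
    try
    {
        int read = await ReadAsyncWithTimeout(chunk, 0, chunk.Length).ConfigureAwait(false);
        // Trim the array to the number of bytes actually received.
        Array.Resize(ref chunk, read);
        return chunk;
    }
    catch (IOException)
    {
        // ReadAsyncWithTimeout reports a timeout as an IOException; the connection itself stays open,
        // so the caller can simply try again later.
        return new byte[0];
    }
}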