Skip to content

Instantly share code, notes, and snippets.

@mrtank
Created April 25, 2016 09:24
Show Gist options
  • Save mrtank/8c0da231dbdef1c665821815e0aed428 to your computer and use it in GitHub Desktop.
Save mrtank/8c0da231dbdef1c665821815e0aed428 to your computer and use it in GitHub Desktop.
private static IObservable<byte[]> ForEveryMessage(this TcpClient self, int byteCount, CancellationToken token)
{
return Observable.Create<byte[]>(
obs =>
{
NetworkStream stream = self.GetStream();
byte[] buffer = new byte[byteCount];
var recieving = Observable.FromAsyncPattern<byte[], int, int, int>(stream.BeginRead, stream.EndRead);
bool zeroBytesRead = false;
int remainder = byteCount;
return Observable.While(
() => !token.IsCancellationRequested && remainder > 0 && !zeroBytesRead,
Observable.Defer(() =>
recieving(buffer, buffer.Length - remainder, remainder).Timeout(TimeSpan.FromSeconds(TIMEOUTSECONDS))
.Do(read => { remainder = remainder - read; zeroBytesRead = read == 0; })))
.PublishLast(whenCompleted => whenCompleted.Select(_ => {
if (zeroBytesRead)
{
//WSAECONNABORTED
//Software caused connection abort.
//An established connection was aborted by the software in your host computer, possibly due to a data transmission timeout or protocol error.
throw new SocketException(10053);
}
return buffer;
}))
.Do(x => { Console.WriteLine(ReadableByteArray(x)); })
.Subscribe(obs.OnNext, obs.OnError, obs.OnCompleted);
}
);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment