Skip to content

Instantly share code, notes, and snippets.

Created January 26, 2011 02:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mythz/796118 to your computer and use it in GitHub Desktop.
Save mythz/796118 to your computer and use it in GitHub Desktop.
Naive Example of a 1 Read + 1 Write thread buffer
/* Incomplete, untested buffered implementation designed to accommodate 1 write thread and 1 read thread only.
 * Not implemented yet, but is expected to be pooled.
 *
 * Bytes are appended through Stream.Write and handed out by the nested Enumerator in
 * chunks of at most MtuAppSize bytes. Stream.Read is intentionally not supported.
 */
public class NonBlockingReadBufferedStream
    : Stream, IEnumerable<Action<Action<object>, Action<Exception>>>, IDisposable
{
    // Maximum number of bytes the enumerator exposes per MoveNext call.
    private const int MtuAppSize = 1450;

    // Initial buffer size; also used as the extra head-room added whenever the buffer grows.
    private const int BufferAllocationSize = 32 * 1024;

    // Reset() replaces the buffer with a fresh default-sized one when it has grown beyond this size.
    internal int ResetClearsBufferOfMaxSize = 4 * 1024 * 1024; //4MB

    internal byte[] Buffer = new byte[BufferAllocationSize];

    // Number of bytes written so far; also the offset at which the next Write appends.
    internal int WriteIndex = 0;

    // Set by IDisposable.Dispose and cleared by Reset; read by the enumerator.
    internal bool IsDisposed;

    /// <summary>
    /// Appends <paramref name="srcCount"/> bytes of <paramref name="srcBytes"/>, starting at
    /// <paramref name="srcOffset"/>, to the internal buffer, growing the buffer when needed.
    /// </summary>
    public override void Write(byte[] srcBytes, int srcOffset, int srcCount)
    {
        if ((WriteIndex + srcCount) > Buffer.Length)
        {
            const int breathingSpaceToReduceReallocations = BufferAllocationSize;
            var newLargerBuffer = new byte[WriteIndex + srcCount + breathingSpaceToReduceReallocations];
            System.Buffer.BlockCopy(Buffer, 0, newLargerBuffer, 0, Buffer.Length);
            // Swap the buffer in before WriteIndex is advanced below.
            Buffer = newLargerBuffer;
        }

        System.Buffer.BlockCopy(srcBytes, srcOffset, Buffer, WriteIndex, srcCount);

        // Advance by the number of bytes actually copied. The previous code added
        // srcBytes.Length, which is wrong whenever only part of the array is written.
        WriteIndex += srcCount;
    }

    /// <summary>No-op: nothing is buffered beyond the in-memory array.</summary>
    public override void Flush() {}

    /// <summary>Not implemented; always throws <see cref="NotImplementedException"/>.</summary>
    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotImplementedException();
    }

    /// <summary>Not implemented; always throws <see cref="NotImplementedException"/>.</summary>
    public override void SetLength(long value)
    {
        throw new NotImplementedException();
    }

    /// <summary>
    /// Not implemented; always throws <see cref="NotImplementedException"/>.
    /// Data is consumed through <see cref="GetEnumerator"/> instead.
    /// </summary>
    public override int Read(byte[] buffer, int offset, int count)
    {
        throw new NotImplementedException("Use the IEnumerator to read");
    }

    public override bool CanRead
    {
        get { return false; }
    }

    public override bool CanSeek
    {
        get { return false; }
    }

    public override bool CanWrite
    {
        get { return true; }
    }

    /// <summary>Number of bytes written so far.</summary>
    public override long Length
    {
        get { return this.WriteIndex; }
    }

    /// <summary>
    /// Gets or sets the write offset. The setter truncates the value to <c>int</c>
    /// and performs no range validation.
    /// </summary>
    public override long Position
    {
        get
        {
            return WriteIndex;
        }
        set
        {
            WriteIndex = (int) value;
        }
    }

    /// <summary>
    /// Returns the stream to its initial state: the write offset goes back to zero and the
    /// disposed flag is cleared. An oversized buffer is swapped for a default-sized one.
    /// </summary>
    public void Reset()
    {
        //These buffers are expected to be pooled but remove large writes to save memory
        if (Buffer.Length > ResetClearsBufferOfMaxSize)
        {
            Buffer = new byte[BufferAllocationSize];
        }

        WriteIndex = 0;
        IsDisposed = false;
    }

    //Called from IEnumerator
    // NOTE(review): the body is empty and no caller is visible in this file.
    internal void Release()
    {
    }

    /// <summary>Only sets <see cref="IsDisposed"/>; the buffer itself is left untouched.</summary>
    void IDisposable.Dispose()
    {
        IsDisposed = true;
    }

    /// <summary>
    /// Walks the bytes written to the owning stream in chunks of at most MtuAppSize bytes.
    /// </summary>
    public class Enumerator
        : IEnumerator<Action<Action<object>, Action<Exception>>>
    {
        private readonly NonBlockingReadBufferedStream stream;

        // Offset of the first byte that has not yet been handed out.
        private int readIndex = 0;

        // NOTE(review): not referenced by any code visible here.
        private int emptyResponsesCount = 0;

        // Window over the stream's buffer selected by the last MoveNext call.
        private ArraySegment<byte> currentSegment;

        public Enumerator(NonBlockingReadBufferedStream stream)
        {
            this.stream = stream;
        }

        /// <summary>
        /// Selects the next unread chunk of the stream's buffer as the current segment.
        /// </summary>
        /// <returns>
        /// After a full chunk: <c>true</c> unless that chunk ends exactly at the write offset.
        /// After a partial chunk, or when there is nothing to read: the stream's
        /// <c>IsDisposed</c> flag.
        /// </returns>
        public bool MoveNext()
        {
            // Take one snapshot of the fields the writer mutates so that every decision in
            // this call uses the same values. No locks or memory barriers are involved.
            var writeIndex = stream.WriteIndex;
            var buffer = stream.Buffer;

            if (readIndex >= writeIndex)
            {
                // Nothing new to read: expose an empty segment positioned at the write offset.
                currentSegment = new ArraySegment<byte>(buffer, writeIndex, 0);
                // NOTE(review): original author was unsure about this result ("or false?").
                return stream.IsDisposed; //or false?
            }

            var newReadIndex = readIndex + MtuAppSize;
            if (newReadIndex <= writeIndex)
            {
                currentSegment = new ArraySegment<byte>(buffer, readIndex, MtuAppSize);
                readIndex = newReadIndex;
                // NOTE(review): this reports false when the chunk just selected is the last
                // one available; confirm that is what consumers expect.
                return newReadIndex != writeIndex;
            }

            //Partial chunk size left
            newReadIndex = writeIndex;
            // ArraySegment takes (array, offset, count): pass the number of remaining bytes,
            // not the end offset, and remember how far we have read.
            currentSegment = new ArraySegment<byte>(buffer, readIndex, newReadIndex - readIndex);
            readIndex = newReadIndex;
            // NOTE(review): original author was unsure about this result ("or false?").
            return stream.IsDisposed; //or false?
        }

        /// <summary>Rewinds the read offset to the start of the buffer.</summary>
        public void Reset()
        {
            readIndex = 0;
        }

        // Value handed out through Current. The body is empty in the original.
        public void CallBack(Action<object> readCallback, Action<Exception> ex)
        {
        }

        /// <summary>Returns a delegate bound to <see cref="CallBack"/>.</summary>
        public Action<Action<object>, Action<Exception>> Current
        {
            get
            {
                return CallBack;
            }
        }

        object IEnumerator.Current
        {
            get { return Current; }
        }

        public void Dispose()
        {
        }
    }

    /// <summary>Creates a new enumerator that starts reading at offset zero.</summary>
    public IEnumerator<Action<Action<object>, Action<Exception>>> GetEnumerator()
    {
        return new Enumerator(this);
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment