Created
January 26, 2011 02:24
-
-
Save mythz/796118 to your computer and use it in GitHub Desktop.
Naive Example of a 1 Read + 1 Write thread buffer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* Incomplete, untested buffered implementation designed to accommodate 1 write thread and 1 read thread only.
 * Pooling is not implemented yet, but instances are expected to be pooled.
 */
/// <summary>
/// Write-only <see cref="Stream"/> that appends into a growable in-memory byte buffer.
/// The buffered bytes are not read back through <see cref="Read"/>; instead the stream is
/// enumerated, and each enumerated item is a delegate that hands the current chunk
/// (an <c>ArraySegment&lt;byte&gt;</c> of at most <c>MtuAppSize</c> bytes) to a callback.
/// </summary>
/// <remarks>
/// No locks or memory barriers are used anywhere in this class.
/// NOTE(review): presumably intended for exactly one writer and one reader; nothing here
/// enforces that - confirm before sharing an instance more widely.
/// </remarks>
public class NonBlockingReadBufferedStream
    : Stream, IEnumerable<Action<Action<object>, Action<Exception>>>, IDisposable
{
    // Maximum number of bytes handed out per enumerated chunk.
    private const int MtuAppSize = 1450;

    // Initial buffer size, and also the extra head-room added whenever the buffer grows.
    private const int BufferAllocationSize = 32 * 1024;

    // Reset() replaces the buffer with a fresh small one when it has grown beyond this size.
    internal int ResetClearsBufferOfMaxSize = 4 * 1024 * 1024; //4MB

    // Backing storage; only the first WriteIndex bytes hold written data.
    internal byte[] Buffer = new byte[BufferAllocationSize];

    // Number of bytes written so far == offset at which the next Write() lands.
    internal int WriteIndex = 0;

    /// <summary>
    /// Appends <paramref name="srcCount"/> bytes of <paramref name="srcBytes"/>, starting at
    /// <paramref name="srcOffset"/>, to the end of the buffer, growing the buffer if needed.
    /// </summary>
    /// <param name="srcBytes">Array to copy from.</param>
    /// <param name="srcOffset">Offset in <paramref name="srcBytes"/> of the first byte to copy.</param>
    /// <param name="srcCount">Number of bytes to copy.</param>
    /// <remarks>
    /// Arguments are not validated here; invalid combinations surface as the exceptions
    /// thrown by <c>System.Buffer.BlockCopy</c>.
    /// </remarks>
    public override void Write(byte[] srcBytes, int srcOffset, int srcCount)
    {
        if ((WriteIndex + srcCount) > Buffer.Length)
        {
            // Over-allocate by one allocation unit so a run of small writes
            // does not trigger a reallocation each time.
            const int breathingSpaceToReduceReallocations = BufferAllocationSize;
            var newLargerBuffer = new byte[WriteIndex + srcCount + breathingSpaceToReduceReallocations];
            System.Buffer.BlockCopy(Buffer, 0, newLargerBuffer, 0, Buffer.Length);
            Buffer = newLargerBuffer;
        }

        System.Buffer.BlockCopy(srcBytes, srcOffset, Buffer, WriteIndex, srcCount);

        // Advance by the number of bytes actually copied. (Advancing by srcBytes.Length
        // is wrong whenever srcOffset != 0 or srcCount < srcBytes.Length.)
        WriteIndex += srcCount;
    }

    /// <summary>No-op: writes go straight into the in-memory buffer.</summary>
    public override void Flush() {}

    /// <summary>Not implemented; always throws <see cref="NotImplementedException"/>.</summary>
    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotImplementedException();
    }

    /// <summary>Not implemented; always throws <see cref="NotImplementedException"/>.</summary>
    public override void SetLength(long value)
    {
        throw new NotImplementedException();
    }

    /// <summary>
    /// Not implemented; always throws <see cref="NotImplementedException"/>.
    /// Buffered data is consumed through <see cref="GetEnumerator"/> instead.
    /// </summary>
    public override int Read(byte[] buffer, int offset, int count)
    {
        throw new NotImplementedException("Use the IEnumerator to read");
    }

    /// <summary>Always <c>false</c>.</summary>
    public override bool CanRead
    {
        get { return false; }
    }

    /// <summary>Always <c>false</c>.</summary>
    public override bool CanSeek
    {
        get { return false; }
    }

    /// <summary>Always <c>true</c>.</summary>
    public override bool CanWrite
    {
        get { return true; }
    }

    /// <summary>Number of bytes written so far (the current <c>WriteIndex</c>).</summary>
    public override long Length
    {
        get { return this.WriteIndex; }
    }

    /// <summary>
    /// Gets or sets the write offset. The setter truncates the value to <c>int</c> and
    /// stores it without any range validation.
    /// </summary>
    public override long Position
    {
        get
        {
            return WriteIndex;
        }
        set
        {
            WriteIndex = (int) value;
        }
    }

    /// <summary>
    /// Returns the stream to its initial state: rewinds the write offset to zero, clears
    /// the disposed flag, and swaps an over-sized buffer for a fresh small one.
    /// </summary>
    public void Reset()
    {
        //These buffers are expected to be pooled but remove large writes to save memory
        if (Buffer.Length > ResetClearsBufferOfMaxSize)
        {
            Buffer = new byte[BufferAllocationSize];
        }
        WriteIndex = 0;
        IsDisposed = false;
    }

    //Called from IEnumerator
    internal void Release()
    {
        Reset();
    }

    // Set by IDisposable.Dispose(); consulted by Enumerator.MoveNext() for its return value.
    internal bool IsDisposed;

    /// <summary>
    /// Only raises the <c>IsDisposed</c> flag; the buffer is left untouched.
    /// </summary>
    /// <remarks>
    /// This is an explicit interface implementation, so it runs for calls made through
    /// <see cref="IDisposable"/> (e.g. a <c>using</c> statement).
    /// </remarks>
    void IDisposable.Dispose()
    {
        IsDisposed = true;
    }

    /// <summary>
    /// Walks the stream's buffer in chunks of up to <c>MtuAppSize</c> bytes. Each element
    /// is a delegate which, when invoked, passes the current chunk (boxed
    /// <c>ArraySegment&lt;byte&gt;</c>) to the supplied read callback.
    /// </summary>
    public class Enumerator
        : IEnumerator<Action<Action<object>, Action<Exception>>>
    {
        private readonly NonBlockingReadBufferedStream stream;

        // Offset of the next unread byte in stream.Buffer.
        private int readIndex = 0;

        // Counts MoveNext() calls that found nothing new to read.
        private int emptyResponsesCount = 0;

        // Chunk selected by the most recent MoveNext().
        private ArraySegment<byte> currentSegment;

        /// <summary>Creates an enumerator positioned at the start of <paramref name="stream"/>.</summary>
        public Enumerator(NonBlockingReadBufferedStream stream)
        {
            this.stream = stream;
        }

        /// <summary>
        /// Selects the next chunk of unread bytes (possibly empty) as the current segment
        /// and advances the read offset past it.
        /// </summary>
        /// <returns>
        /// After a full <c>MtuAppSize</c> chunk: <c>true</c> unless that chunk ended exactly
        /// at the write offset. After an empty or partial chunk: the stream's
        /// <c>IsDisposed</c> flag.
        /// NOTE(review): these return values are kept exactly as the original author left
        /// them (marked "or false?"); they do not obviously follow the usual
        /// <c>IEnumerator.MoveNext</c> contract - confirm the intended semantics with callers.
        /// </returns>
        public bool MoveNext()
        {
            // Read each stream field once so all decisions in this call use the same
            // values even if they change while we run. WriteIndex is read before Buffer.
            var writeIndex = stream.WriteIndex;
            var buffer = stream.Buffer;

            if (readIndex >= writeIndex)
            {
                // Nothing new: expose an empty segment at the write offset.
                currentSegment = new ArraySegment<byte>(
                    buffer, writeIndex, 0);
                emptyResponsesCount++;
                return stream.IsDisposed; //or false?
            }

            var newReadIndex = readIndex + MtuAppSize;
            if (newReadIndex <= writeIndex)
            {
                currentSegment = new ArraySegment<byte>(
                    buffer, readIndex, MtuAppSize);
                readIndex = newReadIndex;
                return newReadIndex != writeIndex;
            }

            //Partial chunk size left
            // ArraySegment takes a COUNT, not an end index, so pass only the unread tail,
            // then move past it so it is not handed out again on the next call.
            currentSegment = new ArraySegment<byte>(
                buffer, readIndex, writeIndex - readIndex);
            readIndex = writeIndex;
            return stream.IsDisposed; //or false?
        }

        /// <summary>Rewinds the read offset to the start of the buffer.</summary>
        public void Reset()
        {
            readIndex = 0;
        }

        /// <summary>
        /// Passes the current segment to <paramref name="readCallback"/>.
        /// <paramref name="ex"/> is never invoked by this implementation.
        /// </summary>
        public void CallBack(Action<object> readCallback, Action<Exception> ex)
        {
            readCallback(currentSegment);
        }

        /// <summary>Delegate bound to <see cref="CallBack"/> for the current segment.</summary>
        public Action<Action<object>, Action<Exception>> Current
        {
            get
            {
                return CallBack;
            }
        }

        object IEnumerator.Current
        {
            get { return Current; }
        }

        /// <summary>Resets the underlying stream (via <c>Release</c>) so it can be reused.</summary>
        public void Dispose()
        {
            stream.Release();
        }
    }

    /// <summary>Returns a new <see cref="Enumerator"/> over this stream.</summary>
    public IEnumerator<Action<Action<object>, Action<Exception>>> GetEnumerator()
    {
        return new Enumerator(this);
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment