Created
December 31, 2015 06:49
-
-
Save hammett/45c195a35c48fae73806 to your computer and use it in GitHub Desktop.
stream ring buffer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Fixed-capacity (<see cref="BufferSize"/> bytes) ring buffer exposed as a read-only
/// <see cref="Stream"/>. Bytes are appended with <see cref="InsertAsync"/> and consumed
/// with <see cref="Read"/>.
/// </summary>
/// <remarks>
/// The cursors are monotonically increasing byte counters; the physical index into the
/// ring is the counter modulo <see cref="BufferSize"/>. Each counter is written by exactly
/// one side (producer or consumer), which is what makes the lock-free hand-off below work.
/// NOTE(review): this assumes at most one producer thread and one consumer thread; nothing
/// in the class enforces that.
/// </remarks>
internal class RingBufferStream : Stream
{
    private const int BufferSize = 1024;

    // Total number of bytes ever inserted. Written only by InsertAsync.
    // 64-bit so a long-lived stream does not overflow after 2 GiB.
    private long _totalLen;

    // Total number of bytes ever consumed. Written only by Read.
    private long _readPosition;

    // Signalled by Read after it frees space; awaited by InsertAsync when the ring is full.
    private readonly ManualResetEventSlim _locker = new ManualResetEventSlim(false);

    private readonly byte[] _buffer;

    public RingBufferStream()
    {
        _buffer = new byte[BufferSize];
    }

    /// <summary>
    /// Appends <paramref name="original"/> bytes of <paramref name="buffer"/>, starting at
    /// <paramref name="offset"/>, to the ring (single threaded producer).
    /// </summary>
    /// <param name="buffer">Source bytes.</param>
    /// <param name="offset">Index in <paramref name="buffer"/> of the first byte to append.</param>
    /// <param name="original">Number of bytes to append; must not exceed the ring capacity.</param>
    /// <returns>A task that is already completed (or faulted) when it is returned.</returns>
    /// <remarks>
    /// NOTE(review): the method contains no <c>await</c>. It runs synchronously on the caller's
    /// thread and blocks while the ring is full, until <see cref="Read"/> frees space. The
    /// <c>async</c> keyword is kept so argument errors are still reported through the returned
    /// task, as before.
    /// </remarks>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="offset"/> or <paramref name="original"/> is negative.
    /// </exception>
    /// <exception cref="ArgumentException">
    /// <paramref name="original"/> exceeds the ring capacity, or the requested range does not
    /// fit inside <paramref name="buffer"/>.
    /// </exception>
    public async Task InsertAsync(byte[] buffer, int offset, int original)
    {
        if (buffer == null) throw new ArgumentNullException("buffer");
        if (offset < 0) throw new ArgumentOutOfRangeException("offset");
        if (original < 0) throw new ArgumentOutOfRangeException("original");
        if (original > BufferSize) throw new ArgumentException("overflowing the buffer? nope");
        if (buffer.Length - offset < original)
        {
            throw new ArgumentException("offset and count exceed the length of the source buffer");
        }

        var spinner = new SpinWait();
        var remaining = original;

        // A zero-length insert never enters the loop, so it cannot block on a full ring.
        while (remaining > 0)
        {
            // The consumer can only shrink the unread count, so a stale snapshot
            // under-estimates the free space and is always safe to act on.
            var free = BufferSize - UnreadLength();
            Debug.Assert(free >= 0 && free <= BufferSize, "not sane");

            if (free == 0)
            {
                WaitForFreeSpace(ref spinner);
                continue;
            }

            var written = Volatile.Read(ref _totalLen);
            var writePos = (int)(written % BufferSize);

            // Copy one contiguous run: bounded by the physical end of the array,
            // by the free space, and by what the caller still has to give.
            var chunk = Math.Min(Math.Min(BufferSize - writePos, free), remaining);
            Buffer.BlockCopy(buffer, offset, _buffer, writePos, chunk);

            // Publish only after the bytes are in place, so the consumer never
            // observes a length that covers unwritten data.
            Volatile.Write(ref _totalLen, written + chunk);

            offset += chunk;
            remaining -= chunk;
        }
    }

    // Spins briefly, then blocks on the event until the consumer frees space.
    private void WaitForFreeSpace(ref SpinWait spinner)
    {
        if (!spinner.NextSpinWillYield)
        {
            spinner.SpinOnce();
            return;
        }

        // Reset first, then re-check: a Read that completed before the reset is seen by
        // the re-check, and one that completes after it signals the event. Either way
        // the wake-up is not lost, and a stale signal cannot turn this into a busy loop.
        _locker.Reset();
        if (UnreadLength() == BufferSize)
        {
            _locker.Wait();
        }
    }

    #region Overrides of Stream

    /// <summary>
    /// Copies up to <paramref name="count"/> unread bytes into <paramref name="buffer"/>
    /// without blocking.
    /// </summary>
    /// <returns>
    /// The number of bytes copied; 0 when no unread data is available at the time of the call.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="offset"/> or <paramref name="count"/> is negative.
    /// </exception>
    /// <exception cref="ArgumentException">
    /// The requested range does not fit inside <paramref name="buffer"/>.
    /// </exception>
    public override int Read(byte[] buffer, int offset, int count)
    {
        if (buffer == null) throw new ArgumentNullException("buffer");
        if (offset < 0) throw new ArgumentOutOfRangeException("offset");
        if (count < 0) throw new ArgumentOutOfRangeException("count");
        if (buffer.Length - offset < count)
        {
            throw new ArgumentException("offset and count exceed the length of the destination buffer");
        }

        var totalRead = 0;
        while (totalRead < count)
        {
            var unread = UnreadLength();
            if (unread == 0) break;

            var consumed = Volatile.Read(ref _readPosition);
            var readPos = (int)(consumed % BufferSize);

            // One contiguous run: bounded by the physical end of the array, by the
            // unread data, and by the space left in the caller's buffer.
            var chunk = Math.Min(Math.Min(BufferSize - readPos, unread), count - totalRead);
            Buffer.BlockCopy(_buffer, readPos, buffer, offset + totalRead, chunk);

            // Advance the cursor only after the bytes are copied out, so the producer
            // cannot overwrite data that is still being read.
            Volatile.Write(ref _readPosition, consumed + chunk);
            _locker.Set(); // wake a producer waiting for free space

            totalRead += chunk;
        }

        return totalRead;
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotSupportedException();
    }

    public override void SetLength(long value)
    {
        throw new NotSupportedException();
    }

    /// <summary>Not supported; data is appended through <see cref="InsertAsync"/>.</summary>
    public override void Write(byte[] buffer, int offset, int count)
    {
        throw new NotSupportedException();
    }

    public override bool CanRead
    {
        get { return true; }
    }

    public override bool CanSeek
    {
        get { return false; }
    }

    public override bool CanWrite
    {
        get { return false; }
    }

    /// <summary>Total number of bytes inserted so far (not the unread count).</summary>
    public override long Length
    {
        get { return Volatile.Read(ref _totalLen); }
    }

    /// <summary>Total number of bytes consumed so far; starts at 0. Setting is not supported.</summary>
    public override long Position
    {
        get { return Volatile.Read(ref _readPosition); }
        set
        {
            throw new NotSupportedException();
        }
    }

    public override void Flush()
    {
    }

    /// <summary>Releases the wait event owned by this stream.</summary>
    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            _locker.Dispose();
        }

        base.Dispose(disposing);
    }

    #endregion

    // Bytes inserted but not yet consumed; stays within [0, BufferSize] as long as
    // there is one producer and one consumer.
    private int UnreadLength()
    {
        return (int)(Volatile.Read(ref _totalLen) - Volatile.Read(ref _readPosition));
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment