Skip to content

Instantly share code, notes, and snippets.

@hammett
Created December 31, 2015 06:49
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hammett/45c195a35c48fae73806 to your computer and use it in GitHub Desktop.
Save hammett/45c195a35c48fae73806 to your computer and use it in GitHub Desktop.
stream ring buffer
/// <summary>
/// Fixed-capacity in-memory ring buffer exposed as a read-only <see cref="Stream"/>.
/// Designed for exactly one producer thread calling <see cref="InsertAsync"/> and
/// one consumer thread calling <see cref="Read"/>; it is not safe for several
/// producers or several consumers.
/// </summary>
internal class RingBufferStream : Stream
{
    private const int BufferSize = 1024;

    // Absolute byte counters that only ever grow. They are 64-bit so they cannot
    // overflow in practice, and are accessed through Volatile.Read/Write because
    // C# does not allow 'volatile long'. The slot inside _buffer is always
    // (counter % BufferSize).
    private long _written;   // advanced only by the producer (InsertAsync)
    private long _consumed;  // advanced only by the consumer (Read)

    // Signalled by Read every time it frees space; the producer waits on it
    // when the ring is full.
    private readonly ManualResetEventSlim _locker = new ManualResetEventSlim(false);
    private readonly byte[] _buffer;

    public RingBufferStream()
    {
        _buffer = new byte[BufferSize];
    }

    /// <summary>
    /// Appends <paramref name="original"/> bytes of <paramref name="buffer"/>, starting at
    /// <paramref name="offset"/>, to the ring (single threaded producer).
    /// </summary>
    /// <remarks>
    /// The method contains no await: it runs synchronously on the caller's thread and,
    /// while the ring is full, spins and then blocks until <see cref="Read"/> frees space.
    /// It is declared async so that failures are reported through the returned task.
    /// </remarks>
    /// <param name="buffer">Source of the bytes to append.</param>
    /// <param name="offset">Index in <paramref name="buffer"/> of the first byte to append.</param>
    /// <param name="original">Number of bytes to append; at most the ring capacity.</param>
    /// <exception cref="ArgumentException">
    /// <paramref name="original"/> exceeds the ring capacity, or the requested range
    /// does not fit inside <paramref name="buffer"/>.
    /// </exception>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="offset"/> or <paramref name="original"/> is negative.
    /// </exception>
    public async Task InsertAsync(byte[] buffer, int offset, int original)
    {
        if (original > BufferSize) throw new ArgumentException("overflowing the buffer? nope");
        ValidateRange(buffer, offset, original, "original");

        // Nothing to copy: return right away instead of waiting for free space.
        if (original == 0) return;

        var spinWait = new SpinWait();
        var remaining = original;
        while (remaining > 0)
        {
            var written = Volatile.Read(ref _written);
            var consumed = Volatile.Read(ref _consumed);
            var free = BufferSize - (int)(written - consumed);
            Debug.Assert(free >= 0 && free <= BufferSize, "not sane");

            if (free == 0)
            {
                if (spinWait.NextSpinWillYield)
                {
                    // Drop any stale signal left by an earlier read, then look at the
                    // read cursor again before blocking. A read that completes after
                    // the Reset sets the event again, so the wake-up cannot be lost,
                    // and a stale signal can no longer turn this loop into a hot spin.
                    _locker.Reset();
                    if (Volatile.Read(ref _consumed) == consumed)
                    {
                        _locker.Wait();
                    }
                }
                else
                {
                    spinWait.SpinOnce();
                }
                continue;
            }

            // Copy one contiguous run: bounded by the end of the array, by the free
            // space (so unread bytes are never overwritten) and by what is left to copy.
            var writePos = (int)(written % BufferSize);
            var sizeForCopy = Math.Min(Math.Min(BufferSize - writePos, free), remaining);
            Buffer.BlockCopy(buffer, offset, _buffer, writePos, sizeForCopy);

            // Publish only after the bytes are in place so the consumer never
            // observes a length that covers data not yet copied.
            Volatile.Write(ref _written, written + sizeForCopy);

            offset += sizeForCopy;
            remaining -= sizeForCopy;
        }
    }

    #region Overrides of Stream

    /// <summary>
    /// Copies up to <paramref name="count"/> unread bytes into <paramref name="buffer"/>
    /// without waiting for more data to arrive.
    /// </summary>
    /// <param name="buffer">Destination array.</param>
    /// <param name="offset">Index in <paramref name="buffer"/> at which to start storing bytes.</param>
    /// <param name="count">Maximum number of bytes to read.</param>
    /// <returns>The number of bytes copied; 0 when nothing is currently unread.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="offset"/> or <paramref name="count"/> is negative.
    /// </exception>
    /// <exception cref="ArgumentException">
    /// The requested range does not fit inside <paramref name="buffer"/>.
    /// </exception>
    public override int Read(byte[] buffer, int offset, int count)
    {
        // Validate before touching the ring so a bad call never consumes data.
        ValidateRange(buffer, offset, count, "count");

        int totalRead = 0;
        while (totalRead < count)
        {
            long consumed = Volatile.Read(ref _consumed);
            int unread = (int)(Volatile.Read(ref _written) - consumed);
            if (unread == 0) break;

            // One contiguous run, stopping at the end of the array; the next
            // iteration continues from slot 0 if more is wanted and available.
            int readPos = (int)(consumed % BufferSize);
            int lenToRead = Math.Min(Math.Min(BufferSize - readPos, unread), count - totalRead);
            Buffer.BlockCopy(_buffer, readPos, buffer, offset + totalRead, lenToRead);

            // Release the slots first, then wake a producer that may be waiting.
            Volatile.Write(ref _consumed, consumed + lenToRead);
            _locker.Set();

            totalRead += lenToRead;
        }
        return totalRead;
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotSupportedException();
    }

    public override void SetLength(long value)
    {
        throw new NotSupportedException();
    }

    /// <summary>Not supported; data is appended through <see cref="InsertAsync"/>.</summary>
    public override void Write(byte[] buffer, int offset, int count)
    {
        throw new NotSupportedException();
    }

    public override bool CanRead
    {
        get { return true; }
    }

    public override bool CanSeek
    {
        get { return false; }
    }

    public override bool CanWrite
    {
        get { return false; }
    }

    /// <summary>Total number of bytes inserted since the stream was created.</summary>
    public override long Length
    {
        get { return Volatile.Read(ref _written); }
    }

    /// <summary>Total number of bytes read since the stream was created; cannot be set.</summary>
    public override long Position
    {
        get { return Volatile.Read(ref _consumed); }
        set
        {
            throw new NotSupportedException();
        }
    }

    public override void Flush()
    {
    }

    #endregion

    /// <summary>
    /// Checks that <paramref name="offset"/> and <paramref name="count"/> describe a range
    /// lying inside <paramref name="buffer"/>.
    /// </summary>
    private static void ValidateRange(byte[] buffer, int offset, int count, string countName)
    {
        if (buffer == null) throw new ArgumentNullException("buffer");
        if (offset < 0) throw new ArgumentOutOfRangeException("offset");
        if (count < 0) throw new ArgumentOutOfRangeException(countName);
        if (buffer.Length - offset < count)
        {
            throw new ArgumentException("offset and count exceed the length of the buffer");
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment