Skip to content

Instantly share code, notes, and snippets.

@barvinograd
Created October 30, 2012 14:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save barvinograd/3980608 to your computer and use it in GitHub Desktop.
Save barvinograd/3980608 to your computer and use it in GitHub Desktop.
Buffer Queue Stream
public class BufferQueueStream : Stream, IDisposable
{
private const int INTERNAL_BUFFER_SIZE = 64 * 1024;
private System.ServiceModel.Channels.BufferManager bufferManager = System.ServiceModel.Channels.BufferManager.CreateBufferManager(INTERNAL_BUFFER_SIZE * 100, INTERNAL_BUFFER_SIZE);
private Queue<byte[]> _bufferQueue = new Queue<byte[]>();
private byte[] _currentReadingBuffer;
private int _currentBufferPosition;
private long _totalLengthToRead = 0;
private long _position = 0;
private bool EnsureCurrentReadingBuffer()
{
//Either the current buffer is null or at the end. Get the next buffer
if (_currentReadingBuffer == null || _currentReadingBuffer.Length == _currentBufferPosition)
{
if (_bufferQueue.Count == 0)
{
return false;
}
else
{
if (_currentReadingBuffer != null && _currentReadingBuffer.Length == INTERNAL_BUFFER_SIZE)
bufferManager.ReturnBuffer(_currentReadingBuffer);
//Take new buffer from queue
_currentReadingBuffer = _bufferQueue.Dequeue();
_currentBufferPosition = 0;
return true;
}
}
else
{
return true;
}
}
public override bool CanRead
{
get { return true; }
}
public override bool CanSeek
{
get { return false; }
}
public override bool CanWrite
{
get { return true; }
}
public override void Flush()
{
//Do nothing
}
//Returns the length of the current queue
public override long Length
{
get
{
return _totalLengthToRead;
}
}
public override long Position
{
get
{
return _position;
}
set
{
throw new NotSupportedException();
}
}
public override int Read(byte[] buffer, int offset, int count)
{
int bytesRead = 0;
int outputBufferPosition = offset;
while (bytesRead < count)
{
//Get new buffer if neccessary
if (!EnsureCurrentReadingBuffer())
{
//Cannot get a new buffer. Finish reading.
return bytesRead;
}
//Calculate the data length to copy from current buffer to output buffer
int lengthToCopy = Math.Min(count - bytesRead, _currentReadingBuffer.Length - _currentBufferPosition);
//Copy data from current buffer to output buffer
Buffer.BlockCopy(_currentReadingBuffer, _currentBufferPosition, buffer, outputBufferPosition, lengthToCopy);
//Update counter, position, length
_currentBufferPosition += lengthToCopy;
bytesRead += lengthToCopy;
_totalLengthToRead -= lengthToCopy;
outputBufferPosition += lengthToCopy;
}
_position += bytesRead;
return bytesRead;
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
}
public override void SetLength(long value)
{
throw new NotSupportedException();
}
public override void Write(byte[] buffer, int offset, int count)
{
int currPos = offset;
byte[] arr;
while (currPos < count)
{
if (INTERNAL_BUFFER_SIZE <= count - currPos)
{
arr = bufferManager.TakeBuffer(INTERNAL_BUFFER_SIZE);
}
else
{
arr = new byte[count - currPos];
}
//Copy buffer
Buffer.BlockCopy(buffer, currPos, arr, 0, arr.Length);
//Enqueue
_bufferQueue.Enqueue(arr);
//Update counters
_totalLengthToRead += arr.Length;
currPos += arr.Length;
}
}
public void Dispose()
{
if (_currentReadingBuffer != null && _currentReadingBuffer.Length == INTERNAL_BUFFER_SIZE)
{
bufferManager.ReturnBuffer(_currentReadingBuffer);
_currentReadingBuffer = null;
}
if (_bufferQueue != null)
{
while (_bufferQueue.Count > 0)
{
bufferManager.ReturnBuffer(_bufferQueue.Dequeue());
}
_bufferQueue = null;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment