Created
October 30, 2012 14:43
-
-
Save barvinograd/3980608 to your computer and use it in GitHub Desktop.
Buffer Queue Stream
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// A forward-only, in-memory FIFO stream.
//
// Bytes passed to Write are copied into internal chunks and appended to a queue;
// Read consumes those bytes in the same order they were written. Full-size chunks
// (INTERNAL_BUFFER_SIZE bytes) are taken from a BufferManager pool, while the
// trailing partial chunk of a write is a plain, exactly-sized array. The two kinds
// are told apart by their length, so only full-size chunks are handed back to the pool.
//
// The stream is not seekable. Length reports the number of bytes still waiting to
// be read; Position reports the total number of bytes read so far.
// No synchronization is performed by this class.
public class BufferQueueStream : Stream, IDisposable
{
    // Size of a pooled chunk. Anything shorter than this is a non-pooled tail chunk.
    private const int INTERNAL_BUFFER_SIZE = 64 * 1024;

    private readonly System.ServiceModel.Channels.BufferManager bufferManager = System.ServiceModel.Channels.BufferManager.CreateBufferManager(INTERNAL_BUFFER_SIZE * 100, INTERNAL_BUFFER_SIZE);
    private readonly Queue<byte[]> _bufferQueue = new Queue<byte[]>();

    // Chunk currently being consumed by Read, and the read offset inside it.
    private byte[] _currentReadingBuffer;
    private int _currentBufferPosition;

    private long _totalLengthToRead = 0;
    private long _position = 0;
    private bool _disposed;

    // Hands a chunk back to the pool, but only if it is a full-size (pooled) chunk.
    // Tail chunks were allocated with "new" and must not be returned to the manager.
    private void ReturnIfPooled(byte[] chunk)
    {
        if (chunk != null && chunk.Length == INTERNAL_BUFFER_SIZE)
        {
            bufferManager.ReturnBuffer(chunk);
        }
    }

    // Shared argument checks for Read and Write, following the Stream contract.
    private static void ValidateArguments(byte[] buffer, int offset, int count)
    {
        if (buffer == null)
        {
            throw new ArgumentNullException("buffer");
        }
        if (offset < 0)
        {
            throw new ArgumentOutOfRangeException("offset");
        }
        if (count < 0)
        {
            throw new ArgumentOutOfRangeException("count");
        }
        if (buffer.Length - offset < count)
        {
            throw new ArgumentException("offset and count exceed the length of buffer.");
        }
    }

    private void ThrowIfDisposed()
    {
        if (_disposed)
        {
            throw new ObjectDisposedException(GetType().Name);
        }
    }

    // Makes sure _currentReadingBuffer has unread bytes.
    // Returns false when there is nothing left to read.
    private bool EnsureCurrentReadingBuffer()
    {
        if (_currentReadingBuffer != null && _currentBufferPosition < _currentReadingBuffer.Length)
        {
            return true;
        }

        // The current chunk (if any) is fully consumed: release it right away so the
        // pool can reuse it even if no further chunk is available yet.
        if (_currentReadingBuffer != null)
        {
            ReturnIfPooled(_currentReadingBuffer);
            _currentReadingBuffer = null;
        }
        _currentBufferPosition = 0;

        if (_bufferQueue.Count == 0)
        {
            return false;
        }

        _currentReadingBuffer = _bufferQueue.Dequeue();
        return true;
    }

    public override bool CanRead
    {
        get { return !_disposed; }
    }

    public override bool CanSeek
    {
        get { return false; }
    }

    public override bool CanWrite
    {
        get { return !_disposed; }
    }

    public override void Flush()
    {
        // Nothing to flush: data is held in memory only.
    }

    // Number of bytes written but not yet read.
    public override long Length
    {
        get { return _totalLengthToRead; }
    }

    // Total number of bytes read so far. Setting the position is not supported.
    public override long Position
    {
        get { return _position; }
        set { throw new NotSupportedException(); }
    }

    // Copies up to count queued bytes into buffer starting at offset.
    // Returns the number of bytes copied, which is less than count (possibly 0)
    // when the queue runs out of data.
    public override int Read(byte[] buffer, int offset, int count)
    {
        ValidateArguments(buffer, offset, count);
        ThrowIfDisposed();

        int bytesRead = 0;
        while (bytesRead < count && EnsureCurrentReadingBuffer())
        {
            int lengthToCopy = Math.Min(count - bytesRead, _currentReadingBuffer.Length - _currentBufferPosition);
            Buffer.BlockCopy(_currentReadingBuffer, _currentBufferPosition, buffer, offset + bytesRead, lengthToCopy);
            _currentBufferPosition += lengthToCopy;
            bytesRead += lengthToCopy;
        }

        // Update the counters on every exit path, including a short read.
        _totalLengthToRead -= bytesRead;
        _position += bytesRead;
        return bytesRead;
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotSupportedException();
    }

    public override void SetLength(long value)
    {
        throw new NotSupportedException();
    }

    // Copies buffer[offset .. offset + count) into internal chunks and queues them.
    public override void Write(byte[] buffer, int offset, int count)
    {
        ValidateArguments(buffer, offset, count);
        ThrowIfDisposed();

        int currPos = offset;
        int remaining = count;
        while (remaining > 0)
        {
            byte[] arr;
            if (remaining >= INTERNAL_BUFFER_SIZE)
            {
                // NOTE(review): relies on TakeBuffer returning an array of exactly
                // INTERNAL_BUFFER_SIZE bytes (the manager's max buffer size); the
                // chunk length is what marks it as pooled. Confirm against the
                // BufferManager in use.
                arr = bufferManager.TakeBuffer(INTERNAL_BUFFER_SIZE);
            }
            else
            {
                arr = new byte[remaining];
            }

            Buffer.BlockCopy(buffer, currPos, arr, 0, arr.Length);
            _bufferQueue.Enqueue(arr);

            _totalLengthToRead += arr.Length;
            currPos += arr.Length;
            remaining -= arr.Length;
        }
    }

    // Releases all chunks. Overriding Dispose(bool) (instead of hiding Dispose())
    // means Close(), Dispose() and "using" all reach this cleanup, whatever the
    // static type of the reference.
    protected override void Dispose(bool disposing)
    {
        try
        {
            if (disposing && !_disposed)
            {
                ReturnIfPooled(_currentReadingBuffer);
                _currentReadingBuffer = null;

                while (_bufferQueue.Count > 0)
                {
                    ReturnIfPooled(_bufferQueue.Dequeue());
                }

                // The manager is private to this instance, so drop its cached buffers.
                bufferManager.Clear();
            }
            _disposed = true;
        }
        finally
        {
            base.Dispose(disposing);
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment