Skip to content

Instantly share code, notes, and snippets.

@andrewfb
Created February 26, 2019 01:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save andrewfb/40059cbc65f8a1690c97bfa88936069f to your computer and use it in GitHub Desktop.
Save andrewfb/40059cbc65f8a1690c97bfa88936069f to your computer and use it in GitHub Desktop.
Implementation of cinder::StreamingBuffer
StreamingBuffer::StreamingBuffer( size_t blockSizeBytes )
: mBlockSize( std::max<size_t>( blockSizeBytes, 1 ) )
{
allocateNewWriteBlock();
mReadOffset = 0;
}
void StreamingBuffer::pushFront( const void *data, size_t dataSize )
{
std::lock_guard<std::mutex> lock( mMutex );
size_t offset = 0;
while( offset < dataSize ) {
size_t copyCount = std::min( dataSize - offset, mBlockSize - mWriteOffset );
memcpy( &mBlocks.back()[mWriteOffset], &reinterpret_cast<const uint8_t*>(data)[offset], copyCount );
offset += copyCount;
mWriteOffset += copyCount;
if( mWriteOffset == mBlockSize )
allocateNewWriteBlock();
}
}
size_t StreamingBuffer::popBack( void *output, size_t maxSize )
{
std::lock_guard<std::mutex> lock( mMutex );
size_t offset = 0;
maxSize = std::min( maxSize, calcSize() );
while( offset < maxSize && ( ! mBlocks.empty() ) ) {
// for the last block, we can only read up to `mWriteOffset`; for other blocks we can read up to 'mBlockSize'
size_t copyCount = std::min( maxSize - offset, ( ( mBlocks.size() == 1 ) ? mWriteOffset : mBlockSize ) - mReadOffset );
memcpy( &reinterpret_cast<uint8_t*>(output)[offset], &mBlocks.front()[mReadOffset], copyCount );
offset += copyCount;
mReadOffset += copyCount;
if( mReadOffset == mBlockSize )
releaseCurrentReadBlock();
}
return offset;
}
size_t StreamingBuffer::getSize() const
{
std::lock_guard<std::mutex> lock( mMutex );
return calcSize();
}
void StreamingBuffer::clear()
{
std::lock_guard<std::mutex> lock( mMutex );
while( mBlocks.size() > 1 )
releaseCurrentReadBlock();
mReadOffset = mWriteOffset = 0;
}
void StreamingBuffer::shrinkToFit()
{
std::lock_guard<std::mutex> lock( mMutex );
mUnusedBlocks.clear();
}
size_t StreamingBuffer::copyTo( void *output, size_t maxSize ) const
{
std::lock_guard<std::mutex> lock( mMutex );
size_t offset = 0, copyCount;
// the first buffer we copy from read offset, up to either 'mBlockSize' or in the case of a single block, 'mWriteOffset'
size_t firstBlockSize = ( mBlocks.size() == 1 ) ? mWriteOffset : mBlockSize;
copyCount = std::min( maxSize, firstBlockSize - mReadOffset );
memcpy( &reinterpret_cast<uint8_t*>(output)[0], &mBlocks.front()[mReadOffset], copyCount );
offset += copyCount;
if( maxSize == offset || mBlocks.size() == 1 )
return offset;
// all but the first and last buffer we can copy in its entirety
for( size_t block = 1; block < mBlocks.size() - 1; ++block ) {
copyCount = std::min( maxSize - offset, mBlockSize );
memcpy( &reinterpret_cast<uint8_t*>(output)[offset], &mBlocks[block][0], copyCount );
offset += copyCount;
if( maxSize == offset )
return offset;
}
// last block we should read up to 'writeOffset'
copyCount = std::min( maxSize - offset, mWriteOffset );
memcpy( &reinterpret_cast<uint8_t*>(output)[offset], &mBlocks.back()[0], copyCount );
offset += copyCount;
return offset;
}
// implicitly thread-safe; only called by thread-safe methods
void StreamingBuffer::allocateNewWriteBlock()
{
if( ! mUnusedBlocks.empty() ) {
mBlocks.push_back( std::move( mUnusedBlocks.back() ) );
mUnusedBlocks.pop_back();
}
else
mBlocks.emplace_back( new uint8_t[mBlockSize] );
mWriteOffset = 0;
}
// implicitly thread-safe; only called by thread-safe methods
void StreamingBuffer::releaseCurrentReadBlock()
{
mUnusedBlocks.push_back( std::move( mBlocks.front() ) );
mBlocks.pop_front();
mReadOffset = 0;
}
// implicitly thread-safe; only called by thread-safe methods
size_t StreamingBuffer::calcSize() const
{
if( mBlocks.size() == 1 )
return mWriteOffset - mReadOffset;
else {
size_t frontBlockSize = mBlockSize - mReadOffset;
size_t backBlockSize = mWriteOffset;
return ( mBlocks.size() - 2 ) * mBlockSize + frontBlockSize + backBlockSize;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment