Skip to content

Instantly share code, notes, and snippets.

@biot023
Created April 16, 2012 07:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save biot023/2396866 to your computer and use it in GitHub Desktop.
Save biot023/2396866 to your computer and use it in GitHub Desktop.
/* Compile with:
* $CXX -std=gnu++11 -pthread -Wall -g -D_GLIBCXX_USE_NANOSLEEP -I/usr/local/include queuetest.cpp -o queuetest
*
* Where $CXX points to a G++ compiler, version 4.7
*/
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <string>
#include <thread>
#include <utility>
using namespace std;
typedef unsigned int uint;
// ----------------------------------------
/* Producer-side view of a queue of shared_ptr<T> values: the only
 * operation it exposes is push().
 */
template <typename T>
struct IQueueWriter
{
    /* Virtual, so an implementation can be destroyed through an
     * IQueueWriter pointer or reference.
     */
    virtual ~IQueueWriter() = default;

    /* Hands `val` over to the queue. */
    virtual void push( std::shared_ptr<T> val ) = 0;
};
// ----------------------------------------
/* Consumer-side view of a queue of shared_ptr<T> values: it can be
 * asked whether it is empty, how many values it holds, and for its
 * next value.
 */
template <typename T>
struct IQueueReader
{
    /* Virtual, so an implementation can be destroyed through an
     * IQueueReader pointer or reference.
     */
    virtual ~IQueueReader() = default;

    /* True when the queue holds no values. */
    virtual const bool empty() const = 0;

    /* Number of values currently held. */
    virtual const std::size_t size() const = 0;

    /* Removes a value from the queue and returns it. */
    virtual std::shared_ptr<T> pop() = 0;
};
// ----------------------------------------
/* Blocking, concurrency-safe FIFO queue of shared_ptr<T> values.
 *
 * Every member function takes the internal mutex, so push(), pop(),
 * empty() and size() may be called from several threads at once.
 * pop() is the only call that blocks: it makes the current thread wait
 * until there is a value to pop from the queue. The queue cannot be
 * copied.
 */
template <typename T>
struct ConcurrentQueue : public IQueueWriter<T>, public IQueueReader<T>
{
    ConcurrentQueue() {}
    ConcurrentQueue( const ConcurrentQueue & ) = delete;
    ConcurrentQueue & operator= ( const ConcurrentQueue & ) = delete;

    /* Concurrency-safe push to queue. Appends `val` and wakes one
     * thread waiting in pop(), if any.
     */
    void push( std::shared_ptr<T> val ) override
    {
        std::lock_guard<std::mutex> lk( _mutex );
        // `val` is our own copy, so move it in rather than paying for
        // another atomic reference-count increment/decrement.
        _queue.push( std::move( val ) );
        _cvar.notify_one();
    }

    /* Concurrency-safe check if queue empty. The answer is a snapshot:
     * another thread may push or pop as soon as the lock is released.
     */
    const bool empty() const override
    {
        std::lock_guard<std::mutex> lk( _mutex );
        return _queue.empty();
    }

    /* Concurrency-safe count of queued values. Like empty(), the
     * result is only a snapshot.
     */
    const std::size_t size() const override
    {
        std::lock_guard<std::mutex> lk( _mutex );
        return _queue.size();
    }

    /* Waiting, concurrency-safe pop of value. If there are no values in
     * the queue, then this method blocks the current thread until there
     * are. Returns the oldest queued value.
     */
    std::shared_ptr<T> pop() override
    {
        std::unique_lock<std::mutex> lk( _mutex );
        // The predicate form of wait() re-checks the queue after every
        // wake-up, so spurious wake-ups and values taken by another
        // consumer are both handled.
        _cvar.wait( lk, [ this ] { return ! _queue.empty(); } );
        // The front slot is discarded on the next line, so steal its
        // pointer instead of copying it.
        std::shared_ptr<T> value( std::move( _queue.front() ) );
        _queue.pop();
        return value;
    }

private:
    mutable std::mutex _mutex;                // mutable: locked by the const observers
    std::queue<std::shared_ptr<T>> _queue;    // guarded by _mutex
    std::condition_variable _cvar;            // signalled by push(), waited on by pop()
};
// ----------------------------------------
/* Drains `q` by popping until it reports empty, then asserts that it
 * is empty.
 *
 * The empty() check and the blocking pop() are two separate locked
 * calls, so this is only safe while no other thread is popping from
 * `q`: if another consumer took the last value in between, the pop()
 * here would block.
 */
void clear_q( ConcurrentQueue<std::string> &q )
{
    for ( ; ; ) {
        if ( q.empty() )
            break;
        q.pop();  // popped value is discarded
    }
    assert( q.empty() );
}
int main( const int argc, const char *argv[] ) {
ConcurrentQueue<string> q;
{ // Should be able to push values onto the queue
q.push( make_shared<string>( "Dennis The Menace" ) );
}
clear_q( q );
{ // Should be able to pop values from the queue
q.push( make_shared<string>( "Frumpy Towers" ) );
assert( *q.pop() == "Frumpy Towers" );
}
clear_q( q );
{ // Should be able to test if queue is empty
assert( q.empty() );
q.push( make_shared<string>( "Donkey Balls" ) );
assert( ! q.empty() );
q.pop();
assert( q.empty() );
}
clear_q( q );
{ // Should be able to concurrently push values
thread t1( [ & ] {
for ( uint i( 0 ) ; i < 5000 ; ++ i )
q.push( make_shared<string>( "Sherlock Holmes" ) );
} );
thread t2( [ & ] {
for ( uint i( 0 ) ; i < 5000 ; ++ i )
q.push( make_shared<string>( "Professor Moriarty" ) );
} );
t1.join();
t2.join();
assert( q.size() == static_cast<uint>( 10000 ) );
pair<uint, uint> counts( 0, 0 );
for ( uint i( 0 ) ; i < 10000 ; ++ i ) {
string str( *q.pop() );
if ( str == "Sherlock Holmes" )
++ counts.first;
else if ( str == "Professor Moriarty" )
++ counts.second;
else
throw( runtime_error( "Unexpected value in queue" ) );
}
assert( counts.first == static_cast<uint>( 5000 ) );
assert( counts.second == counts.first );
}
clear_q( q );
{ // Should be able to concurrently pop values
for ( uint i( 0 ) ; i < 100 ; ++ i )
q.push( make_shared<string>( "Monty Halfwit" ) );
pair<uint, uint> counts( 0, 0 );
thread t1( [ & ] {
while ( true ) {
this_thread::sleep_for( chrono::milliseconds( 1 ) );
q.pop();
++ counts.first;
}
} );
thread t2( [ & ] {
while ( true ) {
this_thread::sleep_for( chrono::milliseconds( 1 ) );
q.pop();
++ counts.second;
}
} );
t1.detach();
t2.detach();
while ( counts.first + counts.second < 100 )
this_thread::sleep_for( chrono::milliseconds( 100 ) );
assert( counts.first + counts.second == static_cast<uint>( 100 ) );
assert( counts.first != static_cast<uint>( 0 ) );
assert( counts.second != static_cast<uint>( 0 ) );;
}
clear_q( q );
{ // Should have threads wait until there is a value to pop
string str( "ORIG" );
thread t( [ & ] { cout << "pop() ..." << endl; str = *q.pop(); } );
for ( uint i( 0 ) ; i < 10 ; ++ i ) {
this_thread::sleep_for( chrono::milliseconds( 10 ) );
assert( str == "ORIG" );
}
cout << "push() ..." << endl;
q.push( make_shared<string>( "Tossy Halfcakes" ) );
cout << "Size: " << q.size() << endl;
t.join();
assert( str == "Tossy Halfcakes" );
}
clear_q( q );
// ----------
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment