Created
April 16, 2012 07:10
-
-
Save biot023/2396866 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* Compile with: | |
* $CXX -std=gnu++11 -pthread -Wall -g -D_GLIBCXX_USE_NANOSLEEP -I/usr/local/include queuetest.cpp -o queuetest | |
* | |
* Where $CXX points to a G++ compiler, version 4.7 | |
*/ | |
#include <memory> | |
#include <thread> | |
#include <queue> | |
#include <mutex> | |
#include <condition_variable> | |
#include <chrono> | |
#include <cassert> | |
#include <iostream> | |
using namespace std; | |
typedef unsigned int uint; | |
// ---------------------------------------- | |
template <typename T> | |
struct IQueueWriter | |
{ | |
virtual ~IQueueWriter() {} | |
virtual void push( shared_ptr<T> ) = 0; | |
}; | |
// ---------------------------------------- | |
template <typename T> | |
struct IQueueReader | |
{ | |
virtual ~IQueueReader() {} | |
virtual const bool empty() const = 0; | |
virtual const size_t size() const = 0; | |
virtual shared_ptr<T> pop() = 0; | |
}; | |
// ---------------------------------------- | |
/* Blocking, concurrency-safe queue. The method that blocks is pop(), | |
* which makes the current thread wait until there is a value to pop | |
* from the queue. | |
*/ | |
template <typename T> | |
struct ConcurrentQueue : public IQueueWriter<T>, public IQueueReader<T> | |
{ | |
ConcurrentQueue() {} | |
ConcurrentQueue( const ConcurrentQueue & ) = delete; | |
ConcurrentQueue & operator= ( const ConcurrentQueue & ) = delete; | |
/* Concurrency-safe push to queue. | |
*/ | |
virtual void push( shared_ptr<T> val ) | |
{ | |
lock_guard<mutex> lk( _mutex ); | |
_queue.push( val ); | |
_cvar.notify_one(); | |
} | |
/* Concurrency-safe check if queue empty. | |
*/ | |
virtual const bool empty() const | |
{ | |
lock_guard<mutex> lk( _mutex ); | |
bool result( _queue.empty() ); | |
return result; | |
} | |
virtual const size_t size() const | |
{ | |
lock_guard<mutex> lk( _mutex ); | |
return _queue.size(); | |
} | |
/* Waiting, concurrency-safe pop of value. If there are no values in | |
* the queue, then this method blocks the current thread until there | |
* are. | |
*/ | |
virtual shared_ptr<T> pop() | |
{ | |
unique_lock<mutex> lk( _mutex ); | |
_cvar.wait( lk, [ this ] { return ! _queue.empty(); } ); | |
auto value( _queue.front() ); | |
_queue.pop(); | |
return value; | |
} | |
private: | |
mutable mutex _mutex; | |
queue<shared_ptr<T>> _queue; | |
condition_variable _cvar; | |
}; | |
// ---------------------------------------- | |
void clear_q( ConcurrentQueue<string> &q ) | |
{ | |
while ( ! q.empty() ) q.pop(); | |
assert( q.empty() ); | |
} | |
int main( const int argc, const char *argv[] ) { | |
ConcurrentQueue<string> q; | |
{ // Should be able to push values onto the queue | |
q.push( make_shared<string>( "Dennis The Menace" ) ); | |
} | |
clear_q( q ); | |
{ // Should be able to pop values from the queue | |
q.push( make_shared<string>( "Frumpy Towers" ) ); | |
assert( *q.pop() == "Frumpy Towers" ); | |
} | |
clear_q( q ); | |
{ // Should be able to test if queue is empty | |
assert( q.empty() ); | |
q.push( make_shared<string>( "Donkey Balls" ) ); | |
assert( ! q.empty() ); | |
q.pop(); | |
assert( q.empty() ); | |
} | |
clear_q( q ); | |
{ // Should be able to concurrently push values | |
thread t1( [ & ] { | |
for ( uint i( 0 ) ; i < 5000 ; ++ i ) | |
q.push( make_shared<string>( "Sherlock Holmes" ) ); | |
} ); | |
thread t2( [ & ] { | |
for ( uint i( 0 ) ; i < 5000 ; ++ i ) | |
q.push( make_shared<string>( "Professor Moriarty" ) ); | |
} ); | |
t1.join(); | |
t2.join(); | |
assert( q.size() == static_cast<uint>( 10000 ) ); | |
pair<uint, uint> counts( 0, 0 ); | |
for ( uint i( 0 ) ; i < 10000 ; ++ i ) { | |
string str( *q.pop() ); | |
if ( str == "Sherlock Holmes" ) | |
++ counts.first; | |
else if ( str == "Professor Moriarty" ) | |
++ counts.second; | |
else | |
throw( runtime_error( "Unexpected value in queue" ) ); | |
} | |
assert( counts.first == static_cast<uint>( 5000 ) ); | |
assert( counts.second == counts.first ); | |
} | |
clear_q( q ); | |
{ // Should be able to concurrently pop values | |
for ( uint i( 0 ) ; i < 100 ; ++ i ) | |
q.push( make_shared<string>( "Monty Halfwit" ) ); | |
pair<uint, uint> counts( 0, 0 ); | |
thread t1( [ & ] { | |
while ( true ) { | |
this_thread::sleep_for( chrono::milliseconds( 1 ) ); | |
q.pop(); | |
++ counts.first; | |
} | |
} ); | |
thread t2( [ & ] { | |
while ( true ) { | |
this_thread::sleep_for( chrono::milliseconds( 1 ) ); | |
q.pop(); | |
++ counts.second; | |
} | |
} ); | |
t1.detach(); | |
t2.detach(); | |
while ( counts.first + counts.second < 100 ) | |
this_thread::sleep_for( chrono::milliseconds( 100 ) ); | |
assert( counts.first + counts.second == static_cast<uint>( 100 ) ); | |
assert( counts.first != static_cast<uint>( 0 ) ); | |
assert( counts.second != static_cast<uint>( 0 ) );; | |
} | |
clear_q( q ); | |
{ // Should have threads wait until there is a value to pop | |
string str( "ORIG" ); | |
thread t( [ & ] { cout << "pop() ..." << endl; str = *q.pop(); } ); | |
for ( uint i( 0 ) ; i < 10 ; ++ i ) { | |
this_thread::sleep_for( chrono::milliseconds( 10 ) ); | |
assert( str == "ORIG" ); | |
} | |
cout << "push() ..." << endl; | |
q.push( make_shared<string>( "Tossy Halfcakes" ) ); | |
cout << "Size: " << q.size() << endl; | |
t.join(); | |
assert( str == "Tossy Halfcakes" ); | |
} | |
clear_q( q ); | |
// ---------- | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment