Skip to content

Instantly share code, notes, and snippets.

@dspezia
Created February 9, 2012 16:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dspezia/1781031 to your computer and use it in GitHub Desktop.
Save dspezia/1781031 to your computer and use it in GitHub Desktop.
Example of double queue pattern
/***************************************************************************
* Copyright (C) 2008 by Amadeus *
* *
* This program is free software; you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation; either version 2 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the *
* Free Software Foundation, Inc., *
* 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *
***************************************************************************/
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdexcept>
#include "Sequencer.h"
namespace obesim
{
using namespace std;
/****************************************************************************/
// Releases every buffer pointer still stored in either deque.
// _alt is walked without taking _mutex (pop() also touches it unlocked);
// _main is walked while a Guard on _mutex is alive.
// NOTE(review): entries may be null (Sequencer::push defaults to pushing 0);
// this assumes Buffer::release tolerates a null pointer -- confirm.
Queue::~Queue()
{
    typedef deque<Buffer *>::iterator BufferIter;

    for ( BufferIter it = _alt.begin(); it != _alt.end(); ++it )
        Buffer::release( *it );

    // The guard lives until the end of the destructor body.
    Guard lock( _mutex );
    for ( BufferIter it = _main.begin(); it != _main.end(); ++it )
        Buffer::release( *it );
}
/****************************************************************************/
// Returns true only when both deques are empty.
// _alt is inspected first, without the mutex; _main is only inspected
// (under a Guard on _mutex) when _alt holds nothing.
// NOTE(review): the unlocked read of _alt presumably relies on a single
// consumer thread calling empty()/pop() -- confirm against callers.
bool Queue::empty() const
{
    if ( _alt.empty() )
    {
        Guard lock( _mutex );
        return _main.empty();
    }
    return false;
}
/****************************************************************************/
// Appends b to the back of _main and signals _cond.
// Both steps run while the Guard on _mutex is alive, so the signal is
// issued before the guard is destroyed at the end of the function.
// b is stored as-is; a null pointer is not rejected here.
void Queue::push( Buffer *b )
{
Guard lock( _mutex );
_main.push_back( b );
// Wake a consumer that may be waiting in pop().
_cond.signal();
}
/****************************************************************************/
// Removes and returns the oldest queued pointer.
// Elements are served from _alt without locking. When _alt runs dry, the
// mutex is taken, the call waits on _cond until _main is non-empty, and the
// two deques are swapped so the whole pending batch moves to _alt at once.
// The returned pointer is whatever was pushed, including null.
Buffer *Queue::pop()
{
    if ( _alt.empty() )
    {
        Guard lock( _mutex );
        // Re-test the predicate after every wake-up.
        while ( _main.empty() )
            _cond.wait();
        _alt.swap( _main );
    }

    Buffer *head = _alt.front();
    _alt.pop_front();
    return head;
}
/****************************************************************************/
// Constructs a sequencer identified by name.
// Only the pointer is stored in _name (no copy of the characters is made),
// so the string passed in must stay valid for as long as _name is used.
Sequencer::Sequencer( const char *name )
: _name( name ) {}
/****************************************************************************/
// Nothing to do explicitly: the _queue member releases its remaining
// buffers in its own destructor.
Sequencer::~Sequencer() {}
/****************************************************************************/
// Main loop of the sequencer (presumably the thread entry point inherited
// from Thread -- confirm).
// Pops buffers from _queue and hands each one to processMessage() until a
// null pointer is popped, which ends the loop. Start and end are logged at
// LOG_INFO with the sequencer name.
// Exceptions derived from std::exception thrown by processMessage() are
// logged at LOG_ERR and the loop continues with the next buffer; any other
// exception type is not caught here.
void Sequencer::run()
{
    log( LOG_INFO, "Starting sequencer %s", _name );

    Buffer *b = 0;
    while ( (b = _queue.pop()) != 0 )
    {
        try
        {
            processMessage( b );
        }
        // Caught by const reference: the handler only reads what().
        catch ( const std::exception &e )
        {
            log( LOG_ERR, "EXCEPTION: %s", e.what() );
        }
    }

    log( LOG_INFO, "End of sequencer %s", _name );
}
/****************************************************************************/
// Builds the sequencer named "TRACER" and opens fname for writing ("w" mode).
// debugFlag selects the level reported by getLevel(): LOG_DBG when true,
// LOG_INFO otherwise.
// Throws std::runtime_error when the file cannot be opened.
SequencerLog::SequencerLog( const char *fname, bool debugFlag )
    : Sequencer( "TRACER" ),
      _file( fopen( fname, "w" ) ),
      _baseTime( 0 ),
      _level( debugFlag ? LOG_DBG : LOG_INFO )
{
    if ( _file == 0 )
        throw runtime_error( "SequencerLog - open file" );
}
/****************************************************************************/
// Closes the log file opened by the constructor.
// _file is non-null here: the constructor throws when fopen fails, so no
// object exists whose _file is null.
SequencerLog::~SequencerLog()
{
fclose(_file);
}
/****************************************************************************/
// Writes the content of b to the log file followed by a newline, then
// releases b.
// The stream is flushed only when the current time() value differs from the
// one recorded at the previous flush, i.e. at most once per distinct time()
// value.
void SequencerLog::processMessage( Buffer *b )
{
    fwrite( b->getBuf(), b->getSize(), 1, _file );
    fputc( '\n', _file );
    Buffer::release( b );

    const time_t now = time( 0 );
    if ( now == _baseTime )
        return;     // already flushed for this time value

    fflush( _file );
    _baseTime = now;
}
/****************************************************************************/
}
/***************************************************************************
* Copyright (C) 2008 by Amadeus *
* *
* This program is free software; you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation; either version 2 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the *
* Free Software Foundation, Inc., *
* 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *
***************************************************************************/
#ifndef __OBESIM_SEQUENCER__
#define __OBESIM_SEQUENCER__
#include "Buffer.h"
#include "Thread.h"
namespace obesim
{
/****************************************************************************/
// Queue of Buffer pointers built on two deques: push() appends to _main
// under _mutex, pop() serves from _alt and swaps the two deques when _alt is
// empty.
// NOTE(review): std::deque is used below but this header does not include
// <deque> itself; presumably it arrives through Buffer.h or Thread.h --
// confirm.
class Queue
{
public:
    // _cond is constructed from _mutex, which is declared before it.
    Queue() : _cond( _mutex ) {}
    ~Queue();

    bool empty() const;
    void push( Buffer *b );
    Buffer *pop();

private:
    // Copy operations are private to forbid copying; no definition is
    // visible in this file.
    Queue( const Queue & );
    Queue &operator=( const Queue & );

    std::deque<Buffer *> _main;     // filled by push()
    std::deque<Buffer *> _alt;      // drained by pop()
    mutable Mutex _mutex;           // mutable so empty() const can lock it
    Condition _cond;
};
/****************************************************************************/
// Abstract worker built on Thread: buffers handed to push() are queued and
// later passed one by one to processMessage() by run().
class Sequencer : public Thread
{
public:
    // Stores the name pointer without copying the characters.
    Sequencer( const char *name );

    // Declared virtual explicitly: this class is a polymorphic base, and
    // whether Thread already declares a virtual destructor is not visible
    // from this header.
    virtual ~Sequencer();

    // Queues b for processing. Pushing a null pointer (the default) makes
    // run() leave its loop when that entry is popped.
    void push( Buffer *b = 0 )
    {
        _queue.push( b );
    }

    // Pops buffers and dispatches them to processMessage() until a null
    // pointer is popped.
    virtual void run();

    // Handles one non-null buffer; implemented by derived classes.
    virtual void processMessage( Buffer *b ) = 0;

private:
    Queue _queue;
    const char *_name;      // not owned; must outlive this object
};
/****************************************************************************/
// Sequencer that writes each received buffer as one line of a log file.
// NOTE(review): FILE and time_t are used below but this header includes
// neither <stdio.h> nor <time.h> itself; presumably they arrive through
// Buffer.h or Thread.h -- confirm.
class SequencerLog : public Sequencer
{
public:
    // Opens fname for writing; debugFlag selects LOG_DBG over LOG_INFO.
    SequencerLog( const char *fname, bool debugFlag );
    ~SequencerLog();

    // Writes the buffer plus a newline to the file and releases the buffer.
    virtual void processMessage( Buffer *b );

    // Level chosen at construction time.
    int getLevel() const { return _level; }

private:
    FILE *_file;            // log file, closed in the destructor
    time_t _baseTime;       // time() value recorded at the last flush
    int _level;
};
/****************************************************************************/
}
#endif
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment