Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
This gist is to showcase boost interprocess. Creating a set of classes to create an interface for me to create a shared memory layer for high level languages
template<size_t BufferSize>
class SharedMemoryConsumer
{
public:
explicit SharedMemoryConsumer(const std::string& memory_name)
: name_length_(memory_name.size()),
shm_(boost::interprocess::open_or_create,memory_name.c_str(),boost::interprocess::read_write)
{
shm_.truncate(sizeof(TraceQueue<BufferSize>));
region_ = boost::interprocess::mapped_region(shm_,boost::interprocess::read_write);
//Get the address of the mapped region
void * addr = region_.get_address();
//Obtain a pointer to the shared structure
data_ = static_cast<TraceQueue<BufferSize>*>(addr);
if (data_->name_length != name_length_)
{
data_ = new (addr)TraceQueue<BufferSize>;
data_->name_length = name_length_;
}
}
~SharedMemoryConsumer()
{
keep_listening_ = false;
background_listener_.join();
}
void start()
{
keep_listening_ = true;
background_listener_ = std::thread([=](){
do {
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(data_->mutex);
if (!data_->message_in) {
data_->cond_empty.wait(lock);
}
sig_(data_->message);
//Notify the other process that the buffer is empty
data_->message_in = false;
data_->cond_full.notify_one();
} while (!data_->closing_stream && keep_listening_);
});
}
void set_callable(const std::function<void(const InterprocessMessage<BufferSize>& memory_block)>& callback)
{
conn_ = sig_.connect([=](const InterprocessMessage<BufferSize>& message)
{
callback(message);
});
}
void wait()
{
background_listener_.join();
}
private:
int name_length_;
boost::signals2::signal<void(const InterprocessMessage<BufferSize>&)> sig_;
boost::signals2::connection conn_;
boost::interprocess::shared_memory_object shm_;
boost::interprocess::mapped_region region_;
std::thread background_listener_;
bool keep_listening_;
TraceQueue<BufferSize> * data_;
};
template <size_t BufferSize>
class SharedMemoryProducer
{
public:
explicit SharedMemoryProducer(const std::string& memory_name)
: name_length_(memory_name.size())
,remove_guard_(memory_name)
, shm_(boost::interprocess::open_or_create,memory_name.c_str(),boost::interprocess::read_write)
{
shm_.truncate(sizeof(TraceQueue<BufferSize>));
region_ = boost::interprocess::mapped_region(shm_,boost::interprocess::read_write);
//Get the address of the mapped region
void * addr = region_.get_address();
//Construct the shared structure in memory
data_ = static_cast<TraceQueue<BufferSize>*>(addr);
if (data_->name_length != name_length_)
{
data_ = new (addr)TraceQueue<BufferSize>;
data_->name_length = name_length_;
}
}
~SharedMemoryProducer()
{
data_->name_length = 0;
data_->closing_stream = true;
}
void write_data(const std::string& buffer)
{
if (buffer.size() > BufferSize)
{
throw std::invalid_argument("Message size is larger than buffer size");
}
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(data_->mutex);
if(data_->message_in){
data_->cond_full.wait(lock);
}
memcpy(data_->message.message,buffer.data(),buffer.size());
data_->message.message_length = buffer.size();
//Notify to the other process that there is a message
data_->cond_empty.notify_one();
//Mark message buffer as full
data_->message_in = true;
}
private:
int name_length_;
shm_remove remove_guard_;
boost::interprocess::shared_memory_object shm_;
boost::interprocess::mapped_region region_;
TraceQueue<BufferSize> * data_;
};
#include <boost/interprocess/shared_memory_object.hpp>
struct shm_remove
{
public:
explicit shm_remove(const std::string& shm_name)
: shm_name_(shm_name)
{
// boost::interprocess::shared_memory_object::remove(shm_name.c_str());
}
~shm_remove(){ boost::interprocess::shared_memory_object::remove(shm_name_.c_str()); }
private:
std::string shm_name_;
};
template <size_t MESSAGE_SIZE>
struct InterprocessMessage
{
//Items to fill
char message[MESSAGE_SIZE];
//Message size
size_t message_length;
std::string to_string() const
{
return std::string(message,message_length);
}
};
template <size_t MESSAGE_SIZE>
struct TraceQueue
{
TraceQueue()
: message_in(false)
{}
//Mutex to protect access to the queue
boost::interprocess::interprocess_mutex mutex;
//Condition to wait when the queue is empty
boost::interprocess::interprocess_condition cond_empty;
//Condition to wait when the queue is full
boost::interprocess::interprocess_condition cond_full;
//Message
InterprocessMessage<MESSAGE_SIZE> message;
//Is there any message
bool message_in;
//Notify closing shop
bool closing_stream = false;
//Check to see if memory has been created
int name_length;
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment