@guru-florida
Last active March 10, 2021 20:30
Read/Write Memory Barriers for RealTime processing in Ros2

Work in progress. Looking for feedback on the design and code.

Read/Write Realtime Hardware buffers

From what I can tell, there are some issues with using realtime_tools::RealtimeBuffer in our ros2_control hardware interfaces. Judging by its use of the internal new_data_available variable, RealtimeBuffer is only useful for writing to a realtime thread. For reading state from the realtime thread we don't have an existing RealtimeBuffer counterpart.

The internal implementation of the "swapping double buffer pointers" works for both modes; only the interface (read vs write) to this swapping buffer needs to change. Therefore I refactored the swapping double buffers out into the MemoryBarrier class, which also defines a sub-type called DirectAccess<> to protect direct access to the buffers.

The realtime thread is expected to use the DirectAccess class to get direct access to the buffers without memory copies. However, the non-realtime thread should use either WriteBarrier or ReadBarrier. These two classes control the direction of data flow to/from the realtime thread, respectively. IMO it makes sense that reads and writes go through separate class objects and not via some bi-directional interface.

TL;DR - skip to the examples at the bottom of this README. The usage is simple.

Class Structure

The following descriptions are a brief overview. There are more methods and fields that you can dig into in the source, but they are not important to the story.

MemoryBarrier

MemoryBarrier implements a double buffer swapping mechanism for interchanging data between threads. It allocates two T objects on the heap. When MemoryBarrier::swap() is called the two pointers are swapped. Generally one pointer is accessed from the non-realtime thread and the other is used by the realtime thread.
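To make the pointer swap concrete, here is a minimal sketch of the idea only. `DoubleBuffer` and `swap_demo` are hypothetical names made up for illustration, locking is left out entirely, and `std::unique_ptr` stands in for the raw pointers the real class uses.

```cpp
#include <cassert>
#include <memory>
#include <utility>

// Hypothetical, simplified stand-in for MemoryBarrier<T>: two heap-allocated
// buffers whose pointers trade places on swap(). No locking is shown here.
template <class T>
class DoubleBuffer {
public:
    explicit DoubleBuffer(const T& initial = T())
        : nrt_(std::make_unique<T>(initial)), rt_(std::make_unique<T>(initial)) {}

    T& non_realtime() { return *nrt_; }
    T& realtime() { return *rt_; }

    // Only the pointers move; the T objects themselves are never copied.
    void swap() { std::swap(nrt_, rt_); }

private:
    std::unique_ptr<T> nrt_;  // side used by the non-realtime thread
    std::unique_ptr<T> rt_;   // side used by the realtime thread
};

// Returns true if a value written on the realtime side shows up on the
// non-realtime side after one swap.
inline bool swap_demo() {
    DoubleBuffer<int> buf(0);
    buf.realtime() = 42;
    buf.swap();
    return buf.non_realtime() == 42 && buf.realtime() == 0;
}
```

Because only pointers are exchanged, the cost of a swap does not depend on the size of T.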

You should not use this class directly; you will work with the ReadBarrier, WriteBarrier and DirectAccess classes only.

Methods:

  • bool new_data_available() - read only. Returns true if the flag indicates new data is available.
  • initialize(T value) - initializes both sides of the memory buffer to the same value.
  • memory() - access the MemoryBarrier object.

Protected Fields:

  • T* nrt_ - the non-RealTime buffer.
  • T* rt_ - the RealTime buffer.
  • bool new_data_available_ - flag indicating if new data is ready (context depends on direction).

ReadBarrier<T, lock_strategy = wait_lock>

ReadBarrier implements reading data from a realtime thread using a MemoryBarrier. The default constructor will create a new memory barrier for use. There is an alternate constructor if you want to create your own MemoryBarrier explicitly.

Methods:

  • pull(T& val) - swap the RT buffer for the non-RT one, copy the new data into val and reset the new_data_available flag.
  • current(T& val) - copy the current data into val. No swap is performed and the new_data_available flag is unaffected.
  • memory() - access the MemoryBarrier object.
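The pull() sequence (swap, clear the flag, copy out) can be sketched in isolation. This is not the actual ReadBarrier: `PullBuffer` and `publish()` are hypothetical names, and the two buffers are plain members rather than heap allocations to keep the example short.

```cpp
#include <cassert>
#include <mutex>
#include <utility>

// Hypothetical miniature of the "read from realtime" direction.
template <class T>
class PullBuffer {
public:
    // Realtime side: try the lock once, write into the RT buffer, raise the flag.
    bool publish(const T& value) {
        std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock);
        if (!lock.owns_lock()) return false;  // busy, try again next cycle
        *rt_ = value;
        new_data_ = true;
        return true;
    }

    // Non-realtime side: swap only when new data is flagged, then copy it out.
    bool pull(T& dest) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (!new_data_) return false;
        std::swap(nrt_, rt_);   // the freshly written buffer becomes ours
        new_data_ = false;
        dest = *nrt_;
        return true;
    }

private:
    T a_{};
    T b_{};
    T* nrt_ = &a_;
    T* rt_ = &b_;
    bool new_data_ = false;
    std::mutex mutex_;
};
```

A second pull() without an intervening publish() returns false and leaves dest untouched, which is how the caller can tell stale data from fresh data.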

WriteBarrier<T, lock_strategy = wait_lock>

WriteBarrier implements writing data to a realtime thread from a non-realtime thread using a MemoryBarrier. The default constructor will create a new memory barrier for use. There is an alternate constructor if you want to create your own MemoryBarrier explicitly.

Methods:

  • push(const T& val) - swap the RT buffer for the non-RT one, copy the new data from val into the non-RT buffer and set the new_data_available flag.
  • current(T& val) - copy the current data into val. No swap is performed and the new_data_available flag is unaffected.
  • memory() - access the MemoryBarrier object.

MemoryBarrier::DirectAccess<access_mode, lock_strategy>

The MemoryBarrier class itself gives no access to the buffers. All read/write calls to the buffers are done by creating a DirectAccess<> object, which protects access to the buffers through a mutex with behaviour defined by the lock_strategy. DirectAccess is a simple object and should be allocated on the stack, not the heap, so as to prevent calls to malloc().

The ReadBarrier and WriteBarrier classes also use a DirectAccess object to access the MemoryBarrier but only long enough to perform the non-realtime memory copy.

Parameters:

  • access_mode - either realtime or non_realtime.
  • lock_strategy - can be try_lock or wait_lock. If not specified, defaults to try_lock when access_mode=realtime and wait_lock when non_realtime.

Methods:

  • swap() - rotate the double buffer pointers. The non-RT and RT buffers are swapped.
  • new_data_available(bool) - sets the flag indicating if new data is available to the destination thread, whether that be the RT thread (Write) or the non-RT (Read) side.
  • reset() - detaches from a MemoryBarrier, unlocking it immediately. The destructor also does this for you.

Smart pointer semantics: DirectAccess also implements the *, -> and get() methods, like a smart pointer, for dereferencing the memory buffer. These are all inline so you can access the underlying memory efficiently.
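The way an access mode picks its default lock strategy can be shown with a few lines of template code. This is a sketch with hypothetical names (`TryOnce`, `WaitForever`, `Realtime`, `NonRealtime`, `Access`); the strategies here are empty placeholders, whereas the real ones wrap a mutex.

```cpp
#include <cassert>
#include <type_traits>

// Hypothetical placeholder strategies.
struct TryOnce {};
struct WaitForever {};

// Each access-mode tag names the strategy it prefers.
struct Realtime    { using lock_strategy = TryOnce; };
struct NonRealtime { using lock_strategy = WaitForever; };

// The second parameter defaults to whatever the access mode prefers, but a
// caller can still override it explicitly.
template <class AccessMode = Realtime,
          class LockStrategy = typename AccessMode::lock_strategy>
struct Access {
    using mode = AccessMode;
    using strategy = LockStrategy;
};

static_assert(std::is_same<Access<>::strategy, TryOnce>::value,
              "realtime access defaults to a single lock attempt");
static_assert(std::is_same<Access<NonRealtime>::strategy, WaitForever>::value,
              "non-realtime access defaults to waiting");
static_assert(std::is_same<Access<Realtime, WaitForever>::strategy, WaitForever>::value,
              "an explicit strategy overrides the default");
```

The defaults are resolved at compile time, so choosing a strategy this way adds no runtime cost.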
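As a rough illustration of a stack-allocated guard with pointer-like access, consider the sketch below. `TryGuard`, `State` and `guard_demo` are hypothetical; the guard holds a `std::unique_lock` as a member instead of inheriting from the lock strategy as DirectAccess does.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical guard: tries to lock once on construction and only hands out
// the pointer while it owns the lock.
template <class T>
class TryGuard {
public:
    TryGuard(std::mutex& m, T* target)
        : lock_(m, std::try_to_lock), target_(target) {}

    explicit operator bool() const { return lock_.owns_lock(); }
    T* get() { return lock_.owns_lock() ? target_ : nullptr; }
    T* operator->() { return get(); }
    T& operator*() { return *get(); }

    // Release early; otherwise the destructor releases the lock.
    void reset() {
        if (lock_.owns_lock()) lock_.unlock();
        target_ = nullptr;
    }

private:
    std::unique_lock<std::mutex> lock_;
    T* target_;
};

struct State { double position = 0.0; };

inline bool guard_demo() {
    std::mutex m;
    State s;
    TryGuard<State> g(m, &s);  // lives on the stack, no heap allocation
    if (!g) return false;      // lock was busy, nothing to do this cycle
    g->position = 8.0;         // pointer-like access while the lock is held
    g.reset();                 // unlock early
    return g.get() == nullptr && s.position == 8.0;
}
```

Checking the guard before dereferencing matters with a try-once strategy, since a failed attempt leaves nothing to point at.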

Other types

Some types are used to configure the templates above. Good defaults are already chosen based on RT/non-RT access modes so you should not have to bother with these. They are:

try_lock / wait_lock

Determines what kind of locking strategy should be performed when accessing the memory barrier. wait_lock will wait indefinitely for the lock to free up, whereas try_lock will only try once and fail if the MemoryBarrier is currently locked.
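The difference between the two strategies can be seen with plain `std::unique_lock`. This is a sketch: `try_once`, `wait_for_it` and `try_fails_while_held` are hypothetical helpers, and the waiting variant uses an ordinary blocking lock rather than the poll-and-sleep loop in the header.

```cpp
#include <cassert>
#include <mutex>
#include <thread>

// Try once: returns immediately, reporting whether the lock was obtained.
inline bool try_once(std::mutex& m) {
    std::unique_lock<std::mutex> lock(m, std::try_to_lock);
    return lock.owns_lock();
}

// Wait: blocks until the mutex becomes free.
inline bool wait_for_it(std::mutex& m) {
    std::unique_lock<std::mutex> lock(m);
    return lock.owns_lock();
}

// Holds the mutex on the calling thread and probes it from a second thread.
// A second thread is needed because calling try_lock on a std::mutex the
// calling thread already owns is undefined behaviour.
inline bool try_fails_while_held() {
    std::mutex m;
    bool acquired = true;
    {
        std::lock_guard<std::mutex> hold(m);
        std::thread probe([&] { acquired = try_once(m); });
        probe.join();
    }
    return !acquired;
}
```

The try-once form never blocks, which is why it suits the realtime side; the waiting form trades latency for a guaranteed result.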

realtime / non_realtime

Indicates an access mode. Can be used to determine the proper lock_strategy or to indicate if you want DirectAccess to the RT or non-RT memory buffer.
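One way to let a tag type decide which buffer is returned is overloading on the tag. The sketch below uses that technique for brevity; the header itself selects the buffer with `std::enable_if_t` instead. `realtime_tag`, `non_realtime_tag`, `Buffers` and `side` are hypothetical names.

```cpp
#include <cassert>

// Hypothetical tag types standing in for realtime / non_realtime.
struct realtime_tag {};
struct non_realtime_tag {};

template <class T>
struct Buffers {
    T nrt;  // non-realtime side
    T rt;   // realtime side

    // Overloads on the tag type pick the side at compile time.
    T& select(realtime_tag) { return rt; }
    T& select(non_realtime_tag) { return nrt; }
};

// Convenience wrapper: side<realtime_tag>(b) or side<non_realtime_tag>(b).
template <class AccessMode, class T>
T& side(Buffers<T>& b) {
    return b.select(AccessMode{});
}
```

Usage: given `Buffers<int> b{1, 2};`, `side<realtime_tag>(b)` refers to `b.rt` and `side<non_realtime_tag>(b)` refers to `b.nrt`.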

Notes:

  • I chose the pull() and push() names because I felt read/write doesn't clearly indicate if a swap is performed, whereas push/pull is clearer to me. For example, ReadBarrier::read() did swap yet WriteBarrier::read() did not. Instead, WriteBarrier::read became current() and ReadBarrier::read became pull(), and I feel this is less confusing.
  • There is also a polarity field in MemoryBarrier. This is currently unused and will probably go.
  • Should this not be called Barriers? "memory barrier" is also used elsewhere in programming.

Example definition of bi-directional Realtime data passing

In your class header file declare a read and write barrier.

    // Barrier is placed in realtime_tools namespace, hopefully it will end up there.
    using namespace realtime_tools;

    typedef struct { ... } StateData;
    typedef struct { ... } CommandData;

    // you might want to declare data structs here if you are binding to handles.
    // for this example, we'll keep it simple and declare inside the method.
    // StateData state;
    // CommandData command;

    // declare a two-way data link to our realtime thread
    ReadBarrier<StateData> state_rtb;
    WriteBarrier<CommandData> command_rtb;

In your non-RealTime read() method:

    void read()
    {
        StateData state;

        // read data from the realtime thread if there is new data
        // will cause a swap of the internal buffers
        if(state_rtb.pull(state))
            LOG("new data read");
    }

In your non-RealTime write() method:

    void write()
    {
        CommandData command;

        // you might need to get the current values first, you can
        // read values using current(T&) without affecting the MemoryBarrier.
        // command_rtb.current(command);

        // set our new command
        command.position = 15;
        command.velocity = 2.3;

        // write data to our realtime thread and set the "new data" flag
        // will cause a swap of the internal buffers
        if(command_rtb.push(command))
            LOG("data successfully written");
        else
            LOG("write failed, possibly a locking failure?");
    }

In your RealTime thread method:

    void process_realtime()
    {
        // write state to non-RT
        decltype(state_rtb)::DirectAccessType state(state_rtb);
        //   typeof(state) is MemoryBarrier<StateData>::DirectAccess<realtime, try_lock>
        if(state) {                          // try_lock may fail, leaving state empty
            state->position = 8;             // using smart_ptr-like semantics
            state.new_data_available(true);  // indicate to non-RT there is new state data
        }
        state.reset();                       // unlock the state barrier

        // read new commands from non-RT
        decltype(command_rtb)::DirectAccessType command(command_rtb);
        //   typeof(command) is MemoryBarrier<CommandData>::DirectAccess<realtime, try_lock>
        if(command && command.new_data_available()) {
            // todo: write command->position and command->velocity to hardware
            command.new_data_available(false);  // indicate we read the commands
        }
        command.reset();  // redundant, would fall out of scope and reset anyway
    }
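//
// Created by guru on 2/22/21.
//
#ifndef LSS_HARDWARE_REALTIME_STATIC_BUFFER_H
#define LSS_HARDWARE_REALTIME_STATIC_BUFFER_H

#include <chrono>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <type_traits>
#include <utility>

#include <realtime_tools/realtime_buffer.h>

namespace realtime_tools {

// Lock strategy: make a single attempt and give up if the mutex is busy.
class try_lock : public std::unique_lock<std::mutex>
{
public:
    explicit try_lock(std::mutex& m)
        : std::unique_lock<std::mutex>(m, std::try_to_lock)
    {
    }
};

// Lock strategy: keep polling until the lock is acquired.
class wait_lock : public std::unique_lock<std::mutex>
{
public:
    explicit wait_lock(std::mutex& m)
        : std::unique_lock<std::mutex>(m, std::defer_lock)
    {
        // poll for the lock, sleeping 500us between attempts
        while (!std::unique_lock<std::mutex>::try_lock()) {
            std::this_thread::sleep_for(std::chrono::microseconds(500));
        }
    }
};

// Access-mode tags; each one names its default lock strategy.
struct realtime {
    using lock_strategy = try_lock;
};

struct non_realtime {
    using lock_strategy = wait_lock;
};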
// for RT input, NRT writes but doesn't swap, RT swaps and reads
// for RT output, RT writes, NRT swaps and reads at will
template<class T>
class MemoryBarrier
{
public:
    template<typename access_mode = realtime, typename lock_strategy = typename access_mode::lock_strategy>
    class DirectAccess : public lock_strategy
    {
    public:
        using MemoryBarrierType = MemoryBarrier<T>;

        explicit DirectAccess(MemoryBarrierType& mem_barrier) noexcept
            : lock_strategy(mem_barrier.mutex_), mem_(&mem_barrier), obj_(nullptr)
        {
            obj_ = _get();  // will return nullptr if we don't own the lock yet
        }

        // construct from any object exposing memory(), e.g. ReadBarrier or WriteBarrier
        template<class X>
        explicit DirectAccess(X& mem_barrier) noexcept
            : lock_strategy(mem_barrier.memory().mutex_), mem_(&mem_barrier.memory()), obj_(nullptr)
        {
            obj_ = _get();  // will return nullptr if we don't own the lock yet
        }

        template<class X>
        explicit DirectAccess(X* mem_barrier) noexcept
            : lock_strategy(mem_barrier->memory().mutex_), mem_(&mem_barrier->memory()), obj_(nullptr)
        {
            obj_ = _get();  // will return nullptr if we don't own the lock yet
        }

        // do not allow copying, only move semantics
        DirectAccess(const DirectAccess&) = delete;
        DirectAccess& operator=(const DirectAccess&) = delete;

        DirectAccess(DirectAccess&& move) noexcept
            : lock_strategy(std::move(move)), mem_(move.mem_), obj_(move.obj_)
        {
            // the lock ownership moved with the base class; detach the source
            move.mem_ = nullptr;
            move.obj_ = nullptr;
        }

        DirectAccess& operator=(DirectAccess&& move) noexcept
        {
            if (this != &move) {
                lock_strategy::operator=(std::move(move));  // transfer lock ownership
                mem_ = move.mem_;
                obj_ = move.obj_;
                move.mem_ = nullptr;
                move.obj_ = nullptr;
            }
            return *this;
        }

        // detach from the MemoryBarrier, unlocking it immediately
        inline void reset() {
            if (lock_strategy::owns_lock())
                lock_strategy::unlock();
            mem_ = nullptr;
            obj_ = nullptr;
        }

        inline bool new_data_available() const {
            return mem_->new_data_available();
        }

        inline void new_data_available(bool avail) {
            if (!lock_strategy::owns_lock())
                throw std::runtime_error("can't modify an unlocked MemoryBarrier");
            mem_->new_data_available_ = avail;
        }

        inline void swap() {
            if (!lock_strategy::owns_lock())
                throw std::runtime_error("can't swap an unlocked MemoryBarrier");
            mem_->swap();
            obj_ = nullptr;  // drop the cached pointer, it now refers to the other side
            obj_ = _get();
        }

        inline explicit operator bool() const { return mem_ != nullptr && get() != nullptr; }
        inline bool operator==(std::nullptr_t) const { return mem_ == nullptr || get() == nullptr; }
        inline bool operator!=(std::nullptr_t) const { return mem_ != nullptr && get() != nullptr; }

        inline T* get() { return _get(); }
        inline const T* get() const { return _get(); }
        inline T* operator->() { return get(); }
        inline const T* operator->() const { return get(); }
        inline T& operator*() { return *get(); }
        inline const T& operator*() const { return *get(); }
    private:
        MemoryBarrierType* mem_;
        mutable T* obj_;  // cached buffer pointer, resolved once the lock is owned

        // realtime access resolves to the RT buffer
        template<class realtimeness = access_mode>
        inline std::enable_if_t<std::is_same<realtime, realtimeness>::value, T*>
        _get() {
            return obj_
                ? obj_
                : lock_strategy::owns_lock()
                    ? obj_ = mem_->rt_
                    : nullptr;
        }

        // non-realtime access resolves to the non-RT buffer
        template<class realtimeness = access_mode>
        inline std::enable_if_t<std::is_same<non_realtime, realtimeness>::value, T*>
        _get() {
            return obj_
                ? obj_
                : lock_strategy::owns_lock()
                    ? obj_ = mem_->nrt_
                    : nullptr;
        }

        template<class realtimeness = access_mode>
        inline std::enable_if_t<std::is_same<realtime, realtimeness>::value, const T*>
        _get() const {
            return obj_
                ? obj_
                : lock_strategy::owns_lock()
                    ? obj_ = mem_->rt_
                    : nullptr;
        }

        template<class realtimeness = access_mode>
        inline std::enable_if_t<std::is_same<non_realtime, realtimeness>::value, const T*>
        _get() const {
            return obj_
                ? obj_
                : lock_strategy::owns_lock()
                    ? obj_ = mem_->nrt_
                    : nullptr;
        }
    };
    MemoryBarrier()
        : nrt_(new T()), rt_(new T()), polarity_(false), new_data_available_(false)
    {
    }

    explicit MemoryBarrier(const T& data)
        : nrt_(new T(data)), rt_(new T(data)), polarity_(false), new_data_available_(false)
    {
    }

    explicit MemoryBarrier(MemoryBarrier<T>&& move) noexcept
        : nrt_(move.nrt_), rt_(move.rt_), polarity_(move.polarity_), new_data_available_(move.new_data_available_)
    {
        move.nrt_ = nullptr;
        move.rt_ = nullptr;
        move.polarity_ = false;
        move.new_data_available_ = false;
    }

    // release both buffers; a moved-from barrier holds null pointers
    ~MemoryBarrier()
    {
        delete nrt_;
        delete rt_;
    }

    /*MemoryBarrier(const MemoryBarrier<T, read_locking_strategy, write_locking_strategy>& copy)
    : in_(new T(*copy.in_)), out_(new T(*copy.out_)), polarity_(copy.polarity_), new_data_available_(copy.new_data_available_) {} }*/

    // allow moving but not copying
    MemoryBarrier(const MemoryBarrier<T>&) = delete;
    MemoryBarrier& operator=(const MemoryBarrier<T>&) = delete;

    // todo: perhaps we need a new_data on both nrt and rt side
    // true if new data is available, depends on if the memory is being used for Read from RT or Write to RT mode
    inline bool new_data_available() const { return new_data_available_; }

    // set both sides of the double buffer to the same value
    void initialize(const T& value) {
        wait_lock guard(mutex_);
        if(guard.owns_lock()) {
            *nrt_ = *rt_ = value;
        } else
            throw std::runtime_error("request to initialize realtime barrier failed trying to lock");
    }

protected:
    T* nrt_;
    T* rt_;
    bool polarity_;  // flipped each time barrier rotates (swaps)
    bool new_data_available_;

    // Set as mutable so that readFromNonRT() can be performed on a const buffer
    mutable std::mutex mutex_;

    inline void swap() {
        // swap pointers
        T* tmp = nrt_;
        nrt_ = rt_;
        rt_ = tmp;
        polarity_ = !polarity_;
    }

    friend class DirectAccess<realtime>;
    friend class DirectAccess<non_realtime>;
};
// Non-realtime side of an RT -> non-RT data link.
template<class T, class locking_strategy = wait_lock>
class ReadBarrier
{
public:
    using MemoryBarrierType = MemoryBarrier<T>;
    // realtime-side accessor type, as used in the README example
    using DirectAccessType = typename MemoryBarrierType::template DirectAccess<realtime>;

    ReadBarrier() noexcept
        : mem_(new MemoryBarrierType()), owns_mem(true)
    {
    }

    explicit ReadBarrier(MemoryBarrierType& mem_barrier) noexcept
        : mem_(&mem_barrier), owns_mem(false)
    {
    }

    // copying would lead to a double delete of an owned MemoryBarrier
    ReadBarrier(const ReadBarrier&) = delete;
    ReadBarrier& operator=(const ReadBarrier&) = delete;

    virtual ~ReadBarrier() {
        if(owns_mem)
            delete mem_;
    }

    //inline operator MemoryBarrier&() { return *mem_; }
    inline MemoryBarrierType& memory() { return *mem_; }

    // copy the current non-RT data into dest without swapping
    bool current(T& dest)
    {
        typename MemoryBarrierType::template DirectAccess<non_realtime, locking_strategy> direct(*mem_);
        if(direct) {
            dest = *direct;
            return true;
        } else
            return false;
    }

    // todo: since this read also swaps, should it be called read_and_swap() or pop() or pull()
    bool pull(T& dest)
    {
        typename MemoryBarrierType::template DirectAccess<non_realtime, locking_strategy> direct(*mem_);
        if(direct && mem_->new_data_available()) {
            direct.swap();
            direct.new_data_available(false);
            dest = *direct;
            return true;
        } else
            return false;  // failed to read
    }

private:
    MemoryBarrierType* mem_;
    bool owns_mem;
};
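// Non-realtime side of a non-RT -> RT data link.
template<class T, class locking_strategy = wait_lock>
class WriteBarrier
{
public:
    using MemoryBarrierType = MemoryBarrier<T>;
    // realtime-side accessor type, as used in the README example
    using DirectAccessType = typename MemoryBarrierType::template DirectAccess<realtime>;

    WriteBarrier() noexcept
        : mem_(new MemoryBarrierType()), owns_mem(true)
    {
    }

    explicit WriteBarrier(MemoryBarrierType& mem_barrier) noexcept
        : mem_(&mem_barrier), owns_mem(false)
    {
    }

    // copying would lead to a double delete of an owned MemoryBarrier
    WriteBarrier(const WriteBarrier&) = delete;
    WriteBarrier& operator=(const WriteBarrier&) = delete;

    // free the MemoryBarrier only if this object allocated it
    virtual ~WriteBarrier() {
        if(owns_mem)
            delete mem_;
    }

    inline MemoryBarrierType& memory() { return *mem_; }

    // copy the current non-RT data into dest without affecting the flag
    bool current(T& dest)
    {
        typename MemoryBarrierType::template DirectAccess<non_realtime, locking_strategy> direct(*mem_);
        if(direct) {
            dest = *direct;
            return true;
        } else
            return false;
    }

    bool push(const T& data)
    {
        typename MemoryBarrierType::template DirectAccess<non_realtime, locking_strategy> direct(*mem_);
        if(direct) {
            *direct = data;
            direct.new_data_available(true);
            return true;
        } else
            return false;
    }

private:
    MemoryBarrierType* mem_;
    bool owns_mem;
};

}  // namespace realtime_tools

#endif //LSS_HARDWARE_REALTIME_STATIC_BUFFER_H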