Skip to content

Instantly share code, notes, and snippets.

@p4checo
Last active December 16, 2015 19:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save p4checo/5485036 to your computer and use it in GitHub Desktop.
Save p4checo/5485036 to your computer and use it in GitHub Desktop.
TripleBufferBenchmark
Example run:
Producer at 120Hz
Consumer at 60Hz
TripleBufferQueue (20 runs, 10 seconds each)
# Produced Consumed +-Delta Total
1 496 494 2 990
2 589 588 1 1177
3 599 597 2 1196
4 597 595 2 1192
5 599 598 1 1197
6 592 590 2 1182
7 590 588 2 1178
8 595 593 2 1188
9 598 596 2 1194
10 582 580 2 1162
11 600 599 1 1199
12 573 572 1 1145
13 599 597 2 1196
14 590 588 2 1178
15 600 599 1 1199
16 599 598 1 1197
17 600 599 1 1199
18 593 592 1 1185
19 600 598 2 1198
20 600 599 1 1199
--- -------- -------- -------- --------
Avg 589.55 588.00 1.55 1177.55
TripleBuffer (20 runs, 10 seconds each)
# Produced Consumed +-Delta Total
1 1200 600 600 1800
2 1200 599 601 1799
3 1200 599 601 1799
4 1200 599 601 1799
5 1200 599 601 1799
6 1200 599 601 1799
7 1200 600 600 1800
8 1200 600 600 1800
9 1200 600 600 1800
10 1200 600 600 1800
11 1199 600 599 1799
12 1200 599 601 1799
13 1200 599 601 1799
14 1200 600 600 1800
15 1200 600 600 1800
16 1200 599 601 1799
17 1200 600 600 1800
18 1199 600 599 1799
19 1200 599 601 1799
20 1200 599 601 1799
--- -------- -------- -------- --------
Avg 1199.90 599.50 600.40 1799.40
//
// main.cpp
// TripleBufferBenchmark
//
#include "TripleBuffer.hxx"
#include "TripleBufferQueue.hxx"
#include <thread>
#include <iostream>
#include <vector>
#include <cmath>
// Clock used to pace the worker loops, and the duration unit in which the
// elapsed time between two clock samples is expressed.
using Clock = std::chrono::high_resolution_clock;
using nanoseconds = std::chrono::nanoseconds;

// Role of a benchmark thread. The enumerator values double as indexes into
// the two-element `counter` arrays of the argument structs, so this stays a
// plain (unscoped) enum.
enum ThreadType {
    PRODUCER = 0,
    CONSUMER = 1
};
//--------------------------
// Configuration
// Duration of a single benchmark run, in seconds.
static constexpr int RUN_SECONDS = 1;
// Number of runs performed (and averaged) per benchmark.
static constexpr int NUM_RUNS = 20;
// true: worker threads loop without pacing; false: they tick at the rates below.
static constexpr bool FREE_LOOP_MODE = false;
static constexpr long NANOS_IN_SEC = 1000000000L;

// Tick periods are NANOS_IN_SEC / rate rounded to the nearest nanosecond.
// The rounding is done in integer arithmetic: going through `float` (24-bit
// mantissa) cannot represent every nanosecond count exactly and forces the
// constants to be initialised at run time instead of compile time.
static constexpr int PRODUCER_RATE = 120; // number of producer writes per second
static constexpr long PRODUCER_TICK_PERIOD = (NANOS_IN_SEC + PRODUCER_RATE / 2) / PRODUCER_RATE; // nanoseconds
static constexpr int CONSUMER_RATE = 60; // number of consumer reads per second
static constexpr long CONSUMER_TICK_PERIOD = (NANOS_IN_SEC + CONSUMER_RATE / 2) / CONSUMER_RATE; // nanoseconds
//---------------------------
// TripleBufferQueue
// Shared state for one TripleBufferQueue benchmark run; a pointer to it is
// handed to both worker threads.
struct ArgTBQ {
    TripleBufferQueue tbq;   // queue under test
    std::atomic<bool> run;   // workers keep looping while this is true
    int counter[2];          // indexed by ThreadType: successful pushes / pops
    std::thread *consumer;   // created by testTripleBufferQueue(), deleted by main()
    std::thread *producer;   // created by testTripleBufferQueue(), deleted by main()
};
// Producer thread body for the TripleBufferQueue benchmark.
// Pushes the address of a local counter into a->tbq until a->run becomes
// false, counting in a->counter[PRODUCER] only the pushes that push() reports
// as successful. In FREE_LOOP_MODE it pushes on every iteration; otherwise it
// pushes once per PRODUCER_TICK_PERIOD of elapsed clock time.
void tbqProducer(ArgTBQ* a){
    int value = 0;

    if (FREE_LOOP_MODE) {
        for (; a->run; ++value) {
            if (a->tbq.push(&value)) {
                ++a->counter[PRODUCER];
            }
        }
        return;
    }

    // Fixed-rate mode: accumulate elapsed nanoseconds and emit one push for
    // every whole tick period that has gone by (catching up if we fell behind).
    double pending = 0;
    Clock::time_point last = Clock::now();
    while (a->run) {
        const Clock::time_point now = Clock::now();
        pending += static_cast<double>(
            std::chrono::duration_cast<nanoseconds>(now - last).count());
        last = now;

        for (; pending >= PRODUCER_TICK_PERIOD; pending -= PRODUCER_TICK_PERIOD) {
            if (a->tbq.push(&value)) {
                ++a->counter[PRODUCER];
            }
            ++value;
        }
    }
}
// Consumer thread body for the TripleBufferQueue benchmark.
// Pops from a->tbq until a->run becomes false and counts the successful pops
// in a->counter[CONSUMER]. In FREE_LOOP_MODE it pops on every iteration;
// otherwise it pops once per CONSUMER_TICK_PERIOD of elapsed clock time.
//
// The popped element is received in a real `void*`. The previous
// `int* j; pop((void**)&j)` made pop() store a `void*` through an lvalue that
// actually referred to an `int*` object, which is an aliasing violation, and
// left `j` uninitialised when pop() failed. The value itself is never used.
void tbqConsumer(ArgTBQ* a){
    if( FREE_LOOP_MODE ){
        while(a->run){
            void* item = nullptr;
            if(a->tbq.pop(&item))
                a->counter[CONSUMER]++;
        }
    }
    else {
        Clock::time_point currentTime = Clock::now();
        double accumulator = 0; // elapsed nanoseconds not yet turned into ticks
        while (a->run) {
            Clock::time_point newTime = Clock::now();
            double frameTime = std::chrono::duration_cast<nanoseconds>(newTime - currentTime).count();
            currentTime = newTime;
            accumulator += frameTime;
            // One pop attempt per whole tick period that has elapsed.
            while (accumulator >= CONSUMER_TICK_PERIOD){
                void* item = nullptr;
                if(a->tbq.pop(&item))
                    a->counter[CONSUMER]++;
                accumulator -= CONSUMER_TICK_PERIOD;
            }
        }
    }
}
// Starts one TripleBufferQueue run: clears both counters, raises the run flag
// and launches the producer and consumer threads on *args. The caller is
// responsible for lowering args->run, joining both threads and deleting them.
void testTripleBufferQueue(ArgTBQ *args){
    ArgTBQ& state = *args;

    state.counter[CONSUMER] = 0;
    state.counter[PRODUCER] = 0;
    state.run = true;

    state.producer = new std::thread(tbqProducer, args);
    state.consumer = new std::thread(tbqConsumer, args);
}
//---------------------------
// TripleBuffer
// Shared state for one TripleBuffer benchmark run; a pointer to it is handed
// to both worker threads.
typedef struct ArgTB{
    TripleBuffer<int> tb;    // buffer under test
    std::atomic<bool> run;   // workers keep looping while this is true
    // Indexed by ThreadType. Plain int, matching ArgTBQ: each slot is written
    // by exactly one worker and is only read by main() after that worker has
    // been joined, so `volatile` added nothing for thread safety.
    int counter[2];
    std::thread *consumer;   // created by testTripleBuffer(), deleted by main()
    std::thread *producer;   // created by testTripleBuffer(), deleted by main()
} ArgTB;
// Producer thread body for the TripleBuffer benchmark.
// Publishes an increasing integer through a->tb.update() until a->run becomes
// false, bumping a->counter[PRODUCER] for every update. In FREE_LOOP_MODE it
// updates on every iteration; otherwise once per PRODUCER_TICK_PERIOD of
// elapsed clock time.
void tbProducer(ArgTB* a){
    int value = 0;

    if (FREE_LOOP_MODE) {
        for (; a->run; ++value) {
            a->tb.update(value);
            a->counter[PRODUCER]++;
        }
        return;
    }

    // Fixed-rate mode: accumulate elapsed nanoseconds and emit one update for
    // every whole tick period that has gone by.
    double pending = 0;
    Clock::time_point last = Clock::now();
    while (a->run) {
        const Clock::time_point now = Clock::now();
        pending += static_cast<double>(
            std::chrono::duration_cast<nanoseconds>(now - last).count());
        last = now;

        for (; pending >= PRODUCER_TICK_PERIOD; pending -= PRODUCER_TICK_PERIOD) {
            a->tb.update(value);
            a->counter[PRODUCER]++;
            ++value;
        }
    }
}
// Consumer thread body for the TripleBuffer benchmark.
// Calls a->tb.readLast() (discarding the value) until a->run becomes false and
// bumps a->counter[CONSUMER] for every read. In FREE_LOOP_MODE it reads on
// every iteration; otherwise once per CONSUMER_TICK_PERIOD of elapsed time.
void tbConsumer(ArgTB* a){
    if (!FREE_LOOP_MODE) {
        // Fixed-rate mode: accumulate elapsed nanoseconds and perform one read
        // for every whole tick period that has gone by.
        double pending = 0;
        Clock::time_point last = Clock::now();
        while (a->run) {
            const Clock::time_point now = Clock::now();
            pending += static_cast<double>(
                std::chrono::duration_cast<nanoseconds>(now - last).count());
            last = now;

            for (; pending >= CONSUMER_TICK_PERIOD; pending -= CONSUMER_TICK_PERIOD) {
                a->tb.readLast();
                a->counter[CONSUMER]++;
            }
        }
        return;
    }

    while (a->run) {
        a->tb.readLast();
        a->counter[CONSUMER]++;
    }
}
// Starts one TripleBuffer run: clears both counters, raises the run flag and
// launches the producer and consumer threads on *args. The caller is
// responsible for lowering args->run, joining both threads and deleting them.
void testTripleBuffer(ArgTB *args){
    ArgTB& state = *args;

    state.counter[CONSUMER] = 0;
    state.counter[PRODUCER] = 0;
    state.run = true;

    state.producer = new std::thread(tbProducer, args);
    state.consumer = new std::thread(tbConsumer, args);
}
//---------------------------
// Benchmark
// Runs NUM_RUNS timed runs of one benchmark and prints its result table.
//
// `title` is the name printed in the table header. `start` launches the two
// worker threads on the given argument struct (testTripleBufferQueue or
// testTripleBuffer). Args must expose `run`, `counter[2]`, `producer` and
// `consumer` the way ArgTBQ / ArgTB do.
template <typename Args>
static void runBenchmark(const char* title, void (*start)(Args*))
{
    std::vector<Args> runs(NUM_RUNS);
    long long sumPrd = 0, sumCon = 0;

    printf(" %s (%d runs, %d seconds each)\n", title, NUM_RUNS, RUN_SECONDS);
    printf(" # Produced Consumed +-Delta Total\n");
    for (int i = 0; i < NUM_RUNS; i++) {
        Args& ctx = runs[i];
        start(&ctx);
        // Sleep in whole seconds: multiplying out to nanoseconds in `long`
        // overflows where `long` is 32 bits wide once RUN_SECONDS >= 3.
        std::this_thread::sleep_for(std::chrono::seconds(RUN_SECONDS));
        ctx.run = false;
        ctx.producer->join();
        ctx.consumer->join();
        delete ctx.producer;
        delete ctx.consumer;
        // Safe to read without synchronisation: both writers have been joined.
        const int prd = ctx.counter[PRODUCER];
        const int con = ctx.counter[CONSUMER];
        sumPrd += prd;
        sumCon += con;
        printf("%3d %10d %10d %10d %10d\n", (i+1), prd, con, prd - con, prd + con);
    }
    printf("--- -------- -------- -------- --------\n");
    // Average in double: float loses integer precision above 2^24, which the
    // sums can exceed in FREE_LOOP_MODE.
    printf("Avg %10.2f %10.2f %10.2f %10.2f\n\n",
           static_cast<double>(sumPrd) / NUM_RUNS,
           static_cast<double>(sumCon) / NUM_RUNS,
           static_cast<double>(sumPrd - sumCon) / NUM_RUNS,
           static_cast<double>(sumPrd + sumCon) / NUM_RUNS);
}

// Benchmark entry point: runs the TripleBufferQueue benchmark followed by the
// TripleBuffer benchmark and prints one result table for each.
int main(int argc, const char * argv[])
{
    (void)argc; // command-line arguments are not used
    (void)argv;

    runBenchmark("TripleBufferQueue", testTripleBufferQueue);
    runBenchmark("TripleBuffer", testTripleBuffer);
    return 0;
}
//============================================================================
// Name : TripleBuffer.hxx
// Author : André Pacheco Neves
// Version : 1.0 (27/01/13)
// Copyright : Copyright (c) 2013, André Pacheco Neves
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
// * Neither the name of the <organization> nor the
// names of its contributors may be used to endorse or promote products
// derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER> BE LIABLE FOR ANY
// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
// Description : Template class for a TripleBuffer as a concurrency mechanism, using atomic operations
// Credits : http://remis-thoughts.blogspot.pt/2012/01/triple-buffering-as-concurrency_30.html
//============================================================================
#ifndef TRIPLEBUFFER_HXX_
#define TRIPLEBUFFER_HXX_
#include <atomic>
using namespace std;
/**
 * Triple buffer used as a concurrency mechanism, driven by atomic operations
 * on a single flags byte.
 *
 * A writer fills the "dirty" slot and then swaps it with the "clean" slot
 * (write + flipWriter, or update). A reader swaps "clean" with "snap" when a
 * new write is pending and reads the "snap" slot (newSnap + snap, or
 * readLast).
 *
 * NOTE(review): the benchmark drives this with exactly one writer thread and
 * one reader thread; behaviour with more threads on either side has not been
 * verified.
 *
 * T must be default-constructible (for the default constructor) and
 * copy-assignable.
 */
template <typename T>
class TripleBuffer
{
public:
    // Constructors are declared with the injected class name; the
    // `TripleBuffer<T>(...)` spelling is rejected by C++20 compilers.
    TripleBuffer();              // all three slots hold T()
    TripleBuffer(const T& init); // all three slots hold a copy of init

    // non-copyable behavior
    TripleBuffer(const TripleBuffer&) = delete;
    TripleBuffer& operator=(const TripleBuffer&) = delete;

    T snap() const;           // get the current snap to read
    void write(const T newT); // write a new value
    bool newSnap();           // swap to the latest value, if any
    void flipWriter();        // flip writer positions dirty / clean
    T readLast();             // wrapper to read the last available element (newSnap + snap)
    void update(T newT);      // wrapper to update with a new element (write + flipWriter)

private:
    // Pure helpers over a flags value; they do not touch the object.
    static bool isNewWrite(std::uint_fast8_t state);                              // check if the newWrite bit is 1
    static std::uint_fast8_t swapSnapWithClean(std::uint_fast8_t state);          // swap Snap and Clean indexes
    static std::uint_fast8_t newWriteSwapCleanWithDirty(std::uint_fast8_t state); // set newWrite to 1 and swap Clean and Dirty indexes

    // 8 bit flags are (unused) (new write) (2x dirty) (2x clean) (2x snap)
    // newWrite   = (flags & 0x40)
    // dirtyIndex = (flags & 0x30) >> 4
    // cleanIndex = (flags & 0xC) >> 2
    // snapIndex  = (flags & 0x3)
    mutable std::atomic_uint_fast8_t flags;
    T buffer[3];
};

// include implementation in header since it is a template

// Default constructor: same as constructing from a value-initialised T.
template <typename T>
TripleBuffer<T>::TripleBuffer()
    : TripleBuffer(T())
{
}

// Fills every slot with `init`; initially dirty = 0, clean = 1 and snap = 2.
template <typename T>
TripleBuffer<T>::TripleBuffer(const T& init)
    : flags(0x6)
{
    for (T& slot : buffer)
        slot = init;
}

// Returns a copy of the slot currently designated as snap.
// memory_order_acquire is used where the code previously used
// memory_order_consume: consume has been discouraged since C++17 and acquire
// provides at least the same guarantees.
template <typename T>
T TripleBuffer<T>::snap() const{
    return buffer[flags.load(std::memory_order_acquire) & 0x3]; // read snap index
}

// Stores newT into the slot currently designated as dirty. The value is not
// visible to the reader until flipWriter() is called.
template <typename T>
void TripleBuffer<T>::write(const T newT){
    buffer[(flags.load(std::memory_order_acquire) & 0x30) >> 4] = newT; // write into dirty index
}

// If a write is pending, swaps snap with clean (which also clears the
// newWrite bit) and returns true; otherwise leaves the flags untouched and
// returns false.
template <typename T>
bool TripleBuffer<T>::newSnap(){
    std::uint_fast8_t current = flags.load(std::memory_order_acquire);
    do {
        if( !isNewWrite(current) ) // nothing new, no need to swap
            return false;
        // On failure compare_exchange_weak refreshes `current` with the value
        // it found, so no explicit reload is needed before retrying.
    } while(!flags.compare_exchange_weak(current,
                                         swapSnapWithClean(current),
                                         std::memory_order_release,
                                         std::memory_order_acquire));
    return true;
}

// Swaps clean with dirty and sets the newWrite bit, making the value most
// recently passed to write() the one the next newSnap() will pick up.
template <typename T>
void TripleBuffer<T>::flipWriter(){
    std::uint_fast8_t current = flags.load(std::memory_order_acquire);
    // Retry until the swap is applied to an unchanged flags value; `current`
    // is refreshed by compare_exchange_weak on every failed attempt.
    while(!flags.compare_exchange_weak(current,
                                       newWriteSwapCleanWithDirty(current),
                                       std::memory_order_release,
                                       std::memory_order_acquire)) {
    }
}

template <typename T>
T TripleBuffer<T>::readLast(){
    newSnap();     // get most recent value
    return snap(); // return it
}

template <typename T>
void TripleBuffer<T>::update(T newT){
    write(newT);  // write new value
    flipWriter(); // change dirty/clean buffer positions for the next update
}

template <typename T>
bool TripleBuffer<T>::isNewWrite(std::uint_fast8_t state){
    // check if the newWrite bit is 1
    return ((state & 0x40) != 0);
}

template <typename T>
std::uint_fast8_t TripleBuffer<T>::swapSnapWithClean(std::uint_fast8_t state){
    // swap snap with clean; the newWrite bit is not carried over
    return static_cast<std::uint_fast8_t>(
        (state & 0x30) | ((state & 0x3) << 2) | ((state & 0xC) >> 2));
}

template <typename T>
std::uint_fast8_t TripleBuffer<T>::newWriteSwapCleanWithDirty(std::uint_fast8_t state){
    // set newWrite bit to 1 and swap clean with dirty
    return static_cast<std::uint_fast8_t>(
        0x40 | ((state & 0xC) << 2) | ((state & 0x30) >> 2) | (state & 0x3));
}
#endif /* TRIPLEBUFFER_HXX_ */
//
// TripleBufferQueue.hxx
// TripleBufferBenchmark
//
#include <atomic>
// Guard against multiple inclusion: the class is defined in a header.
#ifndef TRIPLEBUFFERQUEUE_HXX_
#define TRIPLEBUFFERQUEUE_HXX_
// Three-slot ring of `void*` elements indexed by atomic head/tail positions.
// push() advances head and pop() advances tail. One slot is always kept free,
// so at most two elements are queued at a time.
//
// NOTE(review): the benchmark uses one pushing thread and one popping thread.
// When the queue is full, push() overwrites a queued slot that the popping
// thread may be reading at the same moment — confirm this is acceptable for
// the intended use.
class TripleBufferQueue
{
public:
    TripleBufferQueue() : buffer(), head(0), tail(0) {}

    // Queues `value`. Returns true when it was stored in a free slot. When
    // the queue is full, the most recently queued element is overwritten with
    // `value`, head stays where it was, and false is returned.
    bool push(void *value)
    {
        bool no_drop = true;
        size_t c = head.load(m1);
        size_t n = next(c);
        if (n == tail.load(m2))
        {
            // Full: step back onto the newest queued slot (two steps forward
            // in a three-slot ring) so that n ends up equal to the old head.
            c = next(n);
            n = next(c);
            no_drop = false;
        }
        // Write the slot BEFORE publishing head with release ordering, so a
        // pop() that observes the new head also observes the stored element.
        buffer[c] = value;
        head.store(n, m3);
        return no_drop;
    }

    // Removes the oldest queued element into *value and returns true, or
    // returns false (leaving *value untouched) when the queue is empty.
    bool pop(void **value)
    {
        const size_t t = tail.load(m1);
        if (t == head.load(m2))
            return false;
        *value = buffer[t]; // use the index already loaded, not another atomic read
        tail.store(next(t), m3);
        return true;
    }

private:
    static constexpr std::memory_order
        m1 = std::memory_order_relaxed,  // reading the index this side owns
        m2 = std::memory_order_acquire,  // reading the index the other side publishes
        m3 = std::memory_order_release;  // publishing this side's index
    static constexpr size_t size = 3;

    void * buffer[size];
    std::atomic<size_t> head, tail;

    // Index following `current` in the ring.
    static size_t next(size_t current) { return (current + 1) % size; }
};
#endif /* TRIPLEBUFFERQUEUE_HXX_ */
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment