Skip to content

Instantly share code, notes, and snippets.

@Skyb0rg007
Last active January 23, 2019 16:47
Show Gist options
  • Save Skyb0rg007/cf937eb73ca66f4de8b4012ec06b5f40 to your computer and use it in GitHub Desktop.
Save Skyb0rg007/cf937eb73ca66f4de8b4012ec06b5f40 to your computer and use it in GitHub Desktop.
Bounded Queue with SDL Atomics
ring_buffer: 0x404180
Got value 1!
==================
WARNING: ThreadSanitizer: data race (pid=19155)
Read of size 4 at 0x000000404184 by thread T2:
#0 ring_dequeue /home/ssoss/Desktop/Amber/Amber/util/tests/minimal-ring-race.c:84 (minimal-race+0x4013c5)
#1 consumer /home/ssoss/Desktop/Amber/Amber/util/tests/minimal-ring-race.c:122 (minimal-race+0x4014c5)
#2 <null> <null> (libSDL2-2.0.so.0+0x6875f)
Previous write of size 4 at 0x000000404184 by thread T1:
#0 ring_enqueue /home/ssoss/Desktop/Amber/Amber/util/tests/minimal-ring-race.c:54 (minimal-race+0x4012fa)
#1 producer /home/ssoss/Desktop/Amber/Amber/util/tests/minimal-ring-race.c:108 (minimal-race+0x401443)
#2 <null> <null> (libSDL2-2.0.so.0+0x6875f)
Location is global 'ring_buffer' of size 4194304 at 0x000000404180 (minimal-race+0x000000404184)
Thread T2 'consumer' (tid=19158, running) created by main thread at:
#0 pthread_create <null> (libtsan.so.0+0x2cfc2)
#1 <null> <null> (libSDL2-2.0.so.0+0xe01da)
#2 __libc_start_main <null> (libc.so.6+0x24412)
Thread T1 'producer' (tid=19157, finished) created by main thread at:
#0 pthread_create <null> (libtsan.so.0+0x2cfc2)
#1 <null> <null> (libSDL2-2.0.so.0+0xe01da)
#2 __libc_start_main <null> (libc.so.6+0x24412)
SUMMARY: ThreadSanitizer: data race /home/ssoss/Desktop/Amber/Amber/util/tests/minimal-ring-race.c:84 in ring_dequeue
==================
Got value 0!
Got value 1!
Got value 2!
Got value 3!
Got value 4!
Got value 5!
Got value 6!
Got value 7!
Got value 8!
Got value 9!
Got value 10!
Got value 11!
Got value 12!
Got value 13!
Got value 14!
Got value 15!
Got value 16!
Got value 17!
Got value 18!
Got value 19!
==================
WARNING: ThreadSanitizer: data race (pid=19155)
Read of size 4 at 0x0000004041d4 by thread T2:
#0 ring_dequeue /home/ssoss/Desktop/Amber/Amber/util/tests/minimal-ring-race.c:84 (minimal-race+0x4013c5)
#1 consumer /home/ssoss/Desktop/Amber/Amber/util/tests/minimal-ring-race.c:122 (minimal-race+0x4014c5)
#2 <null> <null> (libSDL2-2.0.so.0+0x6875f)
Previous write of size 4 at 0x0000004041d4 by thread T1:
#0 ring_enqueue /home/ssoss/Desktop/Amber/Amber/util/tests/minimal-ring-race.c:54 (minimal-race+0x4012fa)
#1 producer /home/ssoss/Desktop/Amber/Amber/util/tests/minimal-ring-race.c:112 (minimal-race+0x401483)
#2 <null> <null> (libSDL2-2.0.so.0+0x6875f)
Location is global 'ring_buffer' of size 4194304 at 0x000000404180 (minimal-race+0x0000004041d4)
Thread T2 'consumer' (tid=19158, running) created by main thread at:
#0 pthread_create <null> (libtsan.so.0+0x2cfc2)
#1 <null> <null> (libSDL2-2.0.so.0+0xe01da)
#2 __libc_start_main <null> (libc.so.6+0x24412)
Thread T1 'producer' (tid=19157, finished) created by main thread at:
#0 pthread_create <null> (libtsan.so.0+0x2cfc2)
#1 <null> <null> (libSDL2-2.0.so.0+0xe01da)
#2 __libc_start_main <null> (libc.so.6+0x24412)
SUMMARY: ThreadSanitizer: data race /home/ssoss/Desktop/Amber/Amber/util/tests/minimal-ring-race.c:84 in ring_dequeue
==================
Got value -1!
ThreadSanitizer: reported 2 warnings
#include <stdatomic.h>
#include <SDL_thread.h>
#include <assert.h>
#include <inttypes.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#define CACHESIZE 64
/* Index state of the bounded queue. The slot storage is not part of this
 * struct; callers pass the buffer to ring_enqueue()/ring_dequeue().
 * The _pad arrays space the index groups CACHESIZE bytes apart
 * (presumably to keep consumer and producer indices on separate cache
 * lines -- the struct itself carries no alignment requirement; confirm
 * if that matters).
 */
struct ring {
atomic_uint c_head; /* Next index to consume; advanced by ring_dequeue() */
char _pad0[CACHESIZE - sizeof(atomic_uint)];
atomic_uint p_head; /* Next index to claim for writing; used to sync producers */
atomic_uint p_tail; /* Advanced by ring_enqueue() after its slot is written; ring_dequeue() reads up to here */
char _pad1[CACHESIZE - sizeof(atomic_uint) * 2];
unsigned mask; /* ANDed with an index to select a buffer slot */
char _pad2[CACHESIZE - sizeof(unsigned)];
};
/* Sequentially consistent compare-and-swap on *a.
 * If *a equals `old` it is replaced by `new` and true is returned;
 * otherwise *a is left untouched and false is returned.
 * The value observed on failure is discarded.
 */
static bool stdatomic_cas(atomic_uint *a, unsigned old, unsigned new)
{
    unsigned expected = old;
    const bool swapped = atomic_compare_exchange_strong_explicit(
        a, &expected, new, memory_order_seq_cst, memory_order_seq_cst);

    return swapped;
}
/* Append `entry` to the queue whose slots live in `buf`.
 *
 * ring  - index state shared by all producers and consumers
 * buf   - slot storage, indexed with (index & ring->mask)
 * entry - value to store
 *
 * Returns true once the entry has been published to consumers, or false
 * (writing nothing) if the queue looked full, i.e. p_head - c_head >= mask.
 */
static bool ring_enqueue(struct ring *ring, int32_t *buf, int32_t entry)
{
    const unsigned mask = ring->mask;
    unsigned producer;
    unsigned consumer;

    /* Claim a slot by advancing p_head without letting it pass c_head */
    do {
        /* c_head is sampled BEFORE p_head. Both indices only ever grow and
         * c_head never exceeds p_head, so this order guarantees
         * consumer <= producer. Sampling p_head first could pair a stale
         * p_head with a newer, larger c_head: the unsigned difference
         * would wrap to a huge value and the queue would be reported full
         * when it is not.
         */
        consumer = atomic_load(&ring->c_head);
        producer = atomic_load(&ring->p_head);

        /* Check if buffer is full */
        if ((producer - consumer) >= mask) {
            return false;
        }
    } while (!stdatomic_cas(&ring->p_head, producer, producer + 1));

    /* `producer` is now an index owned exclusively by this thread:
     * it is behind p_head, at or ahead of p_tail, and ahead of c_head.
     */
    buf[producer & mask] = entry;

    /* Publish the slot by moving p_tail from producer to producer + 1.
     * The CAS only succeeds once every earlier producer has published,
     * so a slower producer's unfinished slot is never exposed.
     */
    while (!stdatomic_cas(&ring->p_tail, producer, producer + 1)) {
        /* Stall */
    }
    return true;
}
/* Remove the oldest published entry from the queue whose slots live in `buf`.
 *
 * ring  - index state shared by all producers and consumers
 * buf   - slot storage, indexed with (index & ring->mask)
 * entry - receives the dequeued value; written ONLY when true is returned
 *
 * Returns true if a value was taken, false if no published entry was
 * available (c_head == p_tail).
 */
static bool ring_dequeue(struct ring *ring, int32_t *buf, int32_t *entry)
{
    const unsigned mask = ring->mask;
    unsigned consumer;
    int32_t value;

    do {
        consumer = atomic_load(&ring->c_head);

        /* Ensure buffer is not empty */
        if (atomic_load(&ring->p_tail) == consumer) {
            return false;
        }

        /* Speculative read: the slot is only ours if the CAS below wins.
         * It goes into a local so that a lost race can never leak a value
         * that another consumer actually took into the caller's *entry.
         *
         * NOTE(review): this is a plain (non-atomic) read. If other
         * consumers have already advanced c_head past `consumer`, a
         * producer may be rewriting this slot concurrently. The CAS then
         * fails and the value is discarded, but the access itself is
         * still formally a data race -- confirm whether that is acceptable.
         */
        value = buf[consumer & mask];
    } while (!stdatomic_cas(&ring->c_head, consumer, consumer + 1));

    *entry = value;
    return true;
}
/* Globals: queue state shared by every thread in this test */

/* Slot storage: 2^20 entries */
static int32_t ring_buffer[1 << 20];

/* Index state; all three indices start at zero and the mask matches the
 * size of ring_buffer.
 */
static struct ring ring = {
    .c_head = ATOMIC_VAR_INIT(0),
    .p_head = ATOMIC_VAR_INIT(0),
    .p_tail = ATOMIC_VAR_INIT(0),
    .mask = (1 << 20) - 1
};

/* Not referenced by any code visible in this file */
int b = 0;
/* Threads */
/* Thread entry point: enqueues 0..19 in order (asserting each enqueue
 * succeeds), then enqueues -1, the value the consumer loop stops on.
 * `data` is unused. Always returns 0.
 */
static int producer(void *data) {
    int32_t value = 0;

    (void)data;

    while (value < 20) {
        bool accepted = ring_enqueue(&ring, ring_buffer, value);
        assert(accepted);
        value++;
    }

    /* Terminating sentinel */
    ring_enqueue(&ring, ring_buffer, -1);
    return 0;
}
/* Thread entry point: polls the queue, printing every value it dequeues,
 * and stops after it has itself dequeued the -1 sentinel.
 * `data` is unused. Always returns 0.
 */
static int consumer(void *data) {
    /* Initialized so the variable is never read while indeterminate */
    int32_t entry = 0;
    bool ret;

    (void)data;

    do {
        ret = ring_dequeue(&ring, ring_buffer, &entry);
        if (ret) {
            fprintf(stderr, "Got value %"PRIi32"!\n", entry);
        }
        /* Only trust `entry` when the dequeue succeeded: after a failed
         * call it may be untouched or hold a value that a different
         * consumer ended up taking.
         */
    } while (!ret || entry != -1);

    return 0;
}
/* Test driver: prints the buffer address, seeds the queue with the value 1,
 * then runs two producer and two consumer threads and waits for all of them.
 */
int main(void)
{
    SDL_Thread *threads[4];
    int i;

    fprintf(stderr, "ring_buffer: %p\n", (void *)ring_buffer);

    /* One entry is queued before any thread exists */
    ring_enqueue(&ring, ring_buffer, 1);

    threads[0] = SDL_CreateThread(producer, "producer", NULL);
    threads[1] = SDL_CreateThread(producer, "producer", NULL);
    threads[2] = SDL_CreateThread(consumer, "consumer", NULL);
    threads[3] = SDL_CreateThread(consumer, "consumer", NULL);

    /* Join in creation order */
    for (i = 0; i < 4; i++) {
        SDL_WaitThread(threads[i], NULL);
    }
    return 0;
}
#include <SDL_atomic.h>
#include <SDL_thread.h>
#include <assert.h>
#include <inttypes.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#define CACHESIZE 64
/* Index state of the bounded queue. The slot storage is not part of this
 * struct; callers pass the buffer to ring_enqueue()/ring_dequeue().
 * The _pad arrays space the index groups CACHESIZE bytes apart
 * (presumably to keep consumer and producer indices on separate cache
 * lines -- the struct itself carries no alignment requirement; confirm
 * if that matters).
 */
struct ring {
SDL_atomic_t c_head; /* Next index to consume; advanced by ring_dequeue() */
char _pad0[CACHESIZE - sizeof(SDL_atomic_t)];
SDL_atomic_t p_head; /* Next index to claim for writing; used to sync producers */
SDL_atomic_t p_tail; /* Advanced by ring_enqueue() after its slot is written; ring_dequeue() reads up to here */
char _pad1[CACHESIZE - sizeof(SDL_atomic_t) * 2];
unsigned mask; /* ANDed with an index to select a buffer slot */
char _pad2[CACHESIZE - sizeof(unsigned)];
};
#if 0
invariants: c_head <= p_tail <= p_head, and (p_head - c_head) < ring_size
c_head - where consumer will read from next. Set by consumers
if c_head == p_tail, then there are no readable entries.
if c_head > p_head, then consistency has been broken.
p_tail - one past the last fully written entry. Set by producers at the end of an enqueue
if p_tail == p_head, then no producer is currently in the middle of an enqueue
p_head - p_tail == number of producers currently in the middle of an enqueue
p_head - where producers insert to next. Set by producers at the beginning
#endif
/* Snapshot of how many slots producers have claimed (p_head) that have not
 * yet been consumed (c_head), reduced with the ring mask. The two indices
 * are read one after the other, so the result is only approximate while
 * other threads are active.
 */
static unsigned ring_size(struct ring *ring)
{
    const unsigned head_c = (unsigned)SDL_AtomicGet(&ring->c_head);
    const unsigned head_p = (unsigned)SDL_AtomicGet(&ring->p_head);

    return (head_p - head_c) & ring->mask;
}
/* Append `entry` to the queue whose slots live in `buf`.
 *
 * ring  - index state shared by all producers and consumers
 * buf   - slot storage, indexed with (index & ring->mask)
 * entry - value to store
 *
 * Returns true once the entry has been published to consumers, or false
 * (writing nothing) if the queue looked full, i.e. p_head - c_head >= mask.
 */
static bool ring_enqueue(struct ring *ring, int32_t *buf, int32_t entry)
{
    const unsigned mask = ring->mask;
    unsigned producer;
    unsigned consumer;

    /* Claim a slot by advancing p_head without letting it pass c_head */
    do {
        /* c_head is sampled BEFORE p_head. Both indices only ever grow and
         * c_head never exceeds p_head, so this order guarantees
         * consumer <= producer. Sampling p_head first could pair a stale
         * p_head with a newer, larger c_head: the unsigned difference
         * would wrap to a huge value and the queue would be reported full
         * when it is not.
         */
        consumer = (unsigned)SDL_AtomicGet(&ring->c_head);
        producer = (unsigned)SDL_AtomicGet(&ring->p_head);

        /* Check if buffer is full */
        if ((producer - consumer) >= mask) {
            return false;
        }
    } while (!SDL_AtomicCAS(&ring->p_head, (int)producer, (int)(producer + 1)));

    /* `producer` is now an index owned exclusively by this thread:
     * it is behind p_head, at or ahead of p_tail, and ahead of c_head.
     */
    buf[producer & mask] = entry;

    /* Publish the slot by moving p_tail from producer to producer + 1.
     * The CAS only succeeds once every earlier producer has published,
     * so a slower producer's unfinished slot is never exposed.
     * This is the non-lockfree part of the algorithm - hopefully we
     * don't spin here long.
     */
    while (!SDL_AtomicCAS(&ring->p_tail, (int)producer, (int)(producer + 1))) {
        /* Stall */
    }
    return true;
}
/* Remove the oldest published entry from the queue whose slots live in `buf`.
 *
 * ring  - index state shared by all producers and consumers
 * buf   - slot storage, indexed with (index & ring->mask)
 * entry - receives the dequeued value; written ONLY when true is returned
 *
 * Returns true if a value was taken, false if no published entry was
 * available (c_head == p_tail).
 */
static bool ring_dequeue(struct ring *ring, int32_t *buf, int32_t *entry)
{
    const unsigned mask = ring->mask;
    unsigned consumer;
    unsigned producer;
    int32_t value;

    do {
        consumer = (unsigned)SDL_AtomicGet(&ring->c_head);
        producer = (unsigned)SDL_AtomicGet(&ring->p_tail);

        /* Ensure buffer has readable entries */
        if (producer == consumer) {
            return false;
        }

        /* Speculative read: the slot is only ours if the CAS below wins.
         * It goes into a local so that a lost race can never leak a value
         * that another consumer actually took into the caller's *entry.
         *
         * NOTE(review): this is a plain (non-atomic) read. If other
         * consumers have already advanced c_head past `consumer`, a
         * producer may be rewriting this slot concurrently. The CAS then
         * fails and the value is discarded, but the access itself is
         * still formally a data race -- confirm whether that is acceptable.
         */
        value = buf[consumer & mask];
    } while (!SDL_AtomicCAS(&ring->c_head, (int)consumer, (int)(consumer + 1)));

    *entry = value;
    return true;
}
/* Globals: queue state shared by every thread in this test */

/* Slot storage: 2^20 entries */
static int32_t ring_buffer[1 << 20];

/* Index state; all three indices start at zero and the mask matches the
 * size of ring_buffer.
 */
static struct ring ring = {
    .c_head = {0},
    .p_head = {0},
    .p_tail = {0},
    .mask = (1 << 20) - 1
};
/* Threads */
/* Thread entry point: enqueues 0..19 in order (asserting each enqueue
 * succeeds), then enqueues -1, the value the consumer loop stops on.
 * `data` is unused. Always returns 0.
 */
static int producer(void *data) {
    int32_t value = 0;

    (void)data;

    while (value < 20) {
        bool accepted = ring_enqueue(&ring, ring_buffer, value);
        assert(accepted);
        value++;
    }

    /* Terminating sentinel */
    ring_enqueue(&ring, ring_buffer, -1);
    return 0;
}
/* Thread entry point: polls the queue, printing every value it dequeues,
 * and stops after it has itself dequeued the -1 sentinel.
 * `data` is unused. Always returns 0.
 */
static int consumer(void *data) {
    /* Initialized so the variable is never read while indeterminate */
    int32_t entry = 0;
    bool ret;

    (void)data;

    do {
        ret = ring_dequeue(&ring, ring_buffer, &entry);
        if (ret) {
            fprintf(stderr, "Got value %"PRIi32"!\n", entry);
        }
        /* Only trust `entry` when the dequeue succeeded: after a failed
         * call it may be untouched or hold a value that a different
         * consumer ended up taking.
         */
    } while (!ret || entry != -1);

    return 0;
}
/* Test driver: prints the buffer address, seeds the queue with the value 1,
 * then runs one producer and one consumer thread and waits for both.
 */
int main(void)
{
    SDL_Thread *producer_thread;
    SDL_Thread *consumer_thread;

    fprintf(stderr, "ring_buffer: %p\n", (void *)ring_buffer);

    /* One entry is queued before any thread exists */
    ring_enqueue(&ring, ring_buffer, 1);

    producer_thread = SDL_CreateThread(producer, "producer", NULL);
    consumer_thread = SDL_CreateThread(consumer, "consumer", NULL);

    /* Join in creation order */
    SDL_WaitThread(producer_thread, NULL);
    SDL_WaitThread(consumer_thread, NULL);
    return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment