Last active
January 23, 2019 16:47
-
-
Save Skyb0rg007/cf937eb73ca66f4de8b4012ec06b5f40 to your computer and use it in GitHub Desktop.
Bounded Queue with SDL Atomics
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
ring_buffer: 0x404180 | |
Got value 1! | |
================== | |
WARNING: ThreadSanitizer: data race (pid=19155) | |
Read of size 4 at 0x000000404184 by thread T2: | |
#0 ring_dequeue /home/ssoss/Desktop/Amber/Amber/util/tests/minimal-ring-race.c:84 (minimal-race+0x4013c5) | |
#1 consumer /home/ssoss/Desktop/Amber/Amber/util/tests/minimal-ring-race.c:122 (minimal-race+0x4014c5) | |
#2 <null> <null> (libSDL2-2.0.so.0+0x6875f) | |
Previous write of size 4 at 0x000000404184 by thread T1: | |
#0 ring_enqueue /home/ssoss/Desktop/Amber/Amber/util/tests/minimal-ring-race.c:54 (minimal-race+0x4012fa) | |
#1 producer /home/ssoss/Desktop/Amber/Amber/util/tests/minimal-ring-race.c:108 (minimal-race+0x401443) | |
#2 <null> <null> (libSDL2-2.0.so.0+0x6875f) | |
Location is global 'ring_buffer' of size 4194304 at 0x000000404180 (minimal-race+0x000000404184) | |
Thread T2 'consumer' (tid=19158, running) created by main thread at: | |
#0 pthread_create <null> (libtsan.so.0+0x2cfc2) | |
#1 <null> <null> (libSDL2-2.0.so.0+0xe01da) | |
#2 __libc_start_main <null> (libc.so.6+0x24412) | |
Thread T1 'producer' (tid=19157, finished) created by main thread at: | |
#0 pthread_create <null> (libtsan.so.0+0x2cfc2) | |
#1 <null> <null> (libSDL2-2.0.so.0+0xe01da) | |
#2 __libc_start_main <null> (libc.so.6+0x24412) | |
SUMMARY: ThreadSanitizer: data race /home/ssoss/Desktop/Amber/Amber/util/tests/minimal-ring-race.c:84 in ring_dequeue | |
================== | |
Got value 0! | |
Got value 1! | |
Got value 2! | |
Got value 3! | |
Got value 4! | |
Got value 5! | |
Got value 6! | |
Got value 7! | |
Got value 8! | |
Got value 9! | |
Got value 10! | |
Got value 11! | |
Got value 12! | |
Got value 13! | |
Got value 14! | |
Got value 15! | |
Got value 16! | |
Got value 17! | |
Got value 18! | |
Got value 19! | |
================== | |
WARNING: ThreadSanitizer: data race (pid=19155) | |
Read of size 4 at 0x0000004041d4 by thread T2: | |
#0 ring_dequeue /home/ssoss/Desktop/Amber/Amber/util/tests/minimal-ring-race.c:84 (minimal-race+0x4013c5) | |
#1 consumer /home/ssoss/Desktop/Amber/Amber/util/tests/minimal-ring-race.c:122 (minimal-race+0x4014c5) | |
#2 <null> <null> (libSDL2-2.0.so.0+0x6875f) | |
Previous write of size 4 at 0x0000004041d4 by thread T1: | |
#0 ring_enqueue /home/ssoss/Desktop/Amber/Amber/util/tests/minimal-ring-race.c:54 (minimal-race+0x4012fa) | |
#1 producer /home/ssoss/Desktop/Amber/Amber/util/tests/minimal-ring-race.c:112 (minimal-race+0x401483) | |
#2 <null> <null> (libSDL2-2.0.so.0+0x6875f) | |
Location is global 'ring_buffer' of size 4194304 at 0x000000404180 (minimal-race+0x0000004041d4) | |
Thread T2 'consumer' (tid=19158, running) created by main thread at: | |
#0 pthread_create <null> (libtsan.so.0+0x2cfc2) | |
#1 <null> <null> (libSDL2-2.0.so.0+0xe01da) | |
#2 __libc_start_main <null> (libc.so.6+0x24412) | |
Thread T1 'producer' (tid=19157, finished) created by main thread at: | |
#0 pthread_create <null> (libtsan.so.0+0x2cfc2) | |
#1 <null> <null> (libSDL2-2.0.so.0+0xe01da) | |
#2 __libc_start_main <null> (libc.so.6+0x24412) | |
SUMMARY: ThreadSanitizer: data race /home/ssoss/Desktop/Amber/Amber/util/tests/minimal-ring-race.c:84 in ring_dequeue | |
================== | |
Got value -1! | |
ThreadSanitizer: reported 2 warnings |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <stdatomic.h> | |
#include <SDL_thread.h> | |
#include <assert.h> | |
#include <inttypes.h> | |
#include <stdbool.h> | |
#include <stdint.h> | |
#include <stdio.h> | |
/* Assumed cache-line size in bytes; the ring indices are laid out around it. */
#define CACHESIZE 64

/*
 * Index block of a bounded ring shared by several producers and consumers.
 * The element storage is not part of this struct; callers pass it separately.
 *
 * Each group of fields is padded out to CACHESIZE bytes so the consumer index,
 * the producer indices and the read-only mask do not share a cache line.
 * The padding alone only achieves that if the struct itself starts on a
 * cache-line boundary, so the first member is explicitly aligned, which
 * raises the alignment of the whole struct to CACHESIZE.
 */
struct ring {
    /* Next position consumers read from; advanced by consumers. */
    _Alignas(CACHESIZE) atomic_uint c_head;
    char _pad0[CACHESIZE - sizeof(atomic_uint)];

    /* Next position producers claim; advanced first, used to sync producers. */
    atomic_uint p_head;
    /* Position up to which produced entries are published to consumers. */
    atomic_uint p_tail;
    char _pad1[CACHESIZE - sizeof(atomic_uint) * 2];

    /* Index mask applied to positions (slot count minus one). */
    unsigned mask;
    char _pad2[CACHESIZE - sizeof(unsigned)];
};
/*
 * Compare-and-swap helper with a by-value "expected" argument.
 *
 * Atomically replaces *a with `new` if it currently holds `old`.
 * Returns true when the swap happened, false when *a held something else
 * (in which case *a is left unchanged).
 */
static bool stdatomic_cas(atomic_uint *a, unsigned old, unsigned new)
{
    /* The C11 primitive writes the observed value back through its second
     * argument on failure; a private copy absorbs that write so callers
     * only ever see the success flag.
     */
    unsigned expected = old;
    const bool swapped = atomic_compare_exchange_strong(a, &expected, new);

    return swapped;
}
static bool ring_enqueue(struct ring *ring, int32_t *buf, int32_t entry) | |
{ | |
const unsigned mask = ring->mask; | |
unsigned producer; | |
unsigned consumer; | |
/* Increment p_head while ensuring it doesn't pass c_head */ | |
do { | |
producer = atomic_load(&ring->p_head); | |
consumer = atomic_load(&ring->c_head); | |
/* Check if buffer is full */ | |
if ((producer - consumer) >= mask) | |
return false; | |
} while (!stdatomic_cas(&ring->p_head, producer, producer + 1)); | |
/* producer is now a unique entry that is behind p_head and after c_head | |
* producer should also be ahead of p_tail | |
*/ | |
buf[producer & mask] = entry; | |
/* Swing p_tail to current position to allow consumption | |
* We stall to ensure we don't expose a slower producer thread | |
*/ | |
while (!stdatomic_cas(&ring->p_tail, producer, producer + 1)) { | |
/* Stall */ | |
} | |
return true; | |
} | |
static bool ring_dequeue(struct ring *ring, int32_t *buf, int32_t *entry) | |
{ | |
const unsigned mask = ring->mask; | |
unsigned consumer; | |
unsigned producer; | |
do { | |
consumer = atomic_load(&ring->c_head); | |
producer = atomic_load(&ring->p_tail); | |
/* Ensure buffer is not empty */ | |
if (producer == consumer) | |
return false; | |
/* Even if p_tail has been changed, c_head is still going to be valid | |
* because p_tail is only ever increased | |
*/ | |
*entry = buf[consumer & mask]; | |
/* consumer may not be unique, but buf[consumer] is only read | |
* now try to increment consumer - if it succeeds we are done | |
* if it fails, then a different consumer took this entry instead | |
*/ | |
} while (!stdatomic_cas(&ring->c_head, consumer, consumer + 1)); | |
return true; | |
} | |
/* Globals */ | |
static struct ring ring = { | |
.p_head = ATOMIC_VAR_INIT(0), | |
.p_tail = ATOMIC_VAR_INIT(0), | |
.c_head = ATOMIC_VAR_INIT(0), | |
.mask = (1 << 20) - 1 | |
}; | |
static int32_t ring_buffer[1 << 20]; | |
int b = 0; | |
/* Threads */ | |
static int producer(void *data) { | |
for (int i = 0; i < 20; i++) { | |
bool ret = ring_enqueue(&ring, ring_buffer, (int32_t)i); | |
assert(ret); | |
} | |
ring_enqueue(&ring, ring_buffer, -1); | |
(void)data; | |
return 0; | |
} | |
static int consumer(void *data) { | |
int32_t entry; | |
do { | |
bool ret = ring_dequeue(&ring, ring_buffer, &entry); | |
if (ret) | |
fprintf(stderr, "Got value %"PRIi32"!\n", entry); | |
} while (entry != -1); | |
(void)data; | |
return 0; | |
} | |
int main(void) | |
{ | |
fprintf(stderr, "ring_buffer: %p\n", (void *)ring_buffer); | |
ring_enqueue(&ring, ring_buffer, 1); | |
SDL_Thread *t1 = SDL_CreateThread(producer, "producer", NULL); | |
SDL_Thread *t2 = SDL_CreateThread(producer, "producer", NULL); | |
SDL_Thread *t3 = SDL_CreateThread(consumer, "consumer", NULL); | |
SDL_Thread *t4 = SDL_CreateThread(consumer, "consumer", NULL); | |
SDL_WaitThread(t1, NULL); | |
SDL_WaitThread(t2, NULL); | |
SDL_WaitThread(t3, NULL); | |
SDL_WaitThread(t4, NULL); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <SDL_atomic.h> | |
#include <SDL_thread.h> | |
#include <assert.h> | |
#include <inttypes.h> | |
#include <stdbool.h> | |
#include <stdint.h> | |
#include <stdio.h> | |
#define CACHESIZE 64 | |
struct ring { | |
SDL_atomic_t c_head; | |
char _pad0[CACHESIZE - sizeof(SDL_atomic_t)]; | |
SDL_atomic_t p_head; /* Used to sync producers */ | |
SDL_atomic_t p_tail; | |
char _pad1[CACHESIZE - sizeof(SDL_atomic_t) * 2]; | |
unsigned mask; | |
char _pad2[CACHESIZE - sizeof(unsigned)]; | |
}; | |
#if 0 | |
invariants: c_head <= p_tail <= p_head, and (p_head - c_head) < ring_size | |
c_head - where consumer will read from next. Set by consumers | |
if c_head == p_tail, then there are no readable entries. | |
if c_head > p_head, then consistency has been broken. | |
p_tail - where producers are done. Set by producers at the end | |
if p_tail == p_head, then no producers are currently in use | |
p_head - p_tail == number of producers in use | |
p_head - where producers insert to next. Set by producers at the beginning | |
#endif | |
static unsigned ring_size(struct ring *ring) | |
{ | |
unsigned consumer = (unsigned)SDL_AtomicGet(&ring->c_head); | |
unsigned producer = (unsigned)SDL_AtomicGet(&ring->p_head); | |
unsigned mask = ring->mask; | |
return (producer - consumer) & mask; | |
} | |
static bool ring_enqueue(struct ring *ring, int32_t *buf, int32_t entry) | |
{ | |
const unsigned mask = ring->mask; | |
unsigned producer; | |
unsigned consumer; | |
/* Increment p_head while ensuring it doesn't pass c_head */ | |
do { | |
producer = (unsigned)SDL_AtomicGet(&ring->p_head); | |
consumer = (unsigned)SDL_AtomicGet(&ring->c_head); | |
/* Check if buffer is full */ | |
if ((producer - consumer) >= mask) | |
return false; | |
} while (!SDL_AtomicCAS(&ring->p_head, (int)producer, (int)(producer + 1))); | |
/* producer is now a unique entry that is behind p_head and after c_head | |
* producer should also be ahead or equal to p_tail | |
*/ | |
buf[producer & mask] = entry; | |
/* Swing p_tail to current position to allow consumption | |
* We stall to ensure we don't expose a slower producer thread | |
* This is the non-lockfree part of the algorithm - hopefully we don't spin here long | |
*/ | |
while (!SDL_AtomicCAS(&ring->p_tail, (int)producer, (int)(producer + 1))) { | |
/* Stall */ | |
} | |
return true; | |
} | |
static bool ring_dequeue(struct ring *ring, int32_t *buf, int32_t *entry) | |
{ | |
const unsigned mask = ring->mask; | |
unsigned consumer; | |
unsigned producer; | |
do { | |
consumer = (unsigned)SDL_AtomicGet(&ring->c_head); | |
producer = (unsigned)SDL_AtomicGet(&ring->p_tail); | |
/* Ensure buffer has readable entries */ | |
if (producer == consumer) | |
return false; | |
/* Even if p_tail has been changed, c_head is still going to be valid | |
* because p_tail is only ever increased | |
*/ | |
*entry = buf[consumer & mask]; | |
/* consumer may not be unique, but buf[consumer] is only read | |
* now try to increment consumer - if it succeeds we are done | |
* if it fails, then a different consumer took this entry instead | |
*/ | |
} while (!SDL_AtomicCAS(&ring->c_head, (int)consumer, (int)(consumer + 1))); | |
return true; | |
} | |
/* Globals */ | |
static struct ring ring = { | |
.p_head = {0}, | |
.p_tail = {0}, | |
.c_head = {0}, | |
.mask = (1 << 20) - 1 | |
}; | |
static int32_t ring_buffer[1 << 20]; | |
/* Threads */ | |
static int producer(void *data) { | |
for (int i = 0; i < 20; i++) { | |
bool ret = ring_enqueue(&ring, ring_buffer, (int32_t)i); | |
assert(ret); | |
} | |
ring_enqueue(&ring, ring_buffer, -1); | |
(void)data; | |
return 0; | |
} | |
static int consumer(void *data) { | |
int32_t entry; | |
do { | |
bool ret = ring_dequeue(&ring, ring_buffer, &entry); | |
if (ret) | |
fprintf(stderr, "Got value %"PRIi32"!\n", entry); | |
} while (entry != -1); | |
(void)data; | |
return 0; | |
} | |
int main(void) | |
{ | |
fprintf(stderr, "ring_buffer: %p\n", (void *)ring_buffer); | |
ring_enqueue(&ring, ring_buffer, 1); | |
SDL_Thread *t1 = SDL_CreateThread(producer, "producer", NULL); | |
SDL_Thread *t2 = SDL_CreateThread(consumer, "consumer", NULL); | |
SDL_WaitThread(t1, NULL); | |
SDL_WaitThread(t2, NULL); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment