Skip to content

Instantly share code, notes, and snippets.

@cpq
Created February 14, 2023 16:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cpq/41bbc4b50122be93709f203607e3f8d1 to your computer and use it in GitHub Desktop.
Save cpq/41bbc4b50122be93709f203607e3f8d1 to your computer and use it in GitHub Desktop.
// Single producer, single consumer non-blocking queue
//
// Producer:
// char *buf;
// while (mg_queue_space(q, &buf, len) < len) WAIT(); // Wait for free space
// memcpy(buf, data, len); // Copy data to the queue
// mg_queue_add(q, len); // Advance q->head
//
// Consumer:
// char *buf;
// while ((len = mg_queue_next(q, &buf)) == 0) WAIT();
// mg_hexdump(buf, len); // Handle message
// mg_queue_del(q); // Delete message (advance tail)
//
// Queue state. The storage is owned by the caller and handed over with
// mg_queue_init(); message data and the offsets table both live inside it.
struct mg_queue {
// Start of the storage area
char *buf;
// Usable size of buf in bytes; mg_queue_init() rounds it down to a multiple
// of sizeof(uint32_t)
size_t len;
// Offsets-table index of the next message to read; advanced by mg_queue_del()
volatile int tail;
// Offsets-table index of the next slot to write; advanced by mg_queue_add()
volatile int head;
};
// Set up queue `q` on top of `len` bytes of storage at `buf`
void mg_queue_init(struct mg_queue *q, char *buf, size_t len);

// Producer: get the free space at the write position. `min` is the size the
// caller wants to write; *buf receives the write position
size_t mg_queue_space(struct mg_queue *q, char **buf, size_t min);

// Producer: commit `len` bytes written at the write position (advance head)
void mg_queue_add(struct mg_queue *q, size_t len);

// Consumer: get the size of the next message, 0 if there is none; *buf
// receives the start of the message data
size_t mg_queue_next(struct mg_queue *q, char **buf);

// Consumer: drop the message reported by mg_queue_next() (advance tail)
void mg_queue_del(struct mg_queue *q);
// Data starts from the beginning of the buffer, and offsets to the individual
// messages are stored in the "offsets table" at the end of the buffer.
// The "offsets table" behaves like a stack: it grows down. Every stored offset
// is 32-bit, i.e. sizeof(uint32_t).
//
// When a new message is added, new offset is written and q->head is incremented
// The end of the offsets table is marked by zero offset.
//
// If data buffer is filled, q->head cannot increase anymore but there is
// free space at the beginning of the buffer, q->head can wrap to the beginning
// In order to wrap, q->tail should be >= 2: we must preserve a previous tail
// offset to know the current message length, and we must keep space for the
// zero marker, which ends the offsets table.
// So when a tail is larger than a head, then q->tail >= q->head + 2.
//
// |------------- data --------------------->|<- offsets table--|
// |
// |----free----|message1|message2|--free----| 0 |off3|off2|off1|
// ^            ^        ^        ^            ^        ^       ^
// buf         off1     off2     off3         head     tail    len
// Return a pointer to the offsets-table slot with index `i`. Slot 0 is the
// last uint32_t of the buffer and higher indices sit at lower addresses, so
// for a slot pointer p, p[1] is the previous slot and p[-1] is the next one.
// Both arguments are parenthesised so that expressions such as `&queue` or
// `head + 1` expand correctly; note that `q` is evaluated twice.
#define MG_QPOS(q, i) (&((uint32_t *) &(q)->buf[(q)->len])[-1 - (i)])
// Initialise queue `q` on top of `len` bytes of caller-owned storage at `buf`.
//
// The usable length is rounded down to a multiple of sizeof(uint32_t) so the
// offsets table, which is addressed backwards from buf + len through a
// uint32_t pointer, consists of whole slots. `buf` itself must therefore be
// suitably aligned for uint32_t access.
//
// A NULL `buf` or a `len` below sizeof(uint32_t) leaves the queue in the
// "not initialised" state that mg_queue_space() and mg_queue_next() test for.
void mg_queue_init(struct mg_queue *q, char *buf, size_t len) {
  q->len = len - (len % sizeof(uint32_t));  // Round down to whole slots
  q->buf = buf;
  q->head = q->tail = 0;
  // Mark the queue empty by zeroing slot 0. Skip the write when there is no
  // storage behind the queue, otherwise it would go through a NULL pointer
  if (q->buf != NULL && q->len > 0) MG_QPOS(q, 0)[0] = 0;
}
// Producer side: find room for a message of `min` bytes.
//
// When `buf` is not NULL, *buf receives the position where the next message
// must be written. The return value is the number of contiguous bytes that
// may be written there, or 0 when fewer than `min` bytes are available right
// now or the queue is not initialised.
//
// If the area ahead of head cannot hold `min` bytes and tail has moved far
// enough from the start of the table, head wraps to the start of the buffer.
size_t mg_queue_space(struct mg_queue *q, char **buf, size_t min) {
  uint32_t start = 0;  // Offset in q->buf where the next message goes
  uint32_t space = 0;  // Free bytes available at that offset

  if (q->len > 0 && q->buf != NULL) {
    // Snapshot both indices once. tail is advanced concurrently by the
    // consumer; re-reading it after `t` was computed could pair a slot pointer
    // for tail == 0 with a later tail > 2 and read t[1] past the buffer end
    int head = q->head, tail = q->tail;
    uint32_t *h = MG_QPOS(q, head), *t = MG_QPOS(q, tail);

    if (head > 0) start = h[1];  // A message starts where the previous ended

    if (head >= tail) {  // Tail is behind, or equal to head
      // Table size once this message is added: slots 0..head plus end marker
      size_t ts = sizeof(uint32_t) * ((size_t) head + 2);
      // Written as two comparisons so a huge `min` cannot overflow the sum
      if (ts + start <= q->len && min <= q->len - ts - start) {
        space = (uint32_t) (q->len - ts - start);  // Enough space ahead
      } else if (tail > 2) {  // No. Wrap: slots 0 and 1 are free for reuse
        // The empty marker in slot 0 must be stored before head moves.
        // Writing through a volatile pointer keeps the compiler from
        // reordering the two stores.
        // NOTE(review): SMP targets may still need a CPU memory barrier here
        volatile uint32_t *first = MG_QPOS(q, 0);
        *first = 0;
        q->head = 0;
        start = 0;
        space = t[1];  // Free up to where the tail message begins
      }
    } else if (head + 2 < tail) {  // Head is behind tail, with table room
      space = t[1] - start;        // From head up to where tail data begins
    }

    // Never report a region smaller than requested as usable: a caller that
    // only tests the result against 0 would otherwise overrun unread data
    if (space < min) space = 0;
  }

  if (buf != NULL) *buf = q->buf == NULL ? NULL : q->buf + start;
  return space;
}
// Consumer side: look at the oldest unread message without removing it.
//
// Returns the message length, or 0 when there is no message or the queue is
// not initialised. On an initialised queue, when `buf` is not NULL, *buf is
// set to the position in q->buf where the message data starts.
size_t mg_queue_next(struct mg_queue *q, char **buf) {
  uint32_t *slot = MG_QPOS(q, q->tail);  // Slot holding the message end offset
  uint32_t start, size = 0;

  if (q->len == 0 || q->buf == NULL) return 0;  // Queue is not initialised

  // Tail is ahead of head and sits on the end marker: everything up to the
  // old end has been read, so continue from the start of the table
  if (q->tail > q->head && *slot == 0) {
    q->tail = 0;
    slot = MG_QPOS(q, 0);
  }

  // A message starts where the previous one ended; the first starts at 0
  start = q->tail == 0 ? 0 : slot[1];
  if (*slot > 0) size = *slot - start;
  if (buf != NULL) *buf = q->buf + start;
  return size;
}
// Producer side: commit a message of `len` bytes that was copied to the
// position reported by mg_queue_space(). Does nothing when `len` is 0 or the
// queue is not initialised.
//
// The three stores are ordered so the consumer never observes a half-added
// message:
//   1. the new end marker, so the slot after this message reads as "empty"
//      before the message can be consumed;
//   2. head, so tail cannot get ahead of head once the message is consumed
//      (mg_queue_next() treats "tail > head on an end marker" as a wrap);
//   3. the message end offset, which is what makes the message visible.
// The table is written through a volatile pointer so the compiler keeps this
// order relative to the volatile head.
// NOTE(review): SMP targets may additionally need a CPU memory barrier.
void mg_queue_add(struct mg_queue *q, size_t len) {
  if (len > 0 && q->len > 0 && q->buf != NULL) {
    int head = q->head;
    volatile uint32_t *h = MG_QPOS(q, head);
    uint32_t prev = head == 0 ? 0 : h[1];  // Where this message starts
    h[-1] = 0;                             // 1. Mark the new end
    q->head = head + 1;                    // 2. Advance head
    h[0] = prev + (uint32_t) len;          // 3. Publish the end offset
  }
}
// Consumer side: discard the message at tail, making its space reusable.
// Does nothing when tail is on the end marker (no message) or when the queue
// is not initialised, matching the guards in mg_queue_next() and
// mg_queue_space().
void mg_queue_del(struct mg_queue *q) {
  if (q->len == 0 || q->buf == NULL) return;  // Queue is not initialised
  if (*MG_QPOS(q, q->tail) > 0) q->tail++;    // Step over the current message
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment