Skip to content

Instantly share code, notes, and snippets.

@cloudwu
Created February 5, 2024 04:49
Show Gist options
  • Star 12 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save cloudwu/e8cc734a31dd01b439d8d131acc361c3 to your computer and use it in GitHub Desktop.
Save cloudwu/e8cc734a31dd01b439d8d131acc361c3 to your computer and use it in GitHub Desktop.
concurrence log
#include "clog.h"
#include <stdatomic.h>
#include <stdlib.h>
#include <string.h>
// Number of slots in the metadata ring; each slot describes one pushed record
// (its data offset and its size). 4096 is a power of two.
#define LOGMETA_BUFFER 4096
// Size in bytes of the data ring that stores the record payloads
// (64 KiB, also a power of two).
#define LOGDATA_BUFFER (64 * 1024)
// Byte ring that stores the raw record payloads.
//
// [..........................]
// ^
// |
// ptr % LOGDATA_BUFFER
//
// `ptr` is a running byte counter; it is reduced modulo LOGDATA_BUFFER only
// when it is used to index `buffer`.
//
// N producers (see push_data):
//   reserve : old = atomic_fetch_add(ptr, size)
//   copy    : input -> buffer + old % LOGDATA_BUFFER, wrapping to the start
//             of the buffer when the payload crosses the end
// consumer (see log_pop):
//   copy    : buffer + offset % LOGDATA_BUFFER -> output
//   check   : the copy is accepted only if offset + LOGDATA_BUFFER >= ptr
//             when re-read afterwards; otherwise the record is reported
//             as dropped
struct log_data_ringbuffer {
// Total number of bytes reserved so far by all producers.
atomic_size_t ptr;
// Payload storage, addressed with (counter % LOGDATA_BUFFER).
char buffer[LOGDATA_BUFFER];
};
// Ring of record descriptors (one slot per pushed record).
//
// N producers (see log_push):
//   push the payload into log_data_ringbuffer
//   index <- atomic_fetch_add(tail, 1), then index %= LOGMETA_BUFFER
//   offset[index] = data offset of the payload
//   size[index]   = payload size (written last, so that a valid size
//                   publishes the offset)
// 1 consumer (see get_index):
//   if head == tail then return empty
//   if size[head] < 0 then return empty (slot claimed but not published yet)
//   index = head ++
//   offset[index] -> output
//   size[index] = -1 (marks the slot as invalid again)
struct log_meta_ringbuffer {
// Count of slots claimed by producers; advanced with atomic_fetch_add.
atomic_size_t tail;
// Count of slots handed out so far; a plain size_t that is only
// modified by get_index.
size_t head;
// Data-ring counter value at which each record's payload starts.
size_t offset[LOGMETA_BUFFER];
// Payload size of each record; a negative value means "no record here".
atomic_int size[LOGMETA_BUFFER];
};
// Complete log object: the descriptor ring plus the payload ring it refers to.
// The header only forward-declares this struct, so callers handle it through
// pointers returned by log_new().
struct log_buffer {
// Per-record descriptors (offset + size).
struct log_meta_ringbuffer meta;
// Raw payload bytes.
struct log_data_ringbuffer data;
};
struct log_buffer *
log_new() {
struct log_buffer * obj = (struct log_buffer *)malloc(sizeof(*obj));
obj->meta.head = 0;
atomic_init(&obj->meta.tail, 0);
atomic_init(&obj->data.ptr, 0);
return obj;
}
// Destroy a log buffer and return its memory to the allocator.
// obj : the object to release; the whole log lives in one allocation, so a
//       single free() is all that is required (free() itself accepts NULL).
void
log_delete(struct log_buffer *obj) {
	free(obj);
}
// Reserve n bytes in the data ring and copy `data` into the reserved region.
//
// The reservation is a single atomic_fetch_add on the running byte counter.
// The region starts at (counter % LOGDATA_BUFFER) and continues at the start
// of the buffer when it runs over the end.
//
// Returns the counter value before the reservation, i.e. the record's offset.
static size_t
push_data(struct log_data_ringbuffer *buffer, int n, const char *data) {
	size_t start = atomic_fetch_add(&buffer->ptr, n);
	int pos = start % LOGDATA_BUFFER;
	// Bytes that fit between pos and the end of the buffer.
	int room = LOGDATA_BUFFER - pos;
	int head_len = (n <= room) ? n : room;
	memcpy(buffer->buffer + pos, data, head_len);
	if (head_len < n) {
		// The remainder wraps around to the beginning of the ring.
		memcpy(buffer->buffer, data + head_len, n - head_len);
	}
	return start;
}
void
log_push(struct log_buffer *self, int n, const char *data) {
size_t offset = push_data(&self->data, n, data);
struct log_meta_ringbuffer *meta_buffer = &self->meta;
size_t index = atomic_fetch_add(&meta_buffer->tail, 1);
index %= LOGMETA_BUFFER;
meta_buffer->offset[index] = offset;
atomic_thread_fence(memory_order_release);
atomic_store_explicit(&meta_buffer->size[index] , n, memory_order_relaxed);
}
// Take the next record descriptor from the metadata ring (consumer side).
//
// self   : the log
// offset : receives the record's data-ring offset; written only when the
//          return value comes from the final `return size`
//
// >0 : size , 0 empty ; -1 : drop
static int
get_index(struct log_buffer *self, size_t *offset) {
struct log_meta_ringbuffer *meta = &self->meta;
size_t t = atomic_load_explicit(&meta->tail, memory_order_relaxed);
// Nothing has been pushed since the last pop.
if (t == meta->head)
return 0;
// Producers are more than one full ring ahead, so the slot belonging to
// `head` has already been claimed again for a newer record: skip this
// record and report it as dropped.
if (meta->head + LOGMETA_BUFFER < t) {
++meta->head;
return -1;
}
int index = meta->head % LOGMETA_BUFFER;
int size = atomic_load_explicit(&meta->size[index], memory_order_relaxed);
// A negative size means the slot holds no published record (log_push stores
// the size as its last step); report empty and leave head where it is.
if (size < 0)
return 0;
// Counterpart of the release fence in log_push: after this fence the
// offset stored before the size is visible.
atomic_thread_fence(memory_order_acquire);
meta->head ++;
*offset = meta->offset[index];
// Finish reading the slot before marking it invalid again.
atomic_thread_fence(memory_order_release);
atomic_store_explicit(&meta->size[index], -1, memory_order_relaxed);
return size;
}
// Pop the oldest record from the log (consumer side; the read position is a
// plain, non-atomic counter).
//
// self   : the log
// n      : capacity of `output` in bytes; longer records are truncated to n,
//          and a negative n is treated as 0
// output : destination buffer
//
// >0 : length ; 0 empty ; -1 : drop
// A record popped with n <= 0 is consumed and also yields 0.
int
log_pop(struct log_buffer *self, int n, char *output) {
	size_t queue_offset;
	int size = get_index(self, &queue_offset);
	if (size <= 0)
		return size;
	// A record larger than the data ring cannot be intact, and copying it
	// would read past the end of the buffer.
	if (size > LOGDATA_BUFFER)
		return -1;
	// Never hand a negative length to memcpy.
	if (n < 0)
		n = 0;
	struct log_data_ringbuffer *buffer = &self->data;
	if (size > n)
		size = n;
	int offset = queue_offset % LOGDATA_BUFFER;
	if (offset + size <= LOGDATA_BUFFER) {
		memcpy(output, buffer->buffer + offset, size);
	} else {
		// The record wraps around the end of the ring: copy it in two parts.
		int second_part = offset + size - LOGDATA_BUFFER;
		int first_part = size - second_part;
		memcpy(output, buffer->buffer + offset, first_part);
		memcpy(output + first_part, buffer->buffer, second_part);
	}
	// Validate after copying: if the producers have advanced more than one
	// ring length past the start of this record, its bytes may have been
	// overwritten. The distance is computed by unsigned subtraction so the
	// test stays correct when the size_t counter wraps around.
	size_t ptr = atomic_load(&buffer->ptr);
	if (ptr - queue_offset <= LOGDATA_BUFFER)
		return size;
	return -1;
}
#ifndef concurrence_log_h
#define concurrence_log_h
#include <stddef.h>
// Opaque log object: records are pushed with log_push() and read back with
// log_pop().
struct log_buffer;
// Create a log. Release the result with log_delete().
struct log_buffer * log_new(void);
// Destroy a log created by log_new().
void log_delete(struct log_buffer *obj);
// Append a record of n bytes taken from data.
void log_push(struct log_buffer *self, int n, const char *data);
// Pop the oldest record into output, copying at most n bytes.
// >0 : length ; 0 empty ; -1 : drop
int log_pop(struct log_buffer *self, int n, char *output);
#endif
#include "clog.h"
#include <stdio.h>
// Smoke test: push a batch of short text records, then pop the same number
// of times and print each result. The batch is larger than LOGMETA_BUFFER
// (4096), so the oldest records are expected to come back as "Drop".
int
main() {
	enum { ROUNDS = 5000 };
	char text[100];
	struct log_buffer *log = log_new();

	for (int round = 0; round < ROUNDS; round++) {
		int len = sprintf(text, "Hello World %d", round);
		log_push(log, len, text);
	}
	printf("push\n");

	for (int round = 0; round < ROUNDS; round++) {
		int len = log_pop(log, 100, text);
		if (len > 0) {
			printf("[%d] %.*s\n", round, len, text);
		} else {
			// 0 (empty) and -1 (dropped) are reported the same way.
			printf("Drop %d\n", round);
		}
	}

	log_delete(log);
	return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment