Skip to content

Instantly share code, notes, and snippets.

@Pangoraw
Last active December 4, 2020 09:19
Show Gist options
  • Save Pangoraw/aa95181d532c1e86a33a7fd13f776408 to your computer and use it in GitHub Desktop.
Save Pangoraw/aa95181d532c1e86a33a7fd13f776408 to your computer and use it in GitHub Desktop.
A thread-safe implementation of Go-style channels using POSIX threads (pthread.h)
#include "channel.h"
/*
 * Allocate a queue on the heap and initialize it as empty.
 *
 * Returns a pointer to the new queue, or NULL when the allocation fails.
 * The storage comes from malloc(), so the caller releases it with free().
 */
queue_t *queue_new(void) {
    queue_t *queue = malloc(sizeof *queue);
    if (queue == NULL) {
        /* Do not hand a NULL pointer to queue_init(). */
        return NULL;
    }
    queue_init(queue);
    return queue;
}
/*
 * Reset a queue to the empty state.
 *
 * Both cursors are placed on slot 0 and `length` records the QUEUE_SIZE
 * capacity. The contents of `values` are left untouched.
 */
void queue_init(queue_t *queue) {
    queue->length = QUEUE_SIZE;
    queue->start = 0;
    queue->end = 0;
}
/*
 * Report whether the queue currently holds no values.
 *
 * Returns 1 when the read and write cursors coincide, 0 otherwise.
 */
int queue_is_empty(queue_t *queue) {
    if (queue->start != queue->end) {
        return 0;
    }
    return 1;
}
/*
 * Append `value` at the write cursor and advance the cursor, wrapping
 * around at QUEUE_SIZE.
 *
 * No capacity check is made here: once the write cursor catches up with
 * the read cursor the queue reads as empty again, so callers must not
 * push into a full queue.
 */
void queue_push(queue_t *queue, int64_t value) {
    size_t slot = queue->end;

    queue->values[slot] = value;
    slot += 1;
    queue->end = slot % QUEUE_SIZE;
}
/*
 * Remove and return the value at the read cursor, then advance the
 * cursor, wrapping around at QUEUE_SIZE.
 *
 * No emptiness check is made here; callers are expected to consult
 * queue_is_empty() first.
 */
int64_t queue_pop(queue_t *queue) {
    size_t slot = queue->start;

    queue->start = (slot + 1) % QUEUE_SIZE;
    return queue->values[slot];
}
/*
 * Allocate a channel on the heap and initialize it with channel_init().
 *
 * Returns a pointer to the new channel, or NULL when the allocation
 * fails. Release the result with channel_free().
 */
channel_t *channel_new(void) {
    channel_t *channel = malloc(sizeof *channel);
    if (channel == NULL) {
        /* Do not hand a NULL pointer to channel_init(). */
        return NULL;
    }
    channel_init(channel);
    return channel;
}
/*
 * Initialize a channel in caller-provided storage: its condition
 * variable, its mutex (both with default attributes) and its queue.
 *
 * The function has no way to return a status, so a failing pthread
 * initializer is reported on stdout in the same style as the other
 * channel functions.
 */
void channel_init(channel_t *channel) {
    if (pthread_cond_init(&channel->condvar, NULL) != 0) {
        printf("error: initializing cond var\n");
    }
    if (pthread_mutex_init(&channel->mutex, NULL) != 0) {
        printf("error: initializing mutex\n");
    }
    queue_init(&channel->queue);
}
/*
 * Destroy a channel's condition variable and mutex, then free() the
 * channel itself. A NULL argument is ignored.
 *
 * Because the pointer is passed to free(), only heap-allocated channels
 * (such as those returned by channel_new()) may be given to this function.
 */
void channel_free(channel_t *channel) {
    if (channel != NULL) {
        pthread_cond_destroy(&channel->condvar);
        pthread_mutex_destroy(&channel->mutex);
        free(channel);
    }
}
/*
 * Push `value` onto the channel's queue under the channel mutex and wake
 * one thread waiting on the condition variable.
 *
 * If the mutex cannot be locked the error is reported and the value is
 * NOT enqueued: touching the queue without the lock would race with other
 * threads, and unlocking a mutex that is not held is invalid.
 *
 * This function never blocks on a full queue; queue_push() performs no
 * capacity check, so senders must not get QUEUE_SIZE values ahead of the
 * receiver.
 */
void channel_send(channel_t *channel, int64_t value) {
    if (pthread_mutex_lock(&channel->mutex) != 0) {
        printf("error: locking mutex\n");
        return;
    }
    queue_push(&channel->queue, value);
    if (pthread_mutex_unlock(&channel->mutex) != 0) {
        printf("error: unlocking mutex\n");
    }
    /* Signal after releasing the lock so the woken receiver can take it. */
    pthread_cond_signal(&channel->condvar);
}
/*
 * Block until a value is available on the channel, then remove and
 * return it.
 *
 * The mutex is held for the whole check-wait-pop sequence:
 *   - the emptiness test is made under the lock, so a send cannot slip in
 *     between the test and the wait (no lost wakeup);
 *   - pthread_cond_wait() is only ever called with the mutex locked, as
 *     it requires;
 *   - the predicate is re-tested in a loop because a condition wait may
 *     return spuriously or after another receiver already took the value.
 *
 * Returns the received value. If a pthread call fails the error is
 * reported on stdout and 0 is returned without touching the queue; this
 * is indistinguishable from a received 0.
 */
int64_t channel_receive(channel_t *channel) {
    int64_t value = 0;

    if (pthread_mutex_lock(&channel->mutex) != 0) {
        printf("error: locking mutex\n");
        return value;
    }
    while (queue_is_empty(&channel->queue)) {
        if (pthread_cond_wait(&channel->condvar, &channel->mutex) != 0) {
            printf("error: while waiting for cond var\n");
            /* Give up instead of spinning on a wait that keeps failing. */
            goto unlock;
        }
    }
    value = queue_pop(&channel->queue);
unlock:
    if (pthread_mutex_unlock(&channel->mutex) != 0) {
        printf("error: while releasing mutex\n");
    }
    return value;
}
#ifndef _CHANNEL_H_
#define _CHANNEL_H_
#include <pthread.h>
#include <stdint.h>
#include <stdlib.h>
#include <stdio.h>
#define QUEUE_SIZE 10
/*
 * Fixed-size ring buffer of int64_t values backed by an inline array of
 * QUEUE_SIZE slots.
 *
 * The queue is considered empty when `start == end`. Both cursors wrap
 * modulo QUEUE_SIZE, so filling all QUEUE_SIZE slots makes the cursors
 * coincide again and the queue reads as empty.
 */
typedef struct queue {
/* Index of the slot queue_pop() reads next. */
size_t start;
/* Index of the slot queue_push() writes next. */
size_t end;
/* Set to QUEUE_SIZE by queue_init(); not read elsewhere in the visible sources. */
uint32_t length;
/* Backing storage for the ring. */
int64_t values[QUEUE_SIZE];
} queue_t;
/* Allocate a queue with malloc() and initialize it as empty. */
queue_t *queue_new(void);
/* Reset the cursors of `queue` so that it is empty. */
void queue_init(queue_t *queue);
/* Return non-zero when `queue` holds no values. */
int queue_is_empty(queue_t *queue);
/* Append `value` at the write cursor; no capacity check is made. */
void queue_push(queue_t *queue, int64_t value);
/* Remove and return the value at the read cursor; no emptiness check is made. */
int64_t queue_pop(queue_t *queue);
/*
 * A channel of int64_t values shared between threads: a queue plus the
 * pthread primitives that channel_send() and channel_receive() use to
 * coordinate access to it.
 */
typedef struct channel {
/* Signaled by channel_send() after a push; waited on by channel_receive(). */
pthread_cond_t condvar;
/* Locked around queue accesses in channel_send() and channel_receive(). */
pthread_mutex_t mutex;
/* Values sent but not yet received. */
queue_t queue;
} channel_t;
/* Allocate a channel with malloc() and initialize it. */
channel_t *channel_new(void);
/* Initialize a channel in caller-provided storage. */
void channel_init(channel_t *channel);
/* Destroy the channel's pthread objects and free() it; NULL is ignored. */
void channel_free(channel_t *channel);
/* Enqueue `value` and wake one waiting receiver. */
void channel_send(channel_t *channel, int64_t value);
/* Wait for a value to be available, then remove and return it. */
int64_t channel_receive(channel_t *channel);
#endif // _CHANNEL_H_
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include "channel.h"
#define BENCH_SIZE 100000
#define N_THREADS 4
/*
 * Work description handed to one count_routine() thread: sum
 * values[start] .. values[end - 1] and send the result on `channel`.
 */
typedef struct instruction {
/* Half-open index range [start, end) into `values`. */
size_t start, end;
/* Channel on which the partial sum is sent. */
channel_t *channel;
/* Array being summed; only read by count_routine(). */
int64_t *values;
} instruction_t;
/*
 * Thread entry point: sum the slice of values described by `arg` and send
 * the partial sum on the instruction's channel.
 *
 * `arg` must point to an instruction_t that stays valid until the routine
 * returns. Always returns NULL; the result travels through the channel.
 */
extern void *count_routine(void *arg) {
    instruction_t *insts = (instruction_t *)arg;
    int64_t s = 0;
    /* Index with size_t to match the type of the slice bounds. */
    for (size_t i = insts->start; i < insts->end; ++i) {
        s += insts->values[i];
    }
    channel_send(insts->channel, s);
    return NULL;
}
void multi_threaded() {
int64_t values[BENCH_SIZE];
for (int i = 0; i < BENCH_SIZE; ++i) {
values[i] = i + 1;
}
channel_t channel;
channel_init(&channel);
size_t offset = BENCH_SIZE / N_THREADS;
instruction_t instructions[N_THREADS];
pthread_t tid[N_THREADS];
for (int i = 0; i < N_THREADS; ++i) {
instructions[i].channel = &channel;
instructions[i].start = i * offset;
instructions[i].end = (i + 1) * offset;
instructions[i].values = values;
pthread_create(tid + i, NULL, count_routine, instructions + i);
}
int64_t s = 0;
for (int i = 0; i < N_THREADS; ++i) {
s += channel_receive(&channel);
}
for (int i = 0; i < N_THREADS; ++i) {
pthread_join(tid[i], NULL);
}
printf("sum is %ld\n", s);
}
/*
 * Sum the integers 1..BENCH_SIZE on the calling thread and print the
 * total. Serves as the baseline for multi_threaded().
 */
void single_threaded(void) {
    int64_t values[BENCH_SIZE];
    for (int i = 0; i < BENCH_SIZE; ++i) {
        values[i] = i + 1;
    }

    int64_t s = 0;
    for (int i = 0; i < BENCH_SIZE; ++i) {
        s += values[i];
    }

    /* int64_t is not `long` on every platform; print through long long. */
    printf("sum is %lld\n", (long long)s);
}
/*
 * Run `f` once and print the processor time it consumed, in seconds.
 *
 * The measurement uses clock(), which reports processor time rather than
 * wall-clock time.
 */
void benchmark(void (*f)()) {
    clock_t began = clock();
    f();
    clock_t finished = clock();

    double elapsed = ((double) (finished - began)) / CLOCKS_PER_SEC;
    printf("f took %f\n", elapsed);
}
/*
 * Time the multi-threaded sum, then the single-threaded sum, and exit
 * successfully.
 */
int main() {
    void (*const runs[])() = { multi_threaded, single_threaded };

    for (size_t k = 0; k < sizeof runs / sizeof runs[0]; ++k) {
        benchmark(runs[k]);
    }
    return EXIT_SUCCESS;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment