Skip to content

Instantly share code, notes, and snippets.

@codeslinger
Last active August 29, 2015 14:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save codeslinger/19429aa39aa4ed677cf3 to your computer and use it in GitHub Desktop.
Save codeslinger/19429aa39aa4ed677cf3 to your computer and use it in GitHub Desktop.
Multiple consumers each consuming all of the queue items
Compile with: gcc -g -std=c99 -Wall -Werror test-vrt-mc.c -o test-vrt-mc -lvrt -lcork -lpthread
> ./test-vrt-mc
Results: 499999500000 499999500000
#include <stdlib.h>
#include <stdio.h>
#include <pthread.h>
#include <libcork/core.h>
#include <libcork/helpers/errors.h>
#include <vrt.h>
/* Queue element carrying a single 32-bit integer payload. */
struct vrt_integer_value {
/* Embedded base value; code in this file recovers the enclosing struct from
 * a pointer to this member with cork_container_of. */
struct vrt_value parent;
/* Payload: written by generate_integers, read by sum_integers. */
int32_t value;
};
/* Arguments for the generate_integers thread entry point. */
struct integer_generator {
/* Producer used to claim slots, publish them and finally send EOF. */
struct vrt_producer *p;
/* Number of values to publish; the values are 0 .. count-1. */
int64_t count;
};
/* Arguments for the sum_integers thread entry point. */
struct integer_summer {
/* Consumer the values are read from. */
struct vrt_consumer *c;
/* Destination for the final total; sum_integers stores it once EOF is seen. */
int64_t *sum;
};
/* Describes one thread to be started by vrt_queue_threaded.  An array of
 * clients is terminated by an entry whose run member is NULL. */
struct vrt_queue_client {
/* Thread start routine handed to pthread_create. */
void *(*run)(void *);
/* Opaque argument passed to run. */
void *u;
};
/*
 * Allocation callback for the integer value type.
 *
 * Allocates one struct vrt_integer_value with cork_new and hands back a
 * pointer to its embedded parent member.  The type argument is not used and
 * the value field is not set here.
 */
static struct vrt_value *vrt_integer_value_new(struct vrt_value_type *type)
{
    struct vrt_integer_value *ivalue = cork_new(struct vrt_integer_value);

    return &ivalue->parent;
}
static void vrt_integer_value_free(struct vrt_value_type *type,
struct vrt_value *value)
{
struct vrt_integer_value *iself;
iself = cork_container_of(value, struct vrt_integer_value, parent);
cork_free(iself, sizeof(iself));
}
/* Value type table for integer values, initialised positionally with the
 * allocation callback followed by the release callback. */
static struct vrt_value_type _vrt_integer_value_type = {
vrt_integer_value_new,
vrt_integer_value_free
};
/*
 * Accessor for the file-local integer value type table.
 *
 * Returns the address of the single static instance; the caller does not own
 * it.
 */
static struct vrt_value_type *vrt_integer_value_type(void)
{
    struct vrt_value_type *type = &_vrt_integer_value_type;

    return type;
}
/*
 * Run every client of a queue on its own thread and wait for all of them.
 *
 * q        Queue whose producers and consumers are switched to the threaded
 *          yield strategy before any thread is started.
 * clients  Array of thread descriptions, terminated by an entry whose run
 *          member is NULL.
 *
 * Returns 0 when every thread was created and joined successfully, -1 if a
 * pthread_create or pthread_join call failed.
 *
 * NOTE(review): when thread creation fails part-way, the threads that were
 * already started are still joined; if they wait on a client that never
 * started, that join may not return.
 */
static int vrt_queue_threaded(struct vrt_queue *q,
                              struct vrt_queue_client *clients)
{
    size_t i;
    size_t client_count = 0;
    size_t started = 0;
    int rc = 0;
    pthread_t *tids;
    struct vrt_producer *p;
    struct vrt_consumer *c;
    struct vrt_queue_client *client;

    /* The client array is NULL-terminated on its run member. */
    for (client = clients; client->run != NULL; client++) {
        client_count++;
    }

    tids = cork_calloc(client_count, sizeof(*tids));

    for (i = 0; i < cork_array_size(&q->producers); i++) {
        p = (struct vrt_producer *) cork_array_at(&q->producers, i);
        p->yield = vrt_yield_strategy_threaded();
    }

    for (i = 0; i < cork_array_size(&q->consumers); i++) {
        c = (struct vrt_consumer *) cork_array_at(&q->consumers, i);
        c->yield = vrt_yield_strategy_threaded();
    }

    for (i = 0; i < client_count; i++) {
        if (pthread_create(&tids[i], NULL, clients[i].run, clients[i].u) != 0) {
            rc = -1;
            break;
        }
        started++;
    }

    /* Join only the threads that really exist; joining an id that was never
     * filled in by pthread_create is undefined. */
    for (i = 0; i < started; i++) {
        if (pthread_join(tids[i], NULL) != 0) {
            rc = -1;
        }
    }

    cork_free(tids, client_count * sizeof(*tids));
    return rc;
}
/*
 * Thread entry point that publishes the integers 0 .. count-1.
 *
 * u points to a struct integer_generator.  For every value a slot is claimed
 * from the producer, filled in and published; after the last value EOF is
 * sent on the producer.  Always returns NULL.
 *
 * Each producer call is wrapped in rpi_check, which is presumably libcork's
 * "return NULL when the int-returning call fails" helper -- confirm against
 * libcork/helpers/errors.h.
 *
 * NOTE(review): the counter is int32_t while count is int64_t, so a count
 * above INT32_MAX would overflow the counter.
 */
void *generate_integers(void *u)
{
    struct integer_generator *gen = (struct integer_generator *) u;
    int32_t n;

    for (n = 0; n < gen->count; n++) {
        struct vrt_value *slot;
        struct vrt_integer_value *ivalue;

        rpi_check(vrt_producer_claim(gen->p, &slot));
        ivalue = cork_container_of(slot, struct vrt_integer_value, parent);
        ivalue->value = n;
        rpi_check(vrt_producer_publish(gen->p));
    }

    rpi_check(vrt_producer_eof(gen->p));
    return NULL;
}
/*
 * Thread entry point that adds up every integer seen by one consumer.
 *
 * u points to a struct integer_summer.  Values are pulled from the consumer
 * until it reports VRT_QUEUE_EOF; the accumulated total is then stored
 * through the sum pointer.  Always returns NULL.
 */
void *sum_integers(void *u)
{
    struct integer_summer *summer = (struct integer_summer *) u;
    int64_t total = 0;

    for (;;) {
        struct vrt_value *slot;
        struct vrt_integer_value *ivalue;
        int rv = vrt_consumer_next(summer->c, &slot);

        if (rv == VRT_QUEUE_EOF) {
            break;
        }
        if (rv != 0) {
            /* Any other non-zero result carries no value: ask again. */
            continue;
        }

        ivalue = cork_container_of(slot, struct vrt_integer_value, parent);
        total += ivalue->value;
    }

    /* The loop is left only on EOF, so the total is always stored. */
    *summer->sum = total;
    return NULL;
}
/*
 * Builds a queue with one producer and two consumers, runs all three on their
 * own threads and prints the total each consumer computed.
 *
 * The producer publishes the integers 0 .. 999999.  Returns 0 on success; the
 * rip_check / rii_check wrappers make main return early when queue setup or
 * the threaded run reports failure.
 */
int main(int argc, char **argv)
{
    /* Zero-initialised so the output is well defined even if a summer thread
     * never stores its result. */
    int64_t r1 = 0;
    int64_t r2 = 0;
    struct vrt_queue *q;
    struct vrt_producer *p;
    struct vrt_consumer *c1;
    struct vrt_consumer *c2;

    rip_check(q = vrt_queue_new("qsum", vrt_integer_value_type(), 64));
    rip_check(p = vrt_producer_new("generator", 1, q));
    rip_check(c1 = vrt_consumer_new("summer1", q));
    rip_check(c2 = vrt_consumer_new("summer2", q));

    struct integer_generator igen = {p, 1000000};
    struct integer_summer isum1 = {c1, &r1};
    struct integer_summer isum2 = {c2, &r2};
    struct vrt_queue_client clients[] = {
        {sum_integers, &isum1},
        {sum_integers, &isum2},
        {generate_integers, &igen},
        {NULL, NULL}
    };

    rii_check(vrt_queue_threaded(q, clients));

    /* int64_t is signed and not necessarily "long": print it through an
     * explicit long long conversion instead of %lu. */
    fprintf(stdout, "Results: %lld %lld\n", (long long) r1, (long long) r2);

    vrt_queue_free(q);
    return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment