Skip to content

Instantly share code, notes, and snippets.

@rdnt
Created April 3, 2020 09:54
Show Gist options
  • Save rdnt/25fd4a0466f2fe09023a713fad7785fe to your computer and use it in GitHub Desktop.
Save rdnt/25fd4a0466f2fe09023a713fad7785fe to your computer and use it in GitHub Desktop.
condition send receive
// Final example with condition variables
// Compile with: gcc -O2 -Wall -pthread cv-example-final2.c -o cv-example-final2
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#define MESSAGES 20
#define QUEUE_SIZE 5
#define DEBUG 0

// ---- globals ----
// Bounded FIFO message queue, stored as a circular buffer of QUEUE_SIZE slots.
int global_buffer[QUEUE_SIZE];
// Number of messages currently queued (0 <= global_availmsg <= QUEUE_SIZE).
int global_availmsg = 0; // empty
// Index of the oldest queued message: the extraction point used by recv_msg().
// The insertion point used by send_msg() is derived from it:
//   (global_head + global_availmsg) % QUEUE_SIZE
static int global_head = 0;
// Condition variable signalled after a put operation (receiver waits on this).
pthread_cond_t msg_in = PTHREAD_COND_INITIALIZER;
// Condition variable signalled after a get operation (sender waits on this).
pthread_cond_t msg_out = PTHREAD_COND_INITIALIZER;
// Mutex protecting the queue state and the counters below.
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
// Total number of messages put into / taken out of the queue so far.
int sent = 0;
int received = 0;

// ---- send/receive functions ----

/*
 * Append msg at the tail of the queue.
 *
 * Blocks while the queue is full (QUEUE_SIZE messages pending) until
 * recv_msg() frees a slot. Increments `sent` and signals msg_in so that a
 * waiting receiver can proceed.
 */
void send_msg(int msg) {
    pthread_mutex_lock(&mutex);

    // Wait only while the queue is FULL, so all QUEUE_SIZE slots are usable.
    // NOTE: `while`, not `if`: the predicate must be re-checked after every
    // wakeup (more than one thread may wake up; cf. 'mesa' vs 'hoare'
    // semantics).
    while (global_availmsg >= QUEUE_SIZE) {
        // The mutex MUST be locked here. It is released while we wait and is
        // locked by us again when pthread_cond_wait() returns.
        pthread_cond_wait(&msg_out, &mutex);
    }

    // Insert at the tail, wrapping around the end of the array.
    int tail = (global_head + global_availmsg) % QUEUE_SIZE;
    global_buffer[tail] = msg;
    global_availmsg++;
    if (DEBUG == 1) {
        printf("SEND %d\n", msg);
    }
    sent++;

    // Signal the receiver that something was put in the buffer.
    pthread_cond_signal(&msg_in);
    pthread_mutex_unlock(&mutex);
}

/*
 * Remove and return the oldest message in the queue (FIFO order).
 *
 * Blocks while the queue is empty until send_msg() provides a message.
 * Increments `received` and signals msg_out so that a waiting sender can
 * proceed.
 */
int recv_msg(void) {
    pthread_mutex_lock(&mutex);

    // NOTE: `while`, not `if`: re-check the predicate after every wakeup.
    while (global_availmsg < 1) {
        pthread_cond_wait(&msg_in, &mutex);
    }

    // Extract from the head, which is a different position than the one
    // send_msg() inserts at, so messages leave in the order they arrived.
    int msg = global_buffer[global_head];
    global_head = (global_head + 1) % QUEUE_SIZE;
    global_availmsg--;
    if (DEBUG == 1) {
        printf("RECV %d\n", msg);
    }
    received++;

    // Signal the sender that something was removed from the buffer.
    pthread_cond_signal(&msg_out);
    pthread_mutex_unlock(&mutex);
    return msg;
}
// producer thread function
/*
 * Thread start routine for the producer.
 *
 * Sends the integers 0 .. MESSAGES-1 through send_msg(), one at a time, and
 * prints a line for each one when DEBUG is 0. The `args` pointer is not used.
 * Terminates the calling thread with a NULL exit value.
 */
void *producer_thread(void *args) {
    int next = 0;

    while (next < MESSAGES) {
        send_msg(next);
        if (!DEBUG) {
            printf("PRODUCER SENT %d\n", next);
        }
        next++;
    }

    // Terminate this thread so that it can be joined.
    pthread_exit(NULL);
    return NULL; // not reached; keeps the non-void signature satisfied
}
// receiver thread function
/*
 * Thread start routine for the consumer.
 *
 * Calls recv_msg() exactly MESSAGES times and prints each received value
 * when DEBUG is 0. The `args` pointer is not used. Terminates the calling
 * thread with a NULL exit value.
 */
void *consumer_thread(void *args) {
    int remaining = MESSAGES;

    while (remaining > 0) {
        int value = recv_msg();
        if (!DEBUG) {
            printf("CONSUMER RECV %d\n", value);
        }
        remaining--;
    }

    // Terminate this thread so that it can be joined.
    pthread_exit(NULL);
    return NULL; // not reached; keeps the non-void signature satisfied
}
/*
 * Program entry point: starts one producer and one consumer thread, waits
 * for both to finish, then releases the synchronization objects.
 *
 * Returns 0 on success, or EXIT_FAILURE if a thread could not be created or
 * joined (the pthread error number is reported on stderr).
 */
int main(void) {
    pthread_t producer, consumer;
    int rc;

    // Create the threads. pthread_create() returns an error number on
    // failure and leaves the pthread_t unusable, so it must not be joined.
    rc = pthread_create(&producer, NULL, producer_thread, NULL);
    if (rc != 0) {
        fprintf(stderr, "pthread_create (producer) failed: error %d\n", rc);
        return EXIT_FAILURE;
    }
    rc = pthread_create(&consumer, NULL, consumer_thread, NULL);
    if (rc != 0) {
        fprintf(stderr, "pthread_create (consumer) failed: error %d\n", rc);
        // Do not join the producer here: with no consumer running it would
        // end up blocked in send_msg(). Returning from main ends the process.
        return EXIT_FAILURE;
    }

    // Then join the threads.
    rc = pthread_join(producer, NULL);
    if (rc != 0) {
        fprintf(stderr, "pthread_join (producer) failed: error %d\n", rc);
        return EXIT_FAILURE;
    }
    rc = pthread_join(consumer, NULL);
    if (rc != 0) {
        fprintf(stderr, "pthread_join (consumer) failed: error %d\n", rc);
        return EXIT_FAILURE;
    }

    // Destroy the mutex - it must be unlocked, which holds once both
    // threads have been joined.
    pthread_mutex_destroy(&mutex);
    // Destroy the condition variables - no thread may be waiting on these.
    pthread_cond_destroy(&msg_out);
    pthread_cond_destroy(&msg_in);
    return 0;
}
@mixstef
Copy link

mixstef commented Apr 3, 2020

πχ. εξαγωγή από το receive, εισαγωγή στο send
Η ουρά έχει διαφορετικά σημεία εισαγωγής και εξαγωγής

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment