Skip to content

Instantly share code, notes, and snippets.

@mamigot
Last active December 25, 2020 06:37
Show Gist options
  • Save mamigot/9f7aaf6be598c63d1de53a0da8cacfb5 to your computer and use it in GitHub Desktop.
Save mamigot/9f7aaf6be598c63d1de53a0da8cacfb5 to your computer and use it in GitHub Desktop.

Barrier

Barrier.h

/*
  Barrier.h
 */

#include <mutex>
#include <condition_variable>

// Rendezvous point for a fixed number of threads.
// The member functions are defined out of line in Barrier.cpp.
class Barrier {
public:
    // n: number of arrivals required before arrive() stops blocking.
    Barrier(int n);

    // Records the caller's arrival and blocks until enough threads have arrived.
    bool arrive();

private:
    // Number of arrivals required (taken from the constructor argument).
    int size;
    // Number of arrivals recorded so far.
    int count{0};
    // Guards count.
    std::mutex mut;
    // Waited on and notified inside arrive().
    std::condition_variable cond;
};

Barrier.cpp

/*
  Barrier.cpp
 */

#include "Barrier.h"
using namespace std;

Barrier::Barrier(int n) : size(n) {}
bool Barrier::arrive() {
    std::unique_lock<std::mutex> lk(mut);
    ++count;
    while (count < size) cond.wait(lk);
    cond.notify_all();
    return true;
}

Tests

#include <iostream>
#include <vector>
#include <thread>
#include <mutex>
#include <unistd.h>
#include <condition_variable>
#include "Barrier.h"
using namespace std;

mutex IOmutex;

int main() {
    int n = 10;
    Barrier bar(n);
    vector<thread> threads;
    while (n--) {
        threads.push_back(thread([&bar, n] {
                    IOmutex.lock();
                    cerr << "Thread " << n << " arriving\n";
                    IOmutex.unlock();
                    bar.arrive();
                    IOmutex.lock();
                    cerr << "Thread " << n << " leaving\n";
                    IOmutex.unlock();
                })
            );
    }
    IOmutex.lock();
    cerr << "Threads created\n";
    IOmutex.unlock();
    for (thread& t : threads) t.join();
    IOmutex.lock();
    cerr << "Threads done\n";
    IOmutex.unlock();
}

Bounded buffer

BB.h

#include <thread>
#include <mutex>
#include <condition_variable>
#include <iostream>

// Fixed-capacity FIFO queue of ints stored in a circular array.
// add() blocks while the buffer is full and remove() blocks while it is
// empty; both serialise access through `mut`.
class BoundedBuffer {
    // Prints the capacity, size, front/back indices and the stored items.
    // It does not lock `mut` itself.
    friend std::ostream& operator<<(std::ostream&, const BoundedBuffer&);
public:
    // (capacity, trace): allocates storage for `capacity` ints; when `trace`
    // is true, add() and remove() log every operation to std::cout.
    BoundedBuffer(int, bool = false);

    // Releases the array allocated by the constructor.
    ~BoundedBuffer() { delete[] data; }

    // The buffer owns a raw array, a mutex and condition variables, so
    // copying is disabled explicitly.
    BoundedBuffer(const BoundedBuffer&) = delete;
    BoundedBuffer& operator=(const BoundedBuffer&) = delete;

    // Appends a value, blocking while the buffer is full.
    void add(int);
    // Removes and returns the oldest value, blocking while the buffer is empty.
    int remove();
    // NOTE(review): declared here, but no definition appears alongside the
    // other member functions -- confirm it is defined or drop it.
    void display() const;
private:
    int cap = 0, size = 0; // capacity of data, and number of items currently stored
    int front = 0; // index of next item to remove
    int back = 0;  // index for next item to add
    int* data = nullptr; // owning pointer to the circular storage (freed in the destructor)
    mutable std::mutex mut; // guards every field above
    // Naming note: add() waits on isFull while the buffer is full and remove()
    // notifies it after freeing a slot; isEmpty is the mirror image.
    std::condition_variable isFull, isEmpty;
    bool trace = true; // overwritten by the constructor's second argument
};

BB.cpp

#include <thread>
#include <mutex>
#include <condition_variable>
#include <iostream>
#include "BB.h"
using namespace std;

// Sets up an empty buffer with storage for n ints. The trace flag controls
// whether add() and remove() print what they did.
BoundedBuffer::BoundedBuffer(int n, bool trace)
    : cap(n),
      data(new int[n]),
      trace(trace) {}
void BoundedBuffer::add(int n) {
    unique_lock<mutex> ul(mut); // locks mut
    while (size == cap) isFull.wait(ul);
    data[back++] = n;
    if (back >= cap) back = 0;
    ++size;
    isEmpty.notify_all();
    if (trace) cout << "Added: " << n << ' ' << *this << endl;
}

int BoundedBuffer::remove() {
    unique_lock<mutex> ul(mut); // locks mut
    while (size == 0) isEmpty.wait(ul);
    int result = data[front++];
    if (front >= cap) front = 0;
    --size;
    isFull.notify_all();
    if (trace) cout << "Removed: " << result << ' ' << *this << endl;
    return result;
}

// Writes a one-line summary of the buffer's bookkeeping fields followed by its
// contents from oldest to newest, each item followed by a space.
// This function does not lock bb.mut; add() and remove() call it while they
// already hold the lock.
ostream& operator<<(ostream& os, const BoundedBuffer& bb) {
    os << "Capacity: " << bb.cap;
    os << ", Size: " << bb.size;
    os << ", Front: " << bb.front;
    os << ", Back: " << bb.back << endl;

    // Walk the circular array starting at the oldest element.
    int idx = bb.front;
    for (int printed = 0; printed < bb.size; ++printed) {
        os << bb.data[idx] << ' ';
        idx = (idx + 1) % bb.cap;
    }
    return os;
}

Tests

#include <thread>
#include <mutex>
#include <condition_variable>
#include <iostream>
#include "BB.h"
using namespace std;

int main() {
    BoundedBuffer bb(9, true);
    cout << bb << endl;
    thread t1([&bb] {
            for (int i = 0; i < 20; ++i) {
                bb.add(i);
            }
        });
    thread t2([&bb] {
            for (int i = 0; i < 20; ++i) {
                bb.remove();
            }
        });

    t1.join();
    t2.join();
}

Bow / bow back

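/*
	Bow / bow back

        Two bow events run concurrently, one per thread (multi-threaded).
        bow() has to hold both friends' mutexes at the same time, so it
        grabs them together: each mutex is wrapped in a deferred
        unique_lock and the pair is acquired with std::lock.
*/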

#include <iostream>
#include <string>
#include <thread>
#include <mutex>
using namespace std;

// A named person who can bow to another Friend and have the bow returned.
// bow() may be called from several threads at once.
class Friend {
public:
    Friend(const std::string& name) : name(name) {}

    // Bows to `bowee`, who then bows back. The mutexes of both friends are
    // held for the whole exchange, so concurrent exchanges between the same
    // two friends run one after the other.
    void bow(Friend& bowee) {
        std::unique_lock<std::mutex> lk1(mut, std::defer_lock);
        std::unique_lock<std::mutex> lk2;
        if (&bowee == this) {
            // Bowing to oneself: there is only one mutex, and handing the same
            // mutex to std::lock twice would never return.
            lk1.lock();
        } else {
            // std::lock acquires both mutexes without risking the deadlock
            // that two threads locking them in opposite orders could cause.
            lk2 = std::unique_lock<std::mutex>(bowee.mut, std::defer_lock);
            std::lock(lk1, lk2);
        }
        std::cerr << "I bow to " << bowee.name << std::endl;
        bowee.bowBack(*this);
        // Both unique_locks release their mutexes when they go out of scope.
    }

    // Returns a bow to `bower`. Takes no lock itself; bow() calls it while
    // holding the locks.
    void bowBack(Friend& bower) {
        std::cerr << "I bow back to " << bower.name << std::endl;
    }
private:
    std::string name;
    mutable std::mutex mut;
};

int main() {
    Friend alphonse("Alphonse");
    Friend gaston("Gaston");

    thread t1([&] { alphonse.bow(gaston); });
    thread t2([&] { gaston.bow(alphonse); });

    t1.join();
    t2.join();
}

Returning data

Promise

/*
  Thread "returning" a value through a promise
 */

#include <iostream>
#include <thread>
#include <future>
using namespace std;

// Thread body: waits five seconds, fulfils the promise with 17, then keeps
// running for another five seconds.
void func(promise<int>& prom) {
    const chrono::seconds pause(5);

    this_thread::sleep_for(pause);
    prom.set_value(17);

    // Stay alive a little longer to demonstrate that the waiting side can
    // read the value before this thread has finished.
    this_thread::sleep_for(pause);
}

// Runs func on a worker thread and prints the value it delivers through a
// promise/future pair.
int main() {
    promise<int> pro;

    // A promise cannot be copied, so the worker gets it by reference. The
    // equivalent without a lambda would be: thread t1(func, ref(pro));
    thread t1([&pro] { func(pro); });

    future<int> ftr = pro.get_future();
    cerr << "main waiting\n";

    // get() blocks until the worker has set the value. It may be called only
    // once per future, so the result is not fetched a second time.
    cout << ftr.get() << endl;

    t1.join();
    cerr << "main done.\n";
}

Async-future

#include <future>
#include <iostream>
using namespace std;

// Stand-in for a long-running computation; it simply yields 42.
int find_the_answer_to_ltuae() {
    const int answer = 42;
    return answer;
}

// Placeholder for the work a caller does while the answer is computed; does nothing.
void do_other_stuff() {
}

int main() {
    future<int> the_answer = async(find_the_answer_to_ltuae); // tl;dnr
    //    auto the_answer = async(find_the_answer_to_ltuae);
    do_other_stuff();
    // future<T>::get synchronizes with the source of the future value.
    cout << "The answer is " << the_answer.get() << endl;
    // Note that we do not have to join any thread that async may have started.
}

Producer-consumer (condition variables)

/* One thread pushes to the queue while another reads from it. With unique_locks
alone, the consumer would have to sleep and poll while it waits -- but for how long?
Instead, we'd like the producer to notify the consumer whenever it changes the queue. */

// Queue shared by the two threads: producer() pushes at the front and
// consumer() pops from the back.
std::deque<int> q;
// Locked by both threads around every access to q.
std::mutex mu;
// producer() notifies this after each push; consumer() waits on it until q is
// non-empty.
std::condition_variable cond;

void producer() {
	int count = 10;
	while (count > 0) {
		std::unique_lock<mutex> locker(mu);
		q.push_front(count);
		locker.unlock()
		cond.notify_one(); // notify one waiting thread, if there is one
		std::this_thread::sleep_for(chrono::seconds(1));
		count--;
	}
}

// Pops values from the back of q and prints them until it receives 1, which
// is the last value producer() pushes.
void consumer() {
	int data = 0;
	while (data != 1) {
		std::unique_lock<std::mutex> locker(mu);

		// A bare cond.wait(locker) would not cope with spurious wakeups. The
		// predicate sends the thread back to sleep until the queue is
		// non-empty. wait() takes the lock because it releases it while
		// sleeping and re-acquires it before returning.
		cond.wait(locker, [] { return !q.empty(); });

		data = q.back();
		q.pop_back();
		locker.unlock(); // print without holding the lock
		std::cout << "t2 got a value from t1: " << data << std::endl;
	}
}

// Runs producer() and consumer() on their own threads and waits for both.
int main() {
	std::thread workers[] = { std::thread(producer), std::thread(consumer) };
	for (std::thread& worker : workers) {
		worker.join();
	}
	return 0;
}

Sockets

Client

  • Create a socket with the socket() system call.

  • Connect the socket to the address of the server using the connect() system call.

  • Send and receive data. There are a number of ways to do this, but the simplest way is to use the read() and write() system calls.

#include <stdio.h>
#include <stdlib.h>

#include <netdb.h>
#include <netinet/in.h>

#include <string.h>

/*
 * TCP client.
 *
 * Usage: <program> hostname port
 *
 * Connects to hostname:port, sends one line read from stdin and prints the
 * server's reply. Returns 0 on success; exits with status 1 on any error.
 *
 * NOTE(review): read() and write() are declared in <unistd.h>; make sure this
 * translation unit includes it.
 */
int main(int argc, char *argv[]) {
   int sockfd, portno, n;
   struct sockaddr_in serv_addr;
   struct hostent *server;

   char buffer[256];

   if (argc < 3) {
      fprintf(stderr,"usage %s hostname port\n", argv[0]);
      exit(1); /* bad invocation is a failure, not a success */
   }

   portno = atoi(argv[2]);

   /* Create a socket point */
   sockfd = socket(AF_INET, SOCK_STREAM, 0);

   if (sockfd < 0) {
      perror("ERROR opening socket");
      exit(1);
   }

   server = gethostbyname(argv[1]);

   if (server == NULL) {
      fprintf(stderr,"ERROR, no such host\n");
      exit(1);
   }

   /* Only copy the address if it is an IPv4 address that fits sin_addr. */
   if (server->h_addrtype != AF_INET ||
       server->h_length < 0 ||
       (size_t) server->h_length > sizeof(serv_addr.sin_addr)) {
      fprintf(stderr,"ERROR, unsupported address type for host\n");
      exit(1);
   }

   memset(&serv_addr, 0, sizeof(serv_addr));
   serv_addr.sin_family = AF_INET;
   memcpy(&serv_addr.sin_addr, server->h_addr_list[0], (size_t) server->h_length);
   serv_addr.sin_port = htons(portno);

   /* Now connect to the server */
   if (connect(sockfd, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) < 0) {
      perror("ERROR connecting");
      exit(1);
   }

   /* Now ask for a message from the user, this message
      * will be read by server
   */

   printf("Please enter the message: ");
   fflush(stdout); /* the prompt has no newline, so push it out explicitly */
   memset(buffer, 0, sizeof(buffer));
   if (fgets(buffer, sizeof(buffer), stdin) == NULL) {
      fprintf(stderr,"ERROR reading message from stdin\n");
      exit(1);
   }

   /* Send message to the server */
   n = write(sockfd, buffer, strlen(buffer));

   if (n < 0) {
      perror("ERROR writing to socket");
      exit(1);
   }

   /* Now read server response; one byte is kept back so the zero-filled
      buffer stays NUL-terminated */
   memset(buffer, 0, sizeof(buffer));
   n = read(sockfd, buffer, sizeof(buffer) - 1);

   if (n < 0) {
      perror("ERROR reading from socket");
      exit(1);
   }

   printf("%s\n",buffer);
   return 0;
}

Server

  • Create a socket with the socket() system call.

  • Bind the socket to an address using the bind() system call. For a server socket on the Internet, an address consists of a port number on the host machine.

  • Listen for connections with the listen() system call.

  • Accept a connection with the accept() system call. This call typically blocks until a client connects with the server.

  • Send and receive data using the read() and write() system calls.

#include <stdio.h>
#include <stdlib.h>

#include <netdb.h>
#include <netinet/in.h>

#include <string.h>

/*
 * TCP server.
 *
 * Listens on port 5001 on all local addresses, accepts a single connection,
 * prints the message it receives and answers with a fixed acknowledgement.
 * Command-line arguments are ignored. Returns 0 on success; exits with
 * status 1 on any error.
 *
 * NOTE(review): read() and write() are declared in <unistd.h>; make sure this
 * translation unit includes it.
 */
int main( int argc, char *argv[] ) {
   static const char reply[] = "I got your message";
   int sockfd, newsockfd, portno;
   socklen_t clilen; /* accept() takes a socklen_t *, not an int * */
   char buffer[256];
   struct sockaddr_in serv_addr, cli_addr;
   int  n;

   (void) argc;
   (void) argv;

   /* First call to socket() function */
   sockfd = socket(AF_INET, SOCK_STREAM, 0);

   if (sockfd < 0) {
      perror("ERROR opening socket");
      exit(1);
   }

   /* Initialize socket structure */
   memset(&serv_addr, 0, sizeof(serv_addr));
   portno = 5001;

   serv_addr.sin_family = AF_INET;
   serv_addr.sin_addr.s_addr = INADDR_ANY;
   serv_addr.sin_port = htons(portno);

   /* Now bind the host address using bind() call.*/
   if (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
      perror("ERROR on binding");
      exit(1);
   }

   /* Now start listening for the clients; up to 5 pending connections
      * may queue while the process waits in accept()
   */

   if (listen(sockfd,5) < 0) {
      perror("ERROR on listen");
      exit(1);
   }
   clilen = sizeof(cli_addr);

   /* Accept actual connection from the client */
   newsockfd = accept(sockfd, (struct sockaddr *)&cli_addr, &clilen);

   if (newsockfd < 0) {
      perror("ERROR on accept");
      exit(1);
   }

   /* If connection is established then start communicating; one byte is
      kept back so the zero-filled buffer stays NUL-terminated */
   memset(buffer, 0, sizeof(buffer));
   n = read( newsockfd, buffer, sizeof(buffer) - 1 );

   if (n < 0) {
      perror("ERROR reading from socket");
      exit(1);
   }

   printf("Here is the message: %s\n",buffer);

   /* Write a response to the client (without the terminating NUL) */
   n = write(newsockfd, reply, sizeof(reply) - 1);

   if (n < 0) {
      perror("ERROR writing to socket");
      exit(1);
   }

   return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment