Skip to content

Instantly share code, notes, and snippets.

@loggerhead
Last active October 11, 2020 05:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save loggerhead/7fcaa027f2206ea55767226d5be480e6 to your computer and use it in GitHub Desktop.
Save loggerhead/7fcaa027f2206ea55767226d5be480e6 to your computer and use it in GitHub Desktop.
producer-consumer
#include <atomic>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
using namespace std;
const int nproducer = 4;
const int nconsumer = 5;
const int nmsg = 100;
mutex mx;
condition_variable cv;
queue<int> q;
bool finished = false;
atomic_int msg(0);
void producer() {
for (int i = 0; i < nmsg / nproducer; ++i) {
{
lock_guard<mutex> lk(mx);
q.push(msg++);
}
cv.notify_all();
}
{
lock_guard<mutex> lk(mx);
finished = true;
}
cv.notify_all();
}
void consumer() {
while (true) {
unique_lock<mutex> lk(mx);
cv.wait(lk, [] { return finished || !q.empty(); });
while (!q.empty()) {
cout << "hi " << q.front() << endl;
q.pop();
}
if (finished) break;
}
}
int main() {
vector<thread> tt;
// create producers and consumers
for (int i = 0; i < nproducer; i++) {
thread t(producer);
tt.push_back(move(t));
}
for (int i = 0; i < nconsumer; i++) {
thread t(consumer);
tt.push_back(move(t));
}
// wait all threads finish
for (auto& t: tt) {
t.join();
}
cout << "finished!" << endl;
}
package main
import (
"fmt"
"time"
)
const nproducer = 1
const nconsumer = 5
const nmsg = 10
// QPS
const nlimit = 10
func producer(done chan<- bool, msg chan<- string) {
for i := 0; i < nmsg; i++ {
msg <- fmt.Sprintf("hi %v", i)
}
done <- true
}
func consumer(done chan<- bool, msg <-chan string, limiter <-chan time.Time) {
for s := range msg {
<-limiter
fmt.Printf("%v\n", s)
// simulate consuming stuck
time.Sleep(1 * time.Second)
}
done <- true
}
func main() {
pdone := make(chan bool)
cdone := make(chan bool)
msg := make(chan string, nmsg)
limiter := make(chan time.Time, 100)
go func() {
timer := time.Tick(1000 * time.Millisecond / nlimit)
for t := range timer {
limiter <- t
}
}()
for i := 0; i < nproducer; i++ {
go producer(pdone, msg)
}
for i := 0; i < nconsumer; i++ {
go consumer(cdone, msg, limiter)
}
for i := 0; i < nproducer; i++ {
<-pdone
}
close(msg)
fmt.Printf("all producers closed!\n")
for i := 0; i < nconsumer; i++ {
<-cdone
}
fmt.Printf("all consumers closed!\n")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment