Last active
October 11, 2020 05:25
-
-
Save loggerhead/7fcaa027f2206ea55767226d5be480e6 to your computer and use it in GitHub Desktop.
producer-consumer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <atomic>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
using namespace std;

// Number of producer threads, consumer threads, and total messages to produce.
constexpr int nproducer = 4;
constexpr int nconsumer = 5;
constexpr int nmsg = 100;

// mx guards both q and finished; cv is signalled whenever either changes.
mutex mx;
condition_variable cv;
// Pending messages shared between producers and consumers.
queue<int> q;
// Set by producers to tell consumers that production has ended.
bool finished{false};
// Next message value; incremented by producers as they push.
atomic<int> msg{0};
void producer() { | |
for (int i = 0; i < nmsg / nproducer; ++i) { | |
{ | |
lock_guard<mutex> lk(mx); | |
q.push(msg++); | |
} | |
cv.notify_all(); | |
} | |
{ | |
lock_guard<mutex> lk(mx); | |
finished = true; | |
} | |
cv.notify_all(); | |
} | |
void consumer() { | |
while (true) { | |
unique_lock<mutex> lk(mx); | |
cv.wait(lk, [] { return finished || !q.empty(); }); | |
while (!q.empty()) { | |
cout << "hi " << q.front() << endl; | |
q.pop(); | |
} | |
if (finished) break; | |
} | |
} | |
int main() { | |
vector<thread> tt; | |
// create producers and consumers | |
for (int i = 0; i < nproducer; i++) { | |
thread t(producer); | |
tt.push_back(move(t)); | |
} | |
for (int i = 0; i < nconsumer; i++) { | |
thread t(consumer); | |
tt.push_back(move(t)); | |
} | |
// wait all threads finish | |
for (auto& t: tt) { | |
t.join(); | |
} | |
cout << "finished!" << endl; | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"time" | |
) | |
// Tunables for the producer/consumer demo.
const (
	// nproducer is the number of producer goroutines main starts.
	nproducer = 1
	// nconsumer is the number of consumer goroutines main starts.
	nconsumer = 5
	// nmsg is how many messages each producer sends; main also uses it as the
	// message channel's buffer size.
	nmsg = 10
	// nlimit is the rate limit in ticks per second (QPS) shared by all consumers.
	nlimit = 10
)
func producer(done chan<- bool, msg chan<- string) { | |
for i := 0; i < nmsg; i++ { | |
msg <- fmt.Sprintf("hi %v", i) | |
} | |
done <- true | |
} | |
// consumer receives messages from msg until the channel is closed and drained.
// For each message it first takes one tick from limiter, prints the message,
// and then sleeps for a second to imitate slow processing. When msg is
// exhausted it sends true on done.
func consumer(done chan<- bool, msg <-chan string, limiter <-chan time.Time) {
	for {
		text, open := <-msg
		if !open {
			break
		}
		// Block until the rate limiter hands out the next tick.
		<-limiter
		fmt.Printf("%v\n", text)
		// simulate consuming stuck
		time.Sleep(1 * time.Second)
	}
	done <- true
}
func main() { | |
pdone := make(chan bool) | |
cdone := make(chan bool) | |
msg := make(chan string, nmsg) | |
limiter := make(chan time.Time, 100) | |
go func() { | |
timer := time.Tick(1000 * time.Millisecond / nlimit) | |
for t := range timer { | |
limiter <- t | |
} | |
}() | |
for i := 0; i < nproducer; i++ { | |
go producer(pdone, msg) | |
} | |
for i := 0; i < nconsumer; i++ { | |
go consumer(cdone, msg, limiter) | |
} | |
for i := 0; i < nproducer; i++ { | |
<-pdone | |
} | |
close(msg) | |
fmt.Printf("all producers closed!\n") | |
for i := 0; i < nconsumer; i++ { | |
<-cdone | |
} | |
fmt.Printf("all consumers closed!\n") | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment