Last active
August 29, 2015 14:06
-
-
Save yohhoy/d305a6c5249c55ed89a3 to your computer and use it in GitHub Desktop.
MT-safe queue implementation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// mutex + busy-loop
#include <queue> | |
#include <thread> | |
#include <mutex> | |
// Bounded FIFO of ints guarded by a single mutex.
// There is no condition variable: when an operation cannot proceed, the lock
// is released, the calling thread yields, and the condition is checked again.
class mt_queue {
    static const int capacity = 10;  // push() keeps polling while size() == capacity
    std::queue<int> q_;
    std::mutex mtx_;

public:
    // Appends `data`; polls (with the lock dropped between attempts) while
    // the queue holds `capacity` elements.
    void push(int data)
    {
        for (;;) {
            {
                std::lock_guard<std::mutex> guard(mtx_);
                if (q_.size() != capacity) {
                    q_.push(data);
                    return;
                }
            }  // guard destroyed: mutex is free while we yield
            std::this_thread::yield();
        }
    }

    // Removes and returns the oldest element; polls (with the lock dropped
    // between attempts) while the queue is empty.
    int pop()
    {
        for (;;) {
            {
                std::lock_guard<std::mutex> guard(mtx_);
                if (!q_.empty()) {
                    const int front = q_.front();
                    q_.pop();
                    return front;
                }
            }  // guard destroyed: mutex is free while we yield
            std::this_thread::yield();
        }
    }
};
//---------------------------------------------------------- | |
#include <iostream> | |
const int N = 100; | |
int main() | |
{ | |
mt_queue q; | |
std::thread th1([&]{ | |
for (int i = 1; i <= N; ++i) | |
q.push(i); | |
q.push(-1); // end of data | |
}); | |
std::thread th2([&]{ | |
int v; | |
while ((v = q.pop()) > 0) | |
std::cout << v << std::endl; | |
std::cout << "(EOD)" << std::endl; | |
}); | |
th1.join(); | |
th2.join(); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// one CV + notify_all(broadcast)
#include <queue> | |
#include <thread> | |
#include <mutex> | |
#include <condition_variable> | |
// Bounded FIFO of ints guarded by one mutex and one condition variable.
// Threads blocked in push() and threads blocked in pop() sleep on the same
// condition variable, and every successful operation broadcasts on it.
class mt_queue {
    static const int capacity = 10;  // push() blocks while size() == capacity
    std::queue<int> q_;
    std::mutex mtx_;
    std::condition_variable cv_;

public:
    // Appends `data`, blocking while the queue holds `capacity` elements.
    void push(int data)
    {
        std::unique_lock<std::mutex> guard(mtx_);
        // Predicate form re-checks the condition after every wakeup.
        cv_.wait(guard, [this] { return q_.size() != capacity; });
        q_.push(data);
        cv_.notify_all();
    }

    // Removes and returns the oldest element, blocking while the queue is empty.
    int pop()
    {
        std::unique_lock<std::mutex> guard(mtx_);
        cv_.wait(guard, [this] { return !q_.empty(); });
        const int front = q_.front();
        q_.pop();
        cv_.notify_all();
        return front;
    }
};
//---------------------------------------------------------- | |
#include <iostream> | |
const int N = 100; | |
int main() | |
{ | |
mt_queue q; | |
std::thread th1([&]{ | |
for (int i = 1; i <= N; ++i) | |
q.push(i); | |
q.push(-1); // end of data | |
}); | |
std::thread th2([&]{ | |
int v; | |
while ((v = q.pop()) > 0) | |
std::cout << v << std::endl; | |
std::cout << "(EOD)" << std::endl; | |
}); | |
th1.join(); | |
th2.join(); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// two CVs + notify_one(signal)
#include <queue> | |
#include <thread> | |
#include <mutex> | |
#include <condition_variable> | |
// Bounded, blocking FIFO of ints shared between threads.
//
// A single mutex guards the underlying std::queue. Two condition variables
// separate the two events threads can wait for:
//   cv_nofull_  - "there is room", awaited by push()
//   cv_noempty_ - "there is data", awaited by pop()
// so a state change only needs to wake one thread of the relevant kind.
class mt_queue {
    // Maximum number of queued elements. Uses the container's own unsigned
    // size type so comparisons with q_.size() are not mixed-sign.
    static constexpr std::queue<int>::size_type capacity = 10;
    std::queue<int> q_;
    std::mutex mtx_;
    std::condition_variable cv_nofull_;
    std::condition_variable cv_noempty_;

public:
    // Appends `data`, blocking while the queue is full.
    void push(int data)
    {
        std::unique_lock<std::mutex> lk(mtx_);
        cv_nofull_.wait(lk, [this] { return q_.size() < capacity; });
        q_.push(data);
        // Signal on every push, not only on the empty -> non-empty edge.
        // With several consumers asleep, two pushes can complete before the
        // first woken consumer runs; an edge-triggered signal would skip the
        // second notification and leave a consumer sleeping with data queued.
        cv_noempty_.notify_one();
    }

    // Removes and returns the oldest element, blocking while the queue is empty.
    int pop()
    {
        std::unique_lock<std::mutex> lk(mtx_);
        cv_noempty_.wait(lk, [this] { return !q_.empty(); });
        const int data = q_.front();
        q_.pop();
        // Signal on every pop for the symmetric reason: several blocked
        // producers must each get a wakeup as slots become free.
        cv_nofull_.notify_one();
        return data;
    }
};
//---------------------------------------------------------- | |
#include <iostream> | |
const int N = 100; | |
int main() | |
{ | |
mt_queue q; | |
std::thread th1([&]{ | |
for (int i = 1; i <= N; ++i) | |
q.push(i); | |
q.push(-1); // end of data | |
}); | |
std::thread th2([&]{ | |
int v; | |
while ((v = q.pop()) > 0) | |
std::cout << v << std::endl; | |
std::cout << "(EOD)" << std::endl; | |
}); | |
th1.join(); | |
th2.join(); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// push/pop + close + abort
#include <queue> | |
#include <exception> | |
#include <thread> | |
#include <mutex> | |
#include <condition_variable> | |
// Thrown by mt_queue::push() when the queue has been closed.
struct closed_queue : std::exception {
    const char* what() const noexcept override { return "mt_queue: queue is closed"; }
};

// Thrown by mt_queue::push() and mt_queue::pop() after mt_queue::abort().
struct abort_exception : std::exception {
    const char* what() const noexcept override { return "mt_queue: operation aborted"; }
};

// Bounded, blocking FIFO of ints with two shutdown modes.
//
//   close() - no further pushes are accepted; elements already queued can
//             still be popped, after which pop() reports end of data.
//   abort() - blocked and subsequent push()/pop() calls throw
//             abort_exception (except that pop() on a closed, empty queue
//             still returns false).
//
// A single mutex guards all state. cv_nofull_ is awaited by push(),
// cv_noempty_ by pop(); close()/abort() broadcast on both so every blocked
// thread re-evaluates its predicate.
class mt_queue {
    // Maximum number of queued elements. Uses the container's own unsigned
    // size type so comparisons with q_.size() are not mixed-sign.
    static constexpr std::queue<int>::size_type capacity = 10;
    std::queue<int> q_;
    std::mutex mtx_;
    std::condition_variable cv_nofull_;
    std::condition_variable cv_noempty_;
    bool closed_ = false;
    bool aborted_ = false;

public:
    // Appends `data`, blocking while the queue is full.
    // Throws closed_queue if the queue is closed (checked first), otherwise
    // abort_exception if it has been aborted.
    void push(int data)
    {
        std::unique_lock<std::mutex> lk(mtx_);
        cv_nofull_.wait(lk, [this] {
            return (q_.size() < capacity) || closed_ || aborted_;
        });
        if (closed_)
            throw closed_queue();
        if (aborted_)
            throw abort_exception();
        q_.push(data);
        // Signal on every push, not only on the empty -> non-empty edge.
        // With several consumers asleep, two pushes can complete before the
        // first woken consumer runs; an edge-triggered signal would skip the
        // second notification and leave a consumer sleeping with data queued.
        cv_noempty_.notify_one();
    }

    // Pops the oldest element into `data` and returns true.
    // Returns false (leaving `data` untouched) once the queue is closed and
    // drained. Throws abort_exception if the queue has been aborted.
    bool pop(int& data)
    {
        std::unique_lock<std::mutex> lk(mtx_);
        cv_noempty_.wait(lk, [this] {
            return !q_.empty() || closed_ || aborted_;
        });
        if (q_.empty() && closed_)
            return false;   // closed queue
        if (aborted_)
            throw abort_exception();
        // Reaching here implies the queue is non-empty: an empty queue gets
        // past the wait only when closed (returned above) or aborted (thrown).
        data = q_.front();
        q_.pop();
        // Signal on every pop so each blocked producer gets a wakeup as
        // slots become free.
        cv_nofull_.notify_one();
        return true;
    }

    // Marks the queue closed and wakes every blocked thread.
    void close()
    {
        std::lock_guard<std::mutex> lk(mtx_);
        closed_ = true;
        cv_nofull_.notify_all();
        cv_noempty_.notify_all();
    }

    // Marks the queue aborted and wakes every blocked thread.
    void abort()
    {
        std::lock_guard<std::mutex> lk(mtx_);
        aborted_ = true;
        cv_nofull_.notify_all();
        cv_noempty_.notify_all();
    }
};
//---------------------------------------------------------- | |
#include <iostream> | |
const int N = 100; | |
int main() | |
{ | |
mt_queue q; | |
std::thread th1([&]{ | |
try { | |
for (int i = 1; i <= N; ++i) { | |
q.push(i); | |
std::this_thread::sleep_for(std::chrono::milliseconds(10)); | |
} | |
q.close(); // end of data | |
} catch (closed_queue&) { | |
std::cout << "closed queue" << std::endl; | |
} catch (abort_exception&) { | |
std::cout << "abort producer" << std::endl; | |
} | |
}); | |
std::thread th2([&]{ | |
try { | |
int v; | |
while (q.pop(v)) { | |
std::cout << v << std::endl; | |
} | |
std::cout << "(EOD)" << std::endl; | |
} catch (abort_exception&) { | |
std::cout << "abort consumer" << std::endl; | |
} | |
}); | |
std::this_thread::sleep_for(std::chrono::milliseconds(100)); | |
// q.close(); | |
q.abort(); | |
th1.join(); | |
th2.join(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
http://yohhoy.hatenablog.jp/entry/2014/09/23/193617 (ja)
Source code distributed under Boost Software License 1.0.