Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save kingsamchen/372e7d03fdb9cfbf81c5 to your computer and use it in GitHub Desktop.
Save kingsamchen/372e7d03fdb9cfbf81c5 to your computer and use it in GitHub Desktop.
#include <tchar.h>
#include <algorithm>
#include <cstdlib>
#include <iostream>
#include <queue>
#include <string>
#include <process.h>
#include <Windows.h>
// ---- Tunables ---------------------------------------------------------------

// Exclusive upper bound (ms) on the random pause a producer takes before
// generating each item.
const int kProducerSleepTimeMS = 500;

// Exclusive upper bound (ms) on the random pause a consumer takes after
// handling each item.
const int kConsumerSleepTimeMS = 2000;

// Producers wait while the queue already holds this many items.
const int kMaximumQueueSize = 10;

// ---- Shared state -----------------------------------------------------------

// Lock held by the worker threads whenever they touch the queue or the
// counters below, and while they test the stop flag.
CRITICAL_SECTION buffer_lock;

// Signalled by a consumer after it removes an item; producers wait on it.
CONDITION_VARIABLE buffer_not_full;

// Signalled by a producer after it adds an item; consumers wait on it.
CONDITION_VARIABLE buffer_not_empty;

// The bounded FIFO shared between producers and consumers.
std::queue<int> buffer;

// Becomes TRUE when the main thread asks the workers to exit. Declared as
// `long` so it can be passed to the Interlocked* family.
long stop_requested = FALSE;

// Running totals, incremented by the workers while they hold buffer_lock.
size_t total_item_produced = 0;
size_t total_item_consumed = 0;
// Thread entry point for a producer.
//
// Repeatedly sleeps for a random interval below kProducerSleepTimeMS, generates
// a random item in [0, 1000), and pushes it onto the shared queue, waiting
// while the queue is full. Leaves the loop as soon as stop_requested is seen
// under the lock.
//
// param: the producer's numeric id, carried in the pointer value itself
//        (it is never dereferenced).
// Returns 0 in all cases.
unsigned int WINAPI Producer(void* param)
{
    // Convert via a pointer-sized integer first: casting a pointer straight to
    // a 32-bit unsigned int is not valid on 64-bit builds.
    const unsigned int id =
        static_cast<unsigned int>(reinterpret_cast<UINT_PTR>(param));

    while (true) {
        // Produce outside the lock so consumers are not held up meanwhile.
        Sleep(rand() % kProducerSleepTimeMS);
        const int item = rand() % 1000;

        EnterCriticalSection(&buffer_lock);

        // Loop because a wake-up does not guarantee there is room; the stop
        // flag is part of the predicate so a shutdown wake-up ends the wait.
        while (buffer.size() >= static_cast<size_t>(kMaximumQueueSize) &&
               !stop_requested) {
            SleepConditionVariableCS(&buffer_not_full, &buffer_lock, INFINITE);
        }

        if (stop_requested) {
            LeaveCriticalSection(&buffer_lock);
            break;
        }

        buffer.push(item);
        ++total_item_produced;
        // buffer.size() is size_t; narrow it explicitly to match "%u".
        printf_s("producer %u: item %d, queue size: %u\n", id, item,
                 static_cast<unsigned int>(buffer.size()));

        LeaveCriticalSection(&buffer_lock);

        // Signal after releasing the lock so the woken consumer does not
        // immediately block on it.
        WakeConditionVariable(&buffer_not_empty);
    }

    printf_s("producer %u exiting\n", id);
    return 0;
}
// Thread entry point for a consumer.
//
// Repeatedly waits until the shared queue is non-empty, pops the front item,
// then sleeps for a random interval below kConsumerSleepTimeMS. Leaves the loop
// as soon as stop_requested is seen under the lock; items still in the queue at
// that point are not drained.
//
// param: the consumer's numeric id, carried in the pointer value itself
//        (it is never dereferenced).
// Returns 0 in all cases.
unsigned int WINAPI Consumer(void* param)
{
    // Convert via a pointer-sized integer first: casting a pointer straight to
    // a 32-bit unsigned int is not valid on 64-bit builds.
    const unsigned int id =
        static_cast<unsigned int>(reinterpret_cast<UINT_PTR>(param));

    while (true) {
        EnterCriticalSection(&buffer_lock);

        // Loop because a wake-up does not guarantee an item is available; the
        // stop flag is part of the predicate so a shutdown wake-up ends the wait.
        while (buffer.empty() && !stop_requested) {
            SleepConditionVariableCS(&buffer_not_empty, &buffer_lock, INFINITE);
        }

        if (stop_requested) {
            LeaveCriticalSection(&buffer_lock);
            break;
        }

        const int item = buffer.front();
        buffer.pop();
        ++total_item_consumed;
        // buffer.size() is size_t; narrow it explicitly to match "%u".
        printf_s("Consumer %u: item %d, queue size: %u\n", id, item,
                 static_cast<unsigned int>(buffer.size()));

        LeaveCriticalSection(&buffer_lock);

        // Signal after releasing the lock so the woken producer does not
        // immediately block on it.
        WakeConditionVariable(&buffer_not_full);

        // Simulated processing time, spent outside the lock.
        Sleep(rand() % kConsumerSleepTimeMS);
    }

    printf_s("consumer %u exiting\n", id);
    return 0;
}
int _tmain(int argc, _TCHAR* argv[])
{
InitializeCriticalSection(&buffer_lock);
InitializeConditionVariable(&buffer_not_full);
InitializeConditionVariable(&buffer_not_empty);
HANDLE threads[3] = {};
threads[0] = reinterpret_cast<HANDLE>(
_beginthreadex(nullptr, 0, Producer, reinterpret_cast<void*>(1), 0, nullptr));
threads[1] = reinterpret_cast<HANDLE>(
_beginthreadex(nullptr, 0, Consumer, reinterpret_cast<void*>(1), 0, nullptr));
threads[2] = reinterpret_cast<HANDLE>(
_beginthreadex(nullptr, 0, Consumer, reinterpret_cast<void*>(2), 0, nullptr));
std::cout << "Press ENTER to stop..." << std::endl;
getchar();
InterlockedExchange(&stop_requested, TRUE);
WakeAllConditionVariable(&buffer_not_empty);
WakeAllConditionVariable(&buffer_not_full);
WaitForMultipleObjects(_countof(threads), threads, TRUE, INFINITE);
std::for_each(std::begin(threads), std::end(threads), CloseHandle);
DeleteCriticalSection(&buffer_lock);
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment