Skip to content

Instantly share code, notes, and snippets.

@eao197
Created April 26, 2024 13:08
Show Gist options
  • Save eao197/0a7a91e21480a53351d91c6785760819 to your computer and use it in GitHub Desktop.
Save eao197/0a7a91e21480a53351d91c6785760819 to your computer and use it in GitHub Desktop.
Возможные реализации thread_pool-ов для задачи рекурсивного обхода подкаталогов
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
namespace demo
{
class thread_pool_t
{
public:
// Тип одной заявки на исполнение.
using task_t = std::function< void() >;
private:
// Тип очереди заявок.
using task_queue_t = std::queue< task_t >;
// Статус пула.
enum class status_t
{
// Работа еще не начиналась, ждем самую первую заявку.
not_started,
// Работа начата, но сигнала на принудительное завершение
// еще не было.
in_progress,
// Работа должна быть завершена принудительно.
shutdown
};
// Тип для попытки извлечения очередной заявки из очереди.
struct task_extraction_result_t
{
// Заявка для обработки.
//
// Содержит актуальное значение только если m_should_process == true;
task_t m_task;
// Если true, то заявку нужно обрабатывать и в m_task лежит
// актуальная заявка.
bool m_should_process;
};
// Размер thread-pool-а.
const std::size_t m_pool_size;
// Замок для всего thread-pool-а.
std::mutex m_pool_lock;
// Рабочие нити пула.
std::vector< std::thread > m_threads;
// Очередь заявок.
task_queue_t m_queue;
// Условие для ожидания появления заявок в очереди.
//
// В том числе выставляется и при смене статуса на shutdown.
std::condition_variable m_queue_not_empty;
// Текущий статус пула.
status_t m_status{ status_t::not_started };
// Признак того, что пул завершил свою работу.
bool m_completed{ false };
// Условие для ожидания завершения работы пула.
//
// Взводится когда m_completed принимает true.
std::condition_variable m_completion_confirmed;
// Сколько нитей реально стартовало.
//
// Этот счетчик нужен для случая возникновения исключений в методе start().
std::size_t m_actually_started{};
// Сколько нитей сейчас реально свободно.
//
// Это если работа была начата, все нити свободны, а очередь заявок пуста,
// то работу можно завершать.
std::size_t m_actually_free{};
// Сколько нитей завершило свою работу.
//
// Этот счетчик нужен для определения момента, когда можно выставлять m_completed.
std::size_t m_actually_completed{};
public:
// Инициализирующий конструктор.
//
// ПРИМЕЧАНИЕ: ожидаем, что pool_size больше нуля, но не проверяем это
// специально.
thread_pool_t( std::size_t pool_size )
: m_pool_size{ pool_size }
{
}
~thread_pool_t()
{
// Принудительно завершаем работу тех нитей, которые могли
// быть запущены.
{
std::lock_guard lock{ m_pool_lock };
m_status = status_t::shutdown;
m_queue_not_empty.notify_all();
}
for( auto & t : m_threads )
// Если нить запущена, то значит она joinable, нет смысла
// проверять это отдельно.
t.join();
}
// Запустить рабочие нити пула.
//
// ПРИМЕЧАНИЕ: может вызываться только один раз.
void
start()
{
// Запускаем рабочие нити при заблокированном объекте, чтобы
// проще было контролировать сколько запустилось, сколько завершило
// свою работу.
std::lock_guard lock{ m_pool_lock };
// ПРИМЕЧАНИЕ: не заботимся об исключениях из-за невозможности
// запустить очередную нить. Если i-я нить не стартует, то все
// ранее запущенные нити будут остановлены в деструкторе.
m_threads.reserve( m_pool_size );
for( std::size_t i = 0; i < m_pool_size; ++i )
{
m_threads.push_back( std::thread{ [this]() { body(); } } );
++m_actually_started;
}
}
// Добавить еще одну заявку в очередь.
void
push( task_t task )
{
std::lock_guard lock{ m_pool_lock };
// Дергать condition_variable имеет смысл только если очередь была пуста.
const bool was_empty = m_queue.empty();
m_queue.push( std::move(task) );
if( was_empty )
m_queue_not_empty.notify_one();
}
// Ожидать завершения работы.
void
wait_for_completion()
{
std::unique_lock lock{ m_pool_lock };
if( !m_completed )
m_completion_confirmed.wait( lock, [this]{ return m_completed; } );
}
private:
// Основная функция рабочей нити.
void
body()
{
// Для основного цикла нужно обеспечить no exception гарантию, т.к.
// даже если там исключение выскочит, то нам все равно нужно
// вызвать decrement_started_count.
try
{
do_main_loop();
}
catch(...)
{
// Просто все перехватываем.
// Ничего не логируем, т.к. при логировании так же могут
// возникнуть исключения.
}
handle_thread_completion();
}
void
do_main_loop()
{
bool should_continue{ true };
do
{
const auto extraction_result = try_extract_next_task();
should_continue = extraction_result.m_should_process;
if( should_continue )
{
extraction_result.m_task();
}
}
while( should_continue );
}
// ПРИМЕЧАНИЕ: noexcept потому что нет шансов востановиться при
// возникновении исключения.
void
handle_thread_completion() noexcept
{
std::lock_guard lock{ m_pool_lock };
++m_actually_completed;
if( m_actually_started == m_actually_completed )
{
// Пул завершил свою работу. Если этого момента кто-то ждал,
// то нужно об этом сообщить.
m_completed = true;
m_completion_confirmed.notify_one();
}
}
// Попробовать достать заявку из очереди.
//
// Если task_extraction_result_t::m_should_process равен false,
// то цикл обработки заявок нужно прекратить.
[[nodiscard]] task_extraction_result_t
try_extract_next_task()
{
// Повторяем попытки достать заявку из очереди до тех пор,
// пока заявка в очереди появится, либо пока будет выставлен
// признак на завершение работы.
bool should_continue = true;
// Все действия внутри цикла должны выполняться при захваченном
// замке объекта.
std::unique_lock lock{ m_pool_lock };
do
{
// Одной свободной нитью стало больше.
++m_actually_free;
// Если работа уже началась, но очередь пуста и все рабочие
// нити свободны, то значит новых заявок уже не будет, а значит
// и работу нужно завершать.
if( m_queue.empty() && status_t::in_progress == m_status
&& m_actually_started == m_actually_free )
{
m_status = status_t::shutdown;
m_queue_not_empty.notify_all();
}
should_continue = (status_t::shutdown != m_status);
if( should_continue )
{
if( m_queue.empty() )
{
// Придется дать, пока в очереди что-то появится.
// Или если статус поменяется.
m_queue_not_empty.wait( lock,
[this] {
return !m_queue.empty() ||
status_t::shutdown == m_status;
} );
}
else
{
task_extraction_result_t ret_value{ std::move(m_queue.front()), true };
m_queue.pop();
// Одной свободной нитью стало меньше.
--m_actually_free;
// Если это самая первая заявка, то нужно указать,
// что работа началась.
if( status_t::not_started == m_status )
m_status = status_t::in_progress;
return ret_value;
}
}
}
while( should_continue );
// Раз оказались здесь, значит работу нужно завершать.
return { {}, false };
}
};
} /* namespace demo */
using namespace demo;
void
test_task( thread_pool_t & pool, int remaining )
{
if( remaining )
{
pool.push( [&pool, r = remaining - 1]() {
test_task( pool, r );
} );
}
std::this_thread::sleep_for( std::chrono::milliseconds{20} );
std::cout << "test_task: " << remaining << std::endl;
}
int
main()
{
{
thread_pool_t pool{ 4 };
pool.start();
pool.push( [&pool]() {
test_task( pool, 10 );
} );
pool.wait_for_completion();
}
std::cout << "Completed" << std::endl;
}
#include <atomic>
#include <condition_variable>
#include <functional>
#include <future>
#include <iostream>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <vector>
#include <utility>
namespace demo
{
class thread_pool_t
{
public:
// Тип задачи, которая будет отдаваться в качестве заявки.
using task_t = std::function< void() >;
private:
// Тип, который хранит информацию, необходимую для завершения работы.
struct work_completion_data_t
{
// Сколько еще активных заявок.
//
// ПРИМЕЧАНИЕ: в начале работы содержит нулевое значение.
std::atomic< unsigned int > m_live_demands{};
// Промиз, который нужно выставить, чтобы дать сигнал
// о том, что работа была завершена.
std::promise< void > m_completion_promise;
};
// Тип одной заявки.
//
// ПРИМЕЧАНИЕ: это Moveable, но не Copyable тип.
struct demand_t
{
// Информация для завершения работы.
//
// Нулевое значение указывает, что содержимое объекта
// перемещено и сам объект активным уже не является.
work_completion_data_t * m_work_completion;
// Задача для выполнения.
//
// Может быть пустым объектом, если содержимое demand_t
// было перемещено.
task_t m_task;
friend void
swap( demand_t & a, demand_t & b ) noexcept
{
using std::swap;
swap( a.m_work_completion, b.m_work_completion );
swap( a.m_task, b.m_task );
}
demand_t(
work_completion_data_t & work_completion,
task_t task)
: m_work_completion{ std::addressof(work_completion) }
, m_task{ std::move(task) }
{
// Нужно указать, что активных заявок стало больше.
++(m_work_completion->m_live_demands);
}
~demand_t()
{
// Если содержимое объекта не было куда-то перемещено,
// то это живой объект, который должен сообщить, что
// живых объектов стало меньше.
if( m_work_completion )
{
if( 0u == --(m_work_completion->m_live_demands) )
{
// Живых заявок больше нет, можно завершать работу.
m_work_completion->m_completion_promise.set_value();
}
}
}
// Копирования быть не должно.
demand_t( const demand_t & ) = delete;
// А перемещение должно быть.
demand_t( demand_t && o ) noexcept
: m_work_completion{
std::exchange( o.m_work_completion, nullptr ) }
, m_task{ std::exchange( o.m_task, task_t{} ) }
{}
// Копирования быть не должно.
demand_t &
operator=( const demand_t & ) = delete;
// А перемещение делаем через "make temporary then swap".
demand_t &
operator=( demand_t && o ) noexcept
{
demand_t tmp{ std::move(o) };
swap( tmp, *this );
return *this;
}
};
// Тип очереди заявок.
using demand_queue_t = std::queue< demand_t >;
// Размер thread-pool-а.
const std::size_t m_pool_size;
// Информация для завершения работы.
work_completion_data_t m_work_completion;
// Замок для всего thread-pool-а.
std::mutex m_pool_lock;
// Рабочие нити пула.
std::vector< std::thread > m_threads;
// Очередь заявок.
demand_queue_t m_demands;
// Условие для ожидания появления заявок в очереди.
//
// В том числе выставляется и при смене статуса на shutdown.
std::condition_variable m_queue_not_empty;
// Признак того, что пул должен завершить свою работу.
bool m_shutdown_initiated{ false };
public:
// Инициализирующий конструктор.
//
// ПРИМЕЧАНИЕ: ожидаем, что pool_size больше нуля, но не проверяем это
// специально.
thread_pool_t( std::size_t pool_size )
: m_pool_size{ pool_size }
{
}
~thread_pool_t()
{
// Принудительно завершаем работу тех нитей, которые могли
// быть запущены.
{
std::lock_guard lock{ m_pool_lock };
m_shutdown_initiated = true;
m_queue_not_empty.notify_all();
}
for( auto & t : m_threads )
// Если нить запущена, то значит она joinable, нет смысла
// проверять это отдельно.
t.join();
}
// Запустить рабочие нити пула.
//
// ПРИМЕЧАНИЕ: может вызываться только один раз.
void
start()
{
// Запускаем рабочие нити при заблокированном объекте, чтобы
// проще было контролировать сколько запустилось, сколько завершило
// свою работу.
std::lock_guard lock{ m_pool_lock };
// ПРИМЕЧАНИЕ: не заботимся об исключениях из-за невозможности
// запустить очередную нить. Если i-я нить не стартует, то все
// ранее запущенные нити будут остановлены в деструкторе.
m_threads.reserve( m_pool_size );
for( std::size_t i = 0; i < m_pool_size; ++i )
{
m_threads.push_back( std::thread{ [this]() { body(); } } );
}
}
// Добавить еще одну заявку в очередь.
void
push( task_t task )
{
std::lock_guard lock{ m_pool_lock };
// Дергать condition_variable имеет смысл только если очередь была пуста.
const bool was_empty = m_demands.empty();
m_demands.push( demand_t{ m_work_completion, std::move(task) } );
if( was_empty )
m_queue_not_empty.notify_one();
}
// Ожидать завершения работы.
void
wait_for_completion()
{
m_work_completion.m_completion_promise.get_future().wait();
}
private:
// Основная функция рабочей нити.
void
body()
{
// Не заморачиваемся на исключения. Если вылетит из do_main_loop,
// то все приложение будет убито.
do_main_loop();
}
void
do_main_loop()
{
// Повторяем попытки достать заявку из очереди до тех пор,
// пока заявка в очереди появится, либо пока будет выставлен
// признак на завершение работы.
bool should_continue = true;
do
{
const auto opt_demand = try_extract_demand();
if( opt_demand )
opt_demand->m_task();
else
// Нет заявок, значит работа закончена.
should_continue = false;
}
while( should_continue );
}
// Возвращает пустой optional если нужно завершить работу.
[[nodiscard]] std::optional< demand_t >
try_extract_demand()
{
std::optional< demand_t > result;
// Пытаемся извлечь заявку при захваченном замке объекта.
std::unique_lock lock{ m_pool_lock };
if( !m_shutdown_initiated )
{
if( m_demands.empty() )
// Придется поспать.
m_queue_not_empty.wait( lock,
[this] {
return !m_demands.empty() || m_shutdown_initiated;
} );
// Еще раз проверим m_shutdown_initiated, т.к. он мог измениться
// пока мы спали.
if( !m_shutdown_initiated )
{
result = std::move(m_demands.front());
m_demands.pop();
}
}
return result;
}
};
} /* namespace demo */
using namespace demo;
void
test_task( thread_pool_t & pool, int remaining )
{
if( remaining )
{
pool.push( [&pool, r = remaining - 1]() {
test_task( pool, r );
} );
}
std::this_thread::sleep_for( std::chrono::milliseconds{20} );
std::cout << "test_task: " << remaining << std::endl;
}
int
main()
{
{
thread_pool_t pool{ 4 };
pool.start();
pool.push( [&pool]() {
test_task( pool, 10 );
} );
// Дожидаемся пока работа будет завершена.
pool.wait_for_completion();
// При выходе thread_pool остановит сам себя в деструкторе.
}
std::cout << "Completed" << std::endl;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment