diff --git a/cpp/src/threadpool.hpp b/cpp/src/threadpool.hpp new file mode 100644 index 0000000..3985602 --- /dev/null +++ b/cpp/src/threadpool.hpp @@ -0,0 +1,73 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +class ThreadPool { + public: + ThreadPool(size_t numThreads) { + for (size_t i = 0; i < numThreads; ++i) { + workers.emplace_back([this] { + while (true) { + std::function task; + { + std::unique_lock lock(queueMutex); + condition.wait(lock, [this] { + return stop || !tasks.empty(); + }); + if (stop && tasks.empty()) + return; + task = std::move(tasks.front()); + tasks.pop(); + } + task(); + } + }); + } + } + + template + auto enqueue(F&& f, Args&&... args) + -> std::future::type> { + using return_type = typename std::invoke_result::type; + + auto task = std::make_shared>( + std::bind(std::forward(f), std::forward(args)...) + ); + + std::future res = task->get_future(); + { + std::unique_lock lock(queueMutex); + tasks.emplace([task]() { (*task)(); }); + } + condition.notify_one(); + return res; + } + + void waitAll() { + std::unique_lock lock(queueMutex); + condition.wait(lock, [this] { return tasks.empty(); }); + } + + ~ThreadPool() { + { + std::unique_lock lock(queueMutex); + stop = true; + } + condition.notify_all(); + for (std::thread& worker : workers) + worker.join(); + } + + private: + std::vector workers; + std::queue> tasks; + std::mutex queueMutex; + std::condition_variable condition; + bool stop = false; +};