You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

113 lines
3.2 KiB

  1. // Copyright (c) 2012 Jakob Progsch, Václav Zeman
  2. // This software is provided 'as-is', without any express or implied
  3. // warranty. In no event will the authors be held liable for any damages
  4. // arising from the use of this software.
  5. // Permission is granted to anyone to use this software for any purpose,
  6. // including commercial applications, and to alter it and redistribute it
  7. // freely, subject to the following restrictions:
  8. // 1. The origin of this software must not be misrepresented; you must not
  9. // claim that you wrote the original software. If you use this software
  10. // in a product, an acknowledgment in the product documentation would be
  11. // appreciated but is not required.
  12. // 2. Altered source versions must be plainly marked as such, and must not be
  13. // misrepresented as being the original software.
  14. // 3. This notice may not be removed or altered from any source
  15. // distribution.
  16. #ifndef UTILS_THREAD_POOL_H_
  17. #define UTILS_THREAD_POOL_H_
  18. #include <condition_variable>
  19. #include <functional>
  20. #include <future>
  21. #include <memory>
  22. #include <mutex>
  23. #include <queue>
  24. #include <stdexcept>
  25. #include <thread>
  26. #include <utility>
  27. #include <vector>
  28. class ThreadPool {
  29. public:
  30. explicit ThreadPool(size_t);
  31. template <class F, class... Args>
  32. auto enqueue(F&& f, Args&&... args)
  33. -> std::future<typename std::result_of<F(Args...)>::type>;
  34. ~ThreadPool();
  35. private:
  36. // need to keep track of threads so we can join them
  37. std::vector<std::thread> workers;
  38. // the task queue
  39. std::queue<std::function<void()> > tasks;
  40. // synchronization
  41. std::mutex queue_mutex;
  42. std::condition_variable condition;
  43. bool stop;
  44. };
  45. // the constructor just launches some amount of workers
  46. inline ThreadPool::ThreadPool(size_t threads) : stop(false) {
  47. for (size_t i = 0; i < threads; ++i)
  48. workers.emplace_back([this] {
  49. for (;;) {
  50. std::function<void()> task;
  51. {
  52. std::unique_lock<std::mutex> lock(this->queue_mutex);
  53. this->condition.wait(
  54. lock, [this] { return this->stop || !this->tasks.empty(); });
  55. if (this->stop && this->tasks.empty()) return;
  56. task = std::move(this->tasks.front());
  57. this->tasks.pop();
  58. }
  59. task();
  60. }
  61. });
  62. }
  63. // add new work item to the pool
  64. template <class F, class... Args>
  65. auto ThreadPool::enqueue(F&& f, Args&&... args)
  66. -> std::future<typename std::result_of<F(Args...)>::type> {
  67. using return_type = typename std::result_of<F(Args...)>::type;
  68. auto task = std::make_shared<std::packaged_task<return_type()> >(
  69. std::bind(std::forward<F>(f), std::forward<Args>(args)...));
  70. std::future<return_type> res = task->get_future();
  71. {
  72. std::unique_lock<std::mutex> lock(queue_mutex);
  73. // don't allow enqueueing after stopping the pool
  74. if (stop) {
  75. throw std::runtime_error("enqueue on stopped ThreadPool");
  76. }
  77. tasks.emplace([task]() { (*task)(); });
  78. }
  79. condition.notify_one();
  80. return res;
  81. }
  82. // the destructor joins all threads
  83. inline ThreadPool::~ThreadPool() {
  84. {
  85. std::unique_lock<std::mutex> lock(queue_mutex);
  86. stop = true;
  87. }
  88. condition.notify_all();
  89. for (std::thread& worker : workers) {
  90. worker.join();
  91. }
  92. }
  93. #endif // UTILS_THREAD_POOL_H_