C++提供了std::thread
对线程进行管理,但面对复杂的任务情况,往往有些捉襟见肘,我们希望可以对线程的整个生命周期进行管理,而不是放任其直到执行完成,理想情况下,能够将代码划分为更小的块,再通过并发执行。
另外,有时候我们还希望在线程启动后可以有方法“干预”其执行,比如某些情况下让其停止或者等待某个条件的达成等等。
这些更具体的要求使得我们不得不去使用更高级的线程管理方式,而不能仅仅使用C++提供的线程封装对象。
线程池
线程池的基本思想就是为了解决上述提到的第一个问题,主要的矛盾是并发任务数量和硬件线程数量有限之间的矛盾,最理想和最简单的方法当然是给每个并发任务一个线程,然后让它们在需要的时候执行即可,但硬件的线程数量总是有限的,而且也不是每个任务都是必要要同时执行的。
正如一个公司的职员需要出差,有的出差时间长,有的出差时间短,有的一年不一定出差一次,有的可能一个月要出差两三次,最好的方法是每个人都配辆车,但这个开销太大了也不现实,那么最经济实惠的方式就是公司分配几辆出差专用的车,谁需要出差就开走,出完差再换回来就行了。
线程池雏形
可以简单的实现一个线程池雏形:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| class ThreadPool { public: ThreadPool() { auto count = std::jthread::hardware_concurrency(); try { for(auto i = 0; i < count; i++) { m_threads.push_back(std::jthread(&ThreadPool::worker, this)); } } catch(std::exception& e) { m_done.store(true); throw; } }
~ThreadPool() { m_done.store(true); }
template<typename FunctionType> void submit(FunctionType f) { std::lock_guard<std::mutex> lg(m_mtx); m_work_queue.push(std::function<void()>(f)); }
private: void worker() { while(!m_done) { std::function<void()> task; std::unique_lock<std::mutex> ul(m_mtx); if(!m_work_queue.empty()) { task = m_work_queue.front(); m_work_queue.pop(); ul.unlock(); task(); } else { ul.unlock(); std::this_thread::yield(); } } }
private: std::atomic_bool m_done{ false }; std::mutex m_mtx{}; std::vector<std::jthread> m_threads{}; std::queue<std::function<void()>> m_work_queue{}; };
|
其中使用到了一个队列用来存储待执行的任务,通过submit
函数将任务加入队列中,然后工作线程会不断循环访问任务队列,如果队列中有任务待执行,就将任务取出执行。在加入任务和取出任务的时候存在数据竞争问题,可以通过加锁解决,也可以使用无锁队列来代替,这里图方便就使用了加锁的方式。
获取任务执行结果
这种简单的线程池还有些不足,在处理并发任务时,并不是将任务交给线程做完就可以了的,主线程还是至少需要获取任务执行的结果的,那么就需要添加一个等待获取任务执行结果的方式,我们可以通过future
和packaged_task
的方式来处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| class ThreadPool { public: ThreadPool() { auto count = std::jthread::hardware_concurrency(); try { for(auto i = 0; i < count; i++) { m_threads.push_back(std::jthread(&ThreadPool::worker, this)); } } catch(std::exception& e) { m_done.store(true); throw; } }
~ThreadPool() { m_done.store(true); }
template<typename FunctionType> std::future<typename std::result_of<FunctionType()>::type> submit(FunctionType f) { using ResultType = typename std::result_of<FunctionType()>::type; std::packaged_task<ResultType()> task(std::move(f)); std::future<ResultType> res(task.get_future()); { std::lock_guard<std::mutex> lg(m_mtx); m_work_queue.push(std::move(task)); } return res; }
private: void worker() { while(!m_done) { std::packaged_task<void()> task; std::unique_lock<std::mutex> ul(m_mtx); if(!m_work_queue.empty()) { task = std::move(m_work_queue.front()); m_work_queue.pop(); ul.unlock(); task(); } else { ul.unlock(); std::this_thread::yield(); } } }
private: std::atomic_bool m_done{ false }; std::mutex m_mtx{}; std::vector<std::jthread> m_threads{}; std::queue<std::packaged_task<void()>> m_work_queue{}; };
|
使用packaged_task
替换function
,packaged_task
包装的函数可以通过一个future
来获取执行结果,通过修改submit函数,将传入的函数通过packaged_task
后,使用一个future
对象关联其结果,并将这个future
对象返回给调用方。另外,packaged_task
对象是不支持拷贝的,只能进行移动。
避免任务竞争
上述的线程池都是通过submit来将任务加入线程池的任务队列中,当处理器较多时,每个处理器都需要去任务队列中获取任务,这会造成任务竞争,之前我们通过加锁的方式来避免竞争,会造成性能的消耗,如果采用无锁队列的形式,任务的获取会没有等待,但仍会有大量的乒乓缓存(读写分别有各自的缓冲区,一定时候后互相交换)要处理。
线程独立队列
更优的做法是为每个线程都建立一个独立的任务队列,每个线程都会将新任务放入自己的任务队列中,如果线程上的任务被消耗完了,再去全局的任务队列上去取。
下面使用一个thread_local
变量来保证每个线程都拥有一个自己的任务队列。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
| class ThreadPool { public: ThreadPool() { auto count = std::jthread::hardware_concurrency(); try { for(auto i = 0; i < count; i++) { m_threads.push_back(std::jthread(&ThreadPool::worker, this)); } } catch(std::exception& e) { m_done.store(true); throw; } }
~ThreadPool() { m_done.store(true); }
template<typename FunctionType> std::future<typename std::result_of<FunctionType()>::type> submit(FunctionType f) { using ResultType = typename std::result_of<FunctionType()>::type; std::packaged_task<ResultType()> task(std::move(f)); std::future<ResultType> res(task.get_future()); if(m_local) { m_local->push(std::move(task)); } else { std::lock_guard<std::mutex> lg(m_mtx); m_work_queue.push(std::move(task)); } return res; }
private: void worker() { m_local.reset(new LocalThreadType); while(!m_done) { std::packaged_task<void()> task; if(m_local && !m_local->empty()) { task = std::move(m_local->front()); m_local->pop(); task(); } else { std::unique_lock<std::mutex> ul(m_mtx); if(!m_work_queue.empty()) { task = std::move(m_work_queue.front()); m_work_queue.pop(); ul.unlock(); task(); } else { ul.unlock(); std::this_thread::yield(); } } } }
private: std::atomic_bool m_done{ false }; std::mutex m_mtx{}; std::vector<std::jthread> m_threads{}; std::queue<std::packaged_task<void()>> m_work_queue{}; using LocalThreadType = decltype(m_work_queue); static thread_local std::unique_ptr<ThreadPool::LocalThreadType> m_local; };
std::unique_ptr<ThreadPool::LocalThreadType> ThreadPool::m_local{};
|
定义一个线程专属的任务队列,每个线程在执行时,首先检查自己本地的任务队列有没有任务需要执行,如果有就从本地任务队列中取出任务执行,否则再从线程池全局的任务队列中获取。因为本地任务队列是各个线程独立的,所以不用加锁,不会造成性能的大量损耗。
任务窃取
还有一个问题需要解决的是,如果线程任务队列的分配不均应该怎么办,比如有的线程上堆积了大量的任务,而其他线程的本地任务队列中又空空如也。这就需要线程可以有接口去访问其他线程的队列,并能够通过一些方法将其中的任务偷到自己的线程中执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
| class StealQueue { public: StealQueue() = default;
StealQueue(const StealQueue&) = delete;
StealQueue& operator=(const StealQueue&) = delete;
void push(std::packaged_task<void()> task) { std::lock_guard<std::mutex> lg(m_mtx); m_queue.push(std::move(task)); }
bool pop(std::packaged_task<void()>& task) { std::lock_guard<std::mutex> lg(m_mtx); if(m_queue.empty()) { return false; } task = std::move(m_queue.front()); m_queue.pop(); return true; }
bool empty() const { std::lock_guard<std::mutex> lg(m_mtx); return m_queue.empty(); }
bool steal(std::packaged_task<void()>& task) { std::lock_guard<std::mutex> lg(m_mtx); if(m_queue.empty()) { return false; } task = std::move(m_queue.front()); m_queue.pop(); return true; }
private: std::queue<std::packaged_task<void()>> m_queue{}; mutable std::mutex m_mtx; };
class ThreadPool { public: ThreadPool() { auto count = std::jthread::hardware_concurrency(); try { for(auto i = 0; i < count; i++) { queues.push_back(std::unique_ptr<StealQueue>(new StealQueue)); m_threads.push_back(std::jthread(&ThreadPool::worker, this)); } } catch(std::exception& e) { m_done.store(true); throw; } }
~ThreadPool() { m_done.store(true); }
template<typename FunctionType> std::future<typename std::result_of<FunctionType()>::type> submit(FunctionType f) { using ResultType = typename std::result_of<FunctionType()>::type; std::packaged_task<ResultType()> task(std::move(f)); std::future<ResultType> res(task.get_future()); if(m_local_queue) { m_local_queue->push(std::move(task)); } else { std::lock_guard<std::mutex> lg(m_mtx); m_work_queue.push(std::move(task)); } return res; }
private: void worker(unsigned index) { m_index = index; m_local_queue = queues.at(index).get(); while(!m_done) { std::packaged_task<void()> task; if(m_local_queue && !m_local_queue->empty()) { m_local_queue->pop(task); task(); } else { std::unique_lock<std::mutex> ul(m_mtx); if(!m_work_queue.empty()) { task = std::move(m_work_queue.front()); m_work_queue.pop(); ul.unlock(); task(); } else { ul.unlock(); unsigned i = 0; for(; i < queues.size(); i++) { auto index = (m_index + i + 1) % queues.size(); if(queues[index]->steal(task)) { break; } } if(i != queues.size()) { task(); } else { std::this_thread::yield(); } } } } }
private: std::atomic_bool m_done{ false }; std::mutex m_mtx{}; std::vector<std::jthread> m_threads{}; std::queue<std::packaged_task<void()>> m_work_queue{}; std::vector<std::unique_ptr<StealQueue>> queues{}; static thread_local StealQueue* m_local_queue; static thread_local unsigned m_index; };
StealQueue* ThreadPool::m_local_queue{ nullptr }; unsigned ThreadPool::m_index{ 0 };
|
将本地线程队列封装到一起,并允许通过一个接口来获取其他线程的任务,当线程的本地队列和全局队列中都没有任务时,就尝试从其他线程中偷取一个任务来执行。
中断
有时也需要一些方式去打断线程的执行,这可以通过C++标准库中提供的条件变量来实现。使用condition_variable
可以达到线程中中断等待的效果。
另外,还有一个condition_variable_any
通用条件变量可以使用,区别在于condition_variable
只能与std::mutex
搭配,而condition_variable_any
可以与任意的互斥量配合使用。