前言
这段时间看了《C++并发编程实战》的基础内容,想着利用最近学的知识自己实现一个简单的线程池。
什么是线程池
线程池(thread pool)是一种线程使用模式。线程过多或者频繁创建和销毁线程会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着管理器分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价,以及保证了线程的可复用性。线程池不仅能够保证内核的充分利用,还能防止过分调度。
思路
个人对线程池的理解是:利用已经创建的固定数量的线程去执行指定的任务,从而避免线程重复创建和销毁带来的额外开销。
C++11中,线程我们可以理解为对应一个thread对象,任务可以理解为要执行的函数,通常是耗时的函数。
我们的任务多少和顺序并非固定的,因此需要有一个方法能添加指定的任务,任务存放的地方应该是一个任务队列,因为我们的线程数量有限,当任务很多时同时执行的任务数量也有限,因此任务需要排队,遵循先来后到的原则。
当要执行一个任务时,意味着先将这个任务从队列取出,再执行相应任务,而“取出”动作的执行者是线程池中的线程,这意味我们的队列需要考虑多个线程在同一队列上执行“取出”操作的问题,实际上,取出任务操作和添加任务操作也不能同时进行,否则会产生竞争条件;另一方面,程序本身如果就是多线程的,多个线程同时添加任务的操作也应该是互斥的。
当没有任务可以执行时,所有线程应该什么也不做,当出现了一个任务时,应该将这个任务分配到任一线程中执行。实现上我们固然可以使用轮询的方式判断当前队列是否有任务,有则取出(即使加了互斥锁似乎也无法避免竞争条件?),但这样会消耗无谓的CPU资源,写轮询周期难以选取。其实,我们可以使用condition_variable代替轮询。
上述任务的创建和取出其实就是经典的生产者消费者模型。
我们将上面的内容都封装在一个类中,取名ThreadPool,用户可以在构造ThreadPool对象时指定线程池大小,之后可以随时添加要执行的任务。
实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
class ThreadPool { public : ThreadPool( int n); ~ThreadPool(); void pushTask(packaged_task< void ()> &&task); private : vector<thread*> threadPool; deque<packaged_task< void ()>> taskQueue; void taskConsumer(); mutex taskMutex; condition_variable taskQueueCond; }; ThreadPool::ThreadPool( int n) { for ( int i = 0 ; i < n; i++) { thread *t = new thread(&ThreadPool::taskConsumer, this ); threadPool.push_back(t); t->detach(); } } ThreadPool::~ThreadPool() { while (!threadPool.empty()) { thread *t=threadPool.back(); threadPool.pop_back(); delete t; } } void ThreadPool::pushTask(packaged_task< void ()> &&task) { { lock_guard<mutex> guard(taskMutex); taskQueue.push_back(std::move(task)); } taskQueueCond.notify_one(); } void ThreadPool::taskConsumer() { while ( true ) { unique_lock<mutex> lk(taskMutex); taskQueueCond.wait(lk, [&] { return !taskQueue.empty(); }); packaged_task< void ()> task=std::move(taskQueue.front()); taskQueue.pop_front(); lk.unlock(); task(); } } |
这里我使用packaged_task作为任务,每当添加一个任务,就调用condition_variable::notify_one方法,调用condition_variable::wait的线程就会被唤醒,并检查等待条件。这里有个小细节是notify_one在解锁后执行,这样避免线程唤醒后还要等待互斥锁解锁。
使用示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
void Task1() { Sleep( 1000 ); cout << "Task1" <<endl; } void Task5() { Sleep( 5000 ); cout << "Task5" << endl; } class Worker { public : void run(); }; void Worker::run() { cout << "Worker::run start" << endl; Sleep( 5000 ); cout << "Worker::run end" << endl; } int main() { ThreadPool pool( 2 ); pool.pushTask(packaged_task< void ()>(Task5)); pool.pushTask(packaged_task< void ()>(Task1)); pool.pushTask(packaged_task< void ()>(Task1)); Worker worker; pool.pushTask(packaged_task< void ()>(bind(&Worker::run,&worker))); pool.pushTask(packaged_task< void ()>([&](){worker.run();})); Sleep( 20000 ); } |
这个线程池目前有几个缺点:
- 只能传入调用形式为void()形式的函数或可调用对象,不能返回任务执行的值,只能通过其他方式同步任务执行结果(如果有)
- 传入参数较为复杂,必须封装一层packaged_task,调用对象方法时需要使用bind或者lambda表达式的方法封装
以上缺点在当前版本的实现不予解决,日后另写博文优化。
2021/12/29 更新之一:
事实上,我们只要将packaged_task改为funtion模板类,就可以简化我们的调用参数:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
class ThreadPool { public : ThreadPool( int n); ~ThreadPool(); void pushTask(function< void ()> task); private : vector<thread*> threadPool; deque<function< void ()>> taskQueue; void taskConsumer(); mutex taskMutex; condition_variable taskQueueCond; }; ThreadPool::ThreadPool( int n) { for ( int i = 0 ; i < n; i++) { thread *t = new thread(&ThreadPool::taskConsumer, this ); threadPool.push_back(t); t->detach(); } } ThreadPool::~ThreadPool() { while (!threadPool.empty()) { thread *t=threadPool.back(); threadPool.pop_back(); delete t; } } void ThreadPool::pushTask(function< void ()> task) { { lock_guard<mutex> guard(taskMutex); taskQueue.push_back(std::move(task)); } taskQueueCond.notify_one(); } void ThreadPool::taskConsumer() { while ( true ) { unique_lock<mutex> lk(taskMutex); taskQueueCond.wait(lk, [&] { return !taskQueue.empty(); }); function< void ()> task=taskQueue.front(); taskQueue.pop_front(); lk.unlock(); task(); } } |
调用代码改为如下:
ThreadPool pool(2);
pool.pushTask(&Task5);
pool.pushTask(&Task1);
pool.pushTask(&Task1);
Worker worker;
pool.pushTask((bind(&Worker::run, &worker)));
pool.pushTask([&](){worker.run(); });//1
Sleep(15000);
我们可以执行指定的函数,也可以将要执行的代码放入lambda表达式的函数体中,正如1处所示,这样就能在其他线程中执行指定的代码了。
2021/12/29 更新之二:
我们发现,main最后都要调用sleep函数来避免主线程在线程任务完成之前就退出,因此我们希望添加一个接口,等待线程所有任务完成,改进如下,其他函数同前:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
class ThreadPool { public : ThreadPool( int n); ~ThreadPool(); void pushTask(function< void ()> task); void waitAllTask(); private : vector< thread *> threadPool; deque<function< void ()>> taskQueue; atomic< int > busyCount; bool bStop; void taskConsumer(); mutex taskQueueMutex; condition_variable taskQueueCond; condition_variable taskFinishedCond; }; void ThreadPool::taskConsumer() { while (!bStop) { unique_lock<mutex> lk(taskQueueMutex); taskQueueCond.wait(lk, [&] { return !taskQueue.empty(); }); busyCount++; function< void ()> task=taskQueue.front(); taskQueue.pop_front(); lk.unlock(); task(); busyCount--; taskFinishedCond.notify_one(); } } void ThreadPool::waitAllTask() { unique_lock<mutex> lk(taskQueueMutex); taskFinishedCond.wait(lk, [&] { return taskQueue.empty() && busyCount==0; }); //所有任务均已完成 } |
这样我们只要调用waitAllTask就可以等待所有任务完成啦。
到此这篇关于C++线程池实现代码的文章就介绍到这了,更多相关C++线程池内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!
原文链接:https://blog.csdn.net/mrbone11/article/details/122153149