线程池C++版本

C++版本的线程池,相较于之前写的C版本的线程池,简洁许多,因为对于任务队列,C++中有容器可以使用,不用自己设计队列,并且对队列的节点进行增删擦偶哦;此外C++中有析构函数,在程序结束之后,会自动执行析构函数,可以将资源回收等操作放到析构函数中

一、线程池C++实现

1.头文件

pthreadpool.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
#pragma once  // 防止同一个文件被多次包含

#include <thread>
#include <queue>

/*********************************任务队列/任务***************************************/
using callback = void (*)(void *); // callback 类型别名

/*任务类*/
class Task{
public:
callback m_function;
void *m_arg;

Task():m_function(nullptr),m_arg(nullptr){}

Task(callback f,void *arg):m_function(f),m_arg(arg){}
};

/*任务队列类*/
class TaskQueue{
private:
pthread_mutex_t m_mutex; // 互斥锁
std::queue<Task> m_queue; // 任务队列
public:
TaskQueue(); // 构造函数
~TaskQueue(); // 析构函数

void addTask(Task &task); // 添加任务
void addTask(callback func,void *arg);

Task taskTask(); // 取出任务

// 获取当前队列中的任务数量
inline int taskNumber();
};

/*********************************任务队列/任务***************************************/


/*********************************线程池*********************************************/
class ThreadPool
{
public:
ThreadPool(int min,int max); // 线程池构造函数,创建线程池对象
~ThreadPool(); // 析构函数

void addTask(Task task); // 添加任务
int getBusyNumber(); // 获取忙线程数量
int getAliveNumber(); // 获取活着的线程数量

private:
static void *worker(void *arg); // 工作线程的任务函数
static void *manager(void *arg); // 管理线程的任务函数
void threadExit(); // 线程退出函数

private:
pthread_mutex_t m_lock; // 线程互斥锁
pthread_cond_t m_notEmpty; // 线程条件变量
pthread_t *m_threadIDS; // 指向工作线程ID数组
pthread_t m_manageID; // 管理者线程ID,只有一个
TaskQueue *m_taskQ; // 任务队列
int m_minNum; // 最小工作线程数量
int m_maxNum; // 最大工作线程数量
int m_busyNum; // 忙碌工作线程数量
int m_aliveNum; // 存活工作线程数量
int m_exitNum; // 退出工作线程数量
bool m_shutdowm = false; // 是否关闭线程池
};

/*********************************线程池*********************************************/


2.源文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
#include <iostream>
#include <cstring>
#include <string>
#include <chrono>

#include "pthreadpool.h"
/*********************************任务队列/任务***************************************/
/*获取当前队列中的任务数量*/
inline int TaskQueue::taskNumber() {
return m_queue.size();
}

/*任务队列构造函数*/
TaskQueue::TaskQueue() {
pthread_mutex_init(&m_mutex,NULL); // 初始化互斥锁
}

/*析构函数,进行资源销毁回收*/
TaskQueue::~TaskQueue() {
pthread_mutex_destroy(&m_mutex);
}

/*向任务队列中添加任务*/
void TaskQueue::addTask(Task &task) {
// 任务队列没有上限,因此不需要考虑队列满的情况
pthread_mutex_lock(&m_mutex);
m_queue.push(task); // 任务队列为线程池,共享资源,加锁
pthread_mutex_unlock(&m_mutex);
}

/*向任务队列中添加任务*/
void TaskQueue::addTask(callback func,void *arg) {
pthread_mutex_lock(&m_mutex);
Task task;
task.m_function = func;
task.m_arg = arg;
m_queue.push(task);
pthread_mutex_unlock(&m_mutex);
}

/*从任务队列取出任务*/
Task TaskQueue::taskTask() {
Task t;
pthread_mutex_lock(&m_mutex);
if(m_queue.size() > 0)
{
t = m_queue.front(); // 访问队列首部的元素
m_queue.pop(); // 移除队列首部的元素
}
pthread_mutex_unlock(&m_mutex);
return t;
}

/*********************************任务队列/任务***************************************/


/*********************************线程池*********************************************/
/*线程池构造函数,创建线程池对象*/
ThreadPool::ThreadPool(int min, int max) {
m_taskQ = new TaskQueue; // 任务队列,当前任务队列没有上限
do {
// 初始化线程池
m_minNum = min;
m_maxNum = max;
m_busyNum = 0;
m_aliveNum = min;

// 为工作线程的数组分配内存
m_threadIDS = new pthread_t[m_maxNum];
if(m_threadIDS == nullptr)
{
std::cerr << "malloc thread_t[] failed..." <<std::endl;
break;
}
// 初始化
memset(m_threadIDS,0, sizeof(pthread_t) * m_maxNum);
// 初始化互斥锁与条件变量
if(pthread_mutex_init(&m_lock,NULL) != 0 || pthread_cond_init(&m_notEmpty,NULL) != 0)
{
std::cerr << "init mutex or condition failed..." <<std::endl;
break;
}

// 创建工作子线程
for(int i=0;i<m_minNum;i++)
{
// this代表当前的线程池对象
pthread_create(&m_threadIDS[i],NULL,worker, this);
// std::cout << "Create Worker Thread ID:" << std::to_string(m_threadIDS[i]) << std::endl;
}
// 创建管理者线程
pthread_create(&m_manageID,NULL,manager,this); // this代表当前的线程池对象

}while(0);
}

/*线程池析构函数*/
ThreadPool::~ThreadPool() {
m_shutdowm = 1;

// 唤醒存活的工作线程
for(int i = 0;i<m_aliveNum;i++)
{
pthread_cond_signal(&m_notEmpty);
}

// 回收管理者线程资源
pthread_join(m_manageID,NULL);

// 释放堆内存
if(m_taskQ) delete m_taskQ;
if(m_threadIDS) delete []m_threadIDS;

// 销毁互斥锁
pthread_mutex_destroy(&m_lock);
pthread_cond_destroy(&m_notEmpty);
}

/*
* func:
* 线程池添加任务
* parameter:
* Task task 任务对象
* */
void ThreadPool::addTask(Task task) {
if(m_shutdowm)
{
return;
}
// 添加任务,不需要加锁,任务队列中有锁
m_taskQ->addTask(task);
// 唤醒工作线程,抢任务队列中的任务
pthread_cond_signal(&m_notEmpty);
}

/*获取存活的工作线程数量*/
int ThreadPool::getAliveNumber() {
int threadNum = 0;
pthread_mutex_lock(&m_lock); // 线程池中的互斥锁,加锁,线程同步
threadNum = m_aliveNum;
pthread_mutex_unlock(&m_lock);
return threadNum;
}

/*获取忙碌的工作线程数量*/
int ThreadPool::getBusyNumber() {
int busyNum = 0;
pthread_mutex_lock(&m_lock);
busyNum = m_busyNum;
pthread_mutex_unlock(&m_lock);
return busyNum;
}

/*
* func:
* 工作线程的任务函数
* parameter:
* void *arg 线程池对象地址
* */
void *ThreadPool::worker(void *arg) {
ThreadPool *pool = static_cast<ThreadPool *>(arg);
while(1)
{
// 访问任务队列的共享资源,加锁
pthread_mutex_lock(&pool->m_lock);
// 判断当前任务队列是否为空,若为空,工作线程阻塞
while(pool->m_taskQ->taskNumber() == 0 && !pool->m_shutdowm)
{
std::cout << "Worker Thread " << std::to_string(pthread_self()) << " waiting..." <<std::endl;
// 阻塞线程,若条件变量通知,会解锁,再加锁
pthread_cond_wait(&pool->m_notEmpty,&pool->m_lock);

// 解除阻塞之后,判断是否销毁线程
if(pool->m_exitNum > 0)
{
pool->m_exitNum--;
if(pool->m_aliveNum > pool->m_minNum)
{
pool->m_aliveNum--;
pthread_mutex_unlock(&pool->m_lock);
// 退出当前工作线程
pool->threadExit();
}
}
}

// 若线程池关闭
if(pool->m_shutdowm)
{
pthread_mutex_unlock(&pool->m_lock);
pool->threadExit();
}

// 从任务队列中取任务
Task task = pool->m_taskQ->taskTask();
// 忙碌的线程数量+1
pool->m_busyNum++;
// 解锁
pthread_mutex_unlock(&pool->m_lock);
// 执行任务
task.m_function(task.m_arg);
// 若传递的任务参数指针指向的是一块堆内存,则需要释放
delete task.m_arg;
task.m_arg = nullptr;

// 忙碌的工作线程数量-1
pthread_mutex_lock(&pool->m_lock);
// 任务执行结束
std::cout << "Worker Thread: " << std::to_string(pthread_self()) << " end working..." << std::endl;
pool->m_busyNum--;
pthread_mutex_unlock(&pool->m_lock);
}
return nullptr;
}

/*
* func:
* 管理者线程任务函数
* parameter:
* void *arg 线程池对象地址
* */
void *ThreadPool::manager(void *arg)
{
ThreadPool *pool = static_cast<ThreadPool *>(arg);

// 判断线程池是否关闭
while(!pool->m_shutdowm)
{
// 每3s检测一次
std::this_thread::sleep_for(std::chrono::seconds(3));

// 取出线程池中任务队列中的任务的数量和当前线程存活的数量
// 取出线程池中忙碌的工作线程数量
pthread_mutex_lock(&pool->m_lock);
int queueSize = pool->m_taskQ->taskNumber();
int liveNum = pool->m_aliveNum;
int busyNum = pool->m_busyNum;
pthread_mutex_unlock(&pool->m_lock);

// 添加工作子线程
const int NUMBER = 2;
// 当前任务队列中的任务数量 > 当前存活的工作线程数量 && 当前存活的工作线程数量 < 最大的线程数量
if(queueSize > liveNum && liveNum < pool->m_maxNum)
{
// 加锁
pthread_mutex_lock(&pool->m_lock);
int num = 0;
for(int i=0;i<pool->m_maxNum && num < NUMBER &&pool->m_aliveNum < pool->m_maxNum;i++)
{
// 工作线程ID数组位置没有被占用
if(pool->m_threadIDS[i] == 0)
{
// 创建工作的子线程
pthread_create(&pool->m_threadIDS[i],NULL,worker,pool);
num++;
pool->m_aliveNum++;
}
}
pthread_mutex_unlock(&pool->m_lock);
}

// 销毁多余的工作线程
// 忙线程*2 < 存活的线程数目 && 存活的线程数 > 最小线程数量
if(busyNum*2 < liveNum && liveNum > pool->m_minNum )
{
pthread_mutex_lock(&pool->m_lock);
pool->m_exitNum = NUMBER;
pthread_mutex_unlock(&pool->m_lock);
for(int i = 0;i<NUMBER;i++)
{
// 唤醒此时因为任务队列为空,阻塞的工作子线程
// 自杀
pthread_cond_signal(&pool->m_notEmpty);
}
}
}
// 管理者线程退出
std::cout << "Manager Thread Exiting ID: " << std::to_string(pthread_self()) <<std::endl;
pthread_exit(NULL);
}


/*线程退出函数*/
void ThreadPool::threadExit() {
pthread_t tid;
for(int i=0;i<m_maxNum;i++)
{
if(tid == m_threadIDS[i])
{
std::cout << "threadExit() called,Thread ID: " << std::to_string(pthread_self()) << " Exiting...." << std::endl;
m_threadIDS[i] = 0;
break;
}
}
pthread_exit(NULL);
}

/*********************************线程池*********************************************/


3.测试代码

main.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#include <iostream>
#include <chrono>
#include <string>
#include "pthreadpool.h"


// 任务函数
void taskFunc(void* arg)
{
int num = *((int*)arg);
std::cout << "Thread: " << std::to_string(pthread_self()) << " is working,num = " << num <<std::endl;
// 休眠1s
std::this_thread::sleep_for(std::chrono::seconds(1));
}

int main() {
// 创建线程池
ThreadPool pool(3,10);
// 添加100个任务向任务队列中
for(int i = 0;i<10;++i)
{
int *num = new int(i+1);
// 向线程池的任务队列中添加任务
pool.addTask(Task(taskFunc,num));
}

// 主线程休眠---保证工作线程进行工作
// 休眠1s
std::this_thread::sleep_for(std::chrono::seconds(12));

return 0;
}

CMakeLists.txt

1
2
3
4
5
6
7
8
9
10
cmake_minimum_required(VERSION 3.19)
project(ThreadPoolC__)

set(CMAKE_CXX_STANDARD 14)

find_package(Threads REQUIRED)

add_executable(ThreadPoolC__ main.cpp pthreadpool.cpp)

target_link_libraries(ThreadPoolC__ PRIVATE Threads::Threads)

编译运行,结果如下:

image-20240517195057774

这个程序中的输出操作可以注释掉,因为可能上一个输出代码还没有执行完毕,下一个线程就可以执行了,因此可能会造成终端输出比较混乱