线程池C语言版本

教程文档:爱编程的大丙

视频教程:爱编程的大丙

一、线程池原理

线程池使得线程可以得到复用,在执行完一个任务之后,不进行销毁而是继续执行其他任务。

线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。

线程池的组成主要分为3个部分,这三部分配合工作就可以得到一个完整的线程池:

任务队列中的任务实际是回调函数,函数地址

  • 1.任务队列,存储需要处理的任务,由工作的线程来处理这些任务

    通过线程池提供的API函数,将一个待处理的任务添加到任务队列,或者从任务队列中删除

    已处理的任务会被从任务队列中删除

    线程池的使用者,也就是调用线程池函数往任务队列中添加任务的线程就是生产者线程

  • 2.工作的线程(任务队列任务的消费者) ,N

    线程池中维护了一定数量的工作线程, 他们的作用是是不停的读任务队列, 从里边取出任务并处理

    工作的线程相当于是任务队列的消费者角色

    如果任务队列为空, 工作的线程将会被阻塞 (使用条件变量/信号量阻塞)

    如果阻塞之后有了新的任务, 由生产者将阻塞解除, 工作线程开始工作

  • 3.管理者线程(不处理任务队列中的任务),1

    它的任务是周期性的对任务队列中的任务数量以及处于忙状态的工作线程个数进行检测

    ​ 当任务过多的时候, 可以适当的创建一些新的工作线程

    ​ 当任务过少的时候, 可以适当的销毁一些工作的线程

image-20231220201825184

二、任务队列

任务结构体,成员有任务函数以及函数需要的参数

任务函数由有一个函数指针function指向,并且指向的函数返回值为void *,函数参数也为void *

void *,任何类型的指针都可以直接赋值给它,无需进行强制类型转换

1
2
3
4
5
6
// 任务结构体
typedef struct Task
{
void (*function)(void* arg); // 函数指针
void* arg;
}Task;

三、线程池定义

线程池结构体包含几个部分:

  • 任务队列的定义

    任务队列容量、当前队列中任务的个数,任务队列的首尾指针(在队尾加入任务,在队首取出任务)

    当前队列中任务的个数,该变量若同时被多个线程操作则会导致数据混乱,因此需要加入互斥锁保证线程同步

  • 管理者线程IDpthread_t managerID

    管理者线程ID变量的类型为pthread_t,在实例代码中,只创建了一个管理者线程,在线程池中的功能下:

    (1)添加线程:当工作线程忙不来的时候,需要添加工作线程(但是每次添加的线程数需要对其进行相关约束,如:线程池中当前存活的线程数需要小于线程池中最大的线程数量)。

    (2)销毁线程:当前的任务队列中的任务数量太少,不需要过多的工作线程时(节省资源),需要对工作的子线程进行销毁操作,可以定义一个变量为每次销毁的子线程数(实例代码中为NUMBER,在threadpool.c中被初始化,并且使用extern外部引用至threadpool.h中)。而销毁的方式可以选择诱导子线程自杀(可以分析得知,当任务队列为空时,这时会存在许多空闲的子线程,在工作线程调用的worker函数中会通过pthread_cond_wait函数将这些空闲的子线程进行阻塞,在生产者线程继续向任务队列中添加任务后,任务队列不为空时,生产者线程会通过pthread_cond_signal函数以及对应的条件变量将阻塞的工作线程唤醒,继续接活干。其次在管理者线程每次休眠之后,进行检查时,发现还有很多空闲的工作线程并且空闲的数量满足一定数量上的判断时,则会考虑在管理者线程对应的manager函数中通过pthread_cond_signal函数以及对应的条件变量将阻塞的工作线程唤醒,并且设置相应的销毁工作线程标志位exitNum对其赋值为每次销毁的子线程数量。在工作线程对于的worker函数中解除工作线程的阻塞之后,则会对线程池的pool->exitNum进行判断,若该值为真则会让空闲的工作线程自杀)。

  • 工作线程IDpthread_t *threadIDs

    工作线程ID定义为pthread_t类型的一级指针,因为工作线程数量较多,在工作线程创建时,会使用malloc函数创建一段连续的堆空间,使用工作线程ID指针指向其地址空间。创建的工作线程ID一级指针会当作数组进行使用,用于存储创建的工作线程ID号(但是工作线程ID一级指针指向的堆空间在创建之初需要对其进行初始化均赋值为0,在之后的操作中对每段堆空间进行判断若其值不为0,则表示已经有工作线程对该段堆空间进行占用。在每次工作线程退出时,也需要该工作线程占用的堆空间进行赋值为0,表示该工作线程结束了对该段堆空间的占用,方便后续创建的工作线程对其进行使用)。

  • 最小工作线程数量

    线程池所容纳的最小工作线程数量(线程池还在工作时,存活的工作线程数量需要大于等于最小的工作线程数量)。

  • 最大工作线程数量

    线程池所容纳的最大工作线程数量(线程池还在工作时,存活的工作线程数量需要小于等于最大的工作线程数量)。

  • 忙的工作线程数量

    用于表示线程池中当前正在工作的工作线程数量,每当工作线程在任务队列中取出任务并且执行时,该变量进行+1操作。当任务执行完毕之后,则进行-1操作。该变量的操作非常频繁,并且需要注意的是,不能多个工作线程对该变量同时进行操作,在对该变量进行操作时,需要加入互斥锁(在线程池中对于这种会频繁操作的变量,特意定义了一把独有的互斥锁pool->mutexBusy)保证线程同步,避免数据混乱。

  • 存活的工作线程的数量

    用于表示线程池中当前存活的工作线程数量,当管理者线程需要对线程池中的线程进行添加或者销毁工作时,会对改变了进行操作,不能多个工作线程对该变量同时进行操作,在对该变量进行操作时,需要加入互斥锁(在实例代码中直接使用的是整个线程池的互斥锁)保证线程同步,避免数据混乱。

  • 要销毁的工作线程个数

    在管理者线程对应的manager函数中,首先,若需要管理者线程进行工作线程销毁时,则会对该变量进行操作,在操作该变量时,也需要添加互斥锁,保证线程同步(在该实例代码中虽然只有一个管理者线程,但是若存在多个管理者线程,则不能允许多个管理者线程对该变量同时进行操作,因此需要保证线程同步)。其次,因为本文的实例代码中,管理者线程销毁工作线程,是通过诱导工作线程自杀的方式,因此在工作线程对应的worker函数中也会对该变量进行操作,此时是当前自杀的工作子线程对该变量进行操作,也需要保证线程同步,添加互斥锁。

  • 线程池的互斥锁mutexPool

    锁整个线程池,对线程池中的任务队列以及可能造成数据混乱的其他变量进行同步操作

  • busyNum变量的互斥锁

    当前忙工作线程的数量,该变量变化次数多,因此单独为其创建一把互斥锁

  • 判断任务队列是否已满的条件变量notFull

    当生产者为任务队列添加任务导致队列满时,会使用pthread_cond_wait函数通过线程池的锁pool->mutexPool以及条件变量pool->notFull阻塞生产者线程。而在工作线程对应的worker函数中,当工作线程从任务队列中取出任务后,则会通过pthread_cond_signal(&pool->notFull)唤醒阻塞的生产者线程继续为任务队列添加任务

  • 销毁线程池标志位

    若标志位shutdown为1,则表示对线程池进行销毁,若标志位为0,则表示不销毁。

    销毁线程池,需要将管理者线程与工作线程均进行退出并且回收,因此在后文会创建线程退出函数threadExit(线程退出函数的逻辑后文会给出)

    销毁线程池函数threadPoolDestroy,当main.c中主线程以及线程池中的线程完成任务后,会调用销毁线程池函数,函数体内,会将标志位shutdown赋值为1,并且唤醒所有阻塞的工作线程,进一步对销毁线程池标志位进行判断,若为真则执行线程退出函数(因为此时所有任务均完成,任务队列中已经没有其他任务,工作线程会被工作函数worker中的pthread_cond_wait(&pool->notEmpty,&pool->mutexPool)阻塞)。而针对于管理者线程,在对应的管理者函数内,会对标志位shutdown进行判断,若该满足条件,管理者线程也会退出,并且在销毁线程池函数threadPoolDestroy体内会对管理者线程调用pthread_join(pool->managerID,NULL)函数进行管理者线程资源的阻塞回收。在完成上面的操作之后,则会将线程池创建的任务队列以及工作线程ID存储空间所开辟的堆空间进行释放,并且销毁所有的互斥锁以及条件变量。最后将线程池指针pool所指向的动态内存空间也进行释放。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 线程池结构体
struct Threadpool
{
//任务队列
Task *taskQ;
int queueCapacity; // 容量
int queueSize; // 当前任务个数
int queueFront; // 队头 -> 取数据
int queueRear; // 队尾 -> 放数据

pthread_t managerID; // 管理者线程ID
pthread_t *threadIDs; // 工作的线程ID--使用一级指针指向装有工作线程ID的数组
int minNum; // 最小工作线程数量
int maxNum; // 最大工作线程数量
int busyNum; // 忙的工作线程的个数
int liveNum; // 存活的工作线程的个数
int exitNum; // 要销毁的工作线程个数---适当的销毁没有活干的工作线程
pthread_mutex_t mutexPool; // 锁整个的线程池---对整个任务队列做线程同步,避免造成数据混乱
pthread_mutex_t mutexBusy; // 锁busyNum变量---该变量变化次数更多,也需要避免数据混乱
// 条件变量
pthread_cond_t notFull; // 任务队列是不是满了
pthread_cond_t notEmpty; // 任务队列是不是空了

int shutdown; // 是不是要销毁线程池, 销毁为1, 不销毁为0

};

四、头文件声明

threadpool.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
//
// Created by zxz on 2023/12/20.
//
#ifndef UNTITLED_THREADPOOL_H
#define UNTITLED_THREADPOOL_H
#include <pthread.h>

// 在threadpool.c源文件进行定义与初始化,在头文件进行extern
// 若在头文件中进行初始化,当头文件被多个文件进行引用的时候,就会导致比变量被重复定义
extern const int NUMBER;

// 任务结构体
typedef struct Task
{
void (*function)(void *arg); // 函数指针
void *arg;
}Task;

// 线程池结构体
struct Threadpool
{
//任务队列
Task *taskQ;
int queueCapacity; // 容量
int queueSize; // 当前任务个数
int queueFront; // 队头 -> 取数据
int queueRear; // 队尾 -> 放数据

pthread_t managerID; // 管理者线程ID
pthread_t *threadIDs; // 工作的线程ID--使用一级指针指向装有工作线程ID的数组
int minNum; // 最小线程数量
int maxNum; // 最大线程数量
int busyNum; // 忙的线程的个数
int liveNum; // 存活的线程的个数
int exitNum; // 要销毁的线程个数---适当的销毁没有活干的线程
pthread_mutex_t mutexPool; // 锁整个的线程池---对整个任务队列做线程同步,避免造成数据混乱
pthread_mutex_t mutexBusy; // 锁busyNum变量---该变量变化次数更多,也需要避免数据混乱
// 条件变量
pthread_cond_t notFull; // 任务队列是不是满了
pthread_cond_t notEmpty; // 任务队列是不是空了

int shutdown; // 是不是要销毁线程池, 销毁为1, 不销毁为0

};

typedef struct Threadpool Thread_Pool;

//创建线程池并初始化
Thread_Pool *threadPoolCreate(int min,int max,int queueSize);

// 销毁线程池
int threadPoolDestroy(Thread_Pool* pool);

// 给线程池添加任务
void threadPoolAdd(Thread_Pool* pool, void(*func)(void*), void* arg);

// 获取线程池中工作的线程个数
int threadPoolBusyNum(Thread_Pool* pool);

// 获取线程池中活着的线程的个数
int threadPoolAliveNum(Thread_Pool* pool);

// 工作的线程(消费者线程)任务函数
_Noreturn void* worker(void* arg);

// 管理者线程任务函数
void* manager(void* arg);

// 单个线程退出
void threadExit(Thread_Pool* pool);


#endif //UNTITLED_THREADPOOL_H

五、源文件定义

threadpool.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
//
// Created by zxz on 2023/12/20.
//
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include "threadpool.h"

// 管理者线程--每次增加或者销毁的线程数量
const int NUMBER = 2;


/*** 创建线程池 ****/
Thread_Pool *threadPoolCreate(int min,int max,int queueSize)
{
Thread_Pool *pool;
pool = malloc(sizeof(*pool));
do {
if (pool == NULL)
{
printf("malloc threadpool fail...\n");
break;
}

// 工作的线程---开辟连续的内存空间进行存储
pool->threadIDs = malloc(sizeof(pthread_t) * max);
if (pool->threadIDs == NULL)
{
printf("malloc threadIDS fail...\n");
break;
}
// 对工作的线程ID进行初始化为0,若之后判断仍然为0,则说明该线程ID没有被占用
memset(pool->threadIDs,0,sizeof(pthread_t)*max);
// 初始化相关变量
pool->minNum = min;
pool->maxNum = max;
pool->busyNum = 0;
pool->liveNum = min; // 和最小个数相等
pool->exitNum = 0;

// 初始化互斥锁与条件变量
if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||
pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||
pthread_cond_init(&pool->notEmpty, NULL) != 0 ||
pthread_cond_init(&pool->notFull, NULL) != 0)
{
printf("mutex or condition init fail...\n");
break;
}

// 任务队列
pool->taskQ = malloc(sizeof(Task) * queueSize);
pool->queueCapacity = queueSize;
pool->queueSize = 0;
pool->queueFront = 0;
pool->queueRear = 0;

pool->shutdown = 0;

// 创建线程
pthread_create(&pool->managerID,NULL,manager,pool);
for(int i = 0;i<min;i++)
{
pthread_create(&pool->threadIDs[i],NULL,worker,pool);
}

return pool;
}while(0); // while(0)不会一直循环,但是在代码块中可以使用break

// 若跳出了while(0),则出现了异常
// 进行资源释放
if (pool && pool->threadIDs) free(pool->threadIDs);
if (pool && pool->taskQ) free(pool->taskQ);
if (pool) free(pool);

return NULL;
}


/*****工作的线程(消费者线程)任务函数*****/
// 在任务队列中取出任务并且进行消费
// 消费者消费产品之后需要唤醒阻塞的生产者
void* worker(void* arg)
{
Thread_Pool *pool = (Thread_Pool *)arg;

while(1)
{
pthread_mutex_lock(&pool->mutexPool);
// 当前任务队列为空
while(pool->queueSize == 0 && !pool->shutdown)
{
// 阻塞工作线程---此时任务等待生产者线程添加任务在继续
pthread_cond_wait(&pool->notEmpty,&pool->mutexPool);

// 解除阻塞(wait函数解除阻塞之后会先解锁后加锁),诱导工作线程自杀
if(pool->exitNum > 0)
{
pool->exitNum--;
if(pool->liveNum > pool->minNum)
{
// 活着的线程个数也需要-1 线程在wait函数中解除阻塞时,会自动加锁,因此此处不需要进行加锁操作
pool->liveNum--;
// 解锁
pthread_mutex_unlock(&pool->mutexPool);
// 终止调用线程---空闲的线程自己向下执行
threadExit(pool);
// pthread_exit(NULL);
}
}
}

// 判断线程池是否关闭
if(pool->shutdown > 0)
{
pthread_mutex_unlock(&pool->mutexPool);
// 线程退出
threadExit(pool);
//pthread_exit(NULL);
}

// 工作线程进行消费
Task task;
// 从头部取任务
task.function = pool->taskQ[pool->queueFront].function;
task.arg = pool->taskQ[pool->queueFront].arg;
// 移动队列头节点---对于循环队列
pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;
// 任务总个数-1
pool->queueSize--;
// 唤醒阻塞在 队列满的条件变量(pool->notFull) 上的线程,此时已经消费了一个任务,队列不满了
// 可以继续添加任务了
pthread_cond_signal(&pool->notFull);

pthread_mutex_unlock(&pool->mutexPool);

// 输出告知系统当前线程开始执行任务
printf("thread %ld start working... \n",pthread_self());
// 对于忙线程个数进行+1
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum++;
pthread_mutex_unlock(&pool->mutexBusy);

// 执行任务
task.function(task.arg);
// 若传递的参数是一块堆内存则需要对其进行释放
free(task.arg);
task.arg = NULL;
// 输出告知系统当前线程已经执行完毕任务
printf("thread %ld end working... \n",pthread_self());

// 任务执行完之后,忙线程需要-1
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum--;
pthread_mutex_unlock(&pool->mutexBusy);
}
return NULL;
}


/***** 管理者线程任务函数 *****/
// 用于创建线程与销毁多余空闲的线程
void* manager(void* arg)
{
Thread_Pool * pool = (Thread_Pool*)arg;
while(!pool->shutdown)
{
// 每隔3s检测一次
sleep(3);

// 取出线程池中任务的数量和当前线程存活的数量
// 需要加锁,避免数据混乱
pthread_mutex_lock(&pool->mutexPool);
int queueSize = pool->queueSize;
int liveNum = pool->liveNum;
pthread_mutex_unlock(&pool->mutexPool);

// 取出忙的线程数量
pthread_mutex_lock(&pool->mutexBusy);
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexBusy);

/***** 添加线程 ******/
// 当工作线程忙不过来的时,需要继续添加线程
// 任务的个数>线程存活的个数 && 存活的线程数 < 最大线程数
if(queueSize > liveNum && liveNum < pool->maxNum)
{
// 加锁 因为下面也操作了线程池的变量
pthread_mutex_lock(&pool->mutexPool);
int counter = 0;
for(int i = 0;i<pool->maxNum && counter < NUMBER && pool->liveNum < pool->maxNum;i++)
{
// 该线程ID没有被使用
if(pool->threadIDs[i] == 0)
{
// 创建新的线程
pthread_create(&pool->threadIDs[i],NULL,worker,pool);
counter++;
pool->liveNum++;
}
}
// 解锁
pthread_mutex_unlock(&pool->mutexPool);
}


/***** 销毁线程 ******/
// 忙的线程*2 < 存活的线程数 && 存活的线程>最小线程数
if(busyNum *2 < liveNum && liveNum > pool->liveNum)
{
// 操作线程池中的变量需要进行加锁
pthread_mutex_lock(&pool->mutexPool);
pool->exitNum = NUMBER;
pthread_mutex_unlock(&pool->mutexPool);

// 让空闲的工作线程自杀
for(int i=0;i<NUMBER;i++)
{
// 若此时多余的线程是空闲的则说明此时的任务队列为空
// 多余的线程此时阻塞在worker函数的pthread_cond_wait(&pool->notEmpty,&pool->mutexPool)位置
// 只需将其唤醒,使其在worker函数中向后执行并且自杀
pthread_cond_signal(&pool->notEmpty);
}
}

}
printf("Manager Thread,%ld exiting...............\n",pthread_self());
// 管理者线程退出
pthread_exit(NULL);
}


/**** 线程退出函数 ****/
// 存储退出线程id的数组位置需要进行清0
void threadExit(Thread_Pool* pool)
{
pthread_t tid = pthread_self();
for(int i = 0;i<pool->maxNum;i++)
{
// 找到当前调用该函数的线程id的数组位置,对其进行归零
if(pool->threadIDs[i] == tid)
{
pool->threadIDs[i] = 0;
printf("threadExit() called,%ld exiting...............\n",tid);
break;
}
}
// 当前线程退出
pthread_exit(NULL);
// 退出的工作线程资源回收
pthread_join(tid,NULL);
}

/***** 线程池添加任务 ****/
// 生产者生产产品之后需要唤醒阻塞的消费者
void threadPoolAdd(Thread_Pool* pool, void(*func)(void*), void* arg)
{
// 也需要锁住--避免有线程同时在任务队列中添加任务又在任务队列中取任务
pthread_mutex_lock(&pool->mutexPool);
// 若任务队列满
while(pool->queueSize == pool->queueCapacity && !pool->shutdown)
{
// 阻塞生产者线程
pthread_cond_wait(&pool->notFull,&pool->mutexPool);
}
// 线程池关闭
if(pool->shutdown)
{
pthread_mutex_unlock(&pool->mutexPool);
return;
}
// 线程池没有关闭 则继续添加任务 添加到队尾
pool->taskQ[pool->queueRear].function = func;
pool->taskQ[pool->queueRear].arg = arg;
// 队尾进行后移
pool->queueRear =(pool->queueRear + 1) % pool->queueCapacity;
// 任务队列中的任务个数+1
pool->queueSize++;

// 唤醒阻塞在 队列为空pool->notEmpty 条件变量上的线程 有活干了
pthread_cond_signal(&pool->notEmpty);

pthread_mutex_unlock(&pool->mutexPool);
}

/**** 获取线程池中工作的线程个数 ****/
int threadPoolBusyNum(Thread_Pool* pool)
{
pthread_mutex_lock(&pool->mutexPool);
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexPool);
return busyNum;
}

/****** 获取线程池中活着的线程的个数 ******/
int threadPoolAliveNum(Thread_Pool* pool)
{
pthread_mutex_lock(&pool->mutexPool);
int liveNum = pool->liveNum;
pthread_mutex_unlock(&pool->mutexPool);
return liveNum;
}


/****** 销毁线程池 *******/
int threadPoolDestroy(Thread_Pool* pool)
{
// pool线程池指向的是一块非有效的地址
if(pool == NULL)
{
return -1;
}

// 关闭线程池
pool->shutdown = 1;
// 唤醒存活的消费者线程
for(int i = 0;i<pool->liveNum;i++)
{
pthread_cond_signal(&pool->notEmpty);
}

// 阻塞回收管理者线程---等工作线程退出之后在对管理者线程进行资源回收
pthread_join(pool->managerID,NULL);

// 释放堆内存
if(pool->taskQ)
{
free(pool->taskQ);
}
if(pool->threadIDs)
{
free(pool->threadIDs);
}

// 销毁条件变量与互斥锁
pthread_mutex_destroy(&pool->mutexPool);
pthread_mutex_destroy(&pool->mutexBusy);
pthread_cond_destroy(&pool->notEmpty);
pthread_cond_destroy(&pool->notFull);

free(pool);
pool = NULL;
return 0;
}

1.线程池创建函数

  • 参数

    线程池工作时最少工作线程数量min

    线程池工作时最少多工作线程数量max

    任务队列的容量queueSize,于后文提到的任务队列中当前任务数pool->queueSize不一样

  • 返回值

    struct Threadpool类型的指针,指向线程池

  • 业务逻辑

    将所有创建过程均可以放置在一个do whle(0)的循环中,该循环只进行一次,使用这种语法的好处时,若创建步骤失败,则可以使用break 中断创建,并且在函数最后对之前创建成功所占用的内存空间进行释放。

    (1)工作线程ID的存储:开辟连续内存的堆空间,对工作线程的ID进行存储,其空间大小为sizeof(pthread_t) * max,创建之后对每一个线程ID存储位置赋值为0

    (2)初始化互斥锁与条件变量:若每一次初始化函数返回的值不为0,则使用break中断

    (3)任务队列的创建:开辟动态内存创建queueSize数量的任务存储空间,并且对任务队列相关的变量进行初始化

    (4)管理者线程与工作线程的创建:在实例代码中仅创建一个管理者线程,并且工作线程在开始时,仅创建最少的工作线程数量。

    (5)创建失败则回收资源:上述步骤中,若创建失败,则回收资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
Thread_Pool *threadPoolCreate(int min,int max,int queueSize)
{
Thread_Pool *pool;
pool = malloc(sizeof(*pool));
do {
if (pool == NULL)
{
printf("malloc threadpool fail...\n");
break;
}

// 工作的线程---开辟连续的内存空间进行存储ID
pool->threadIDs = malloc(sizeof(pthread_t) * max);
if (pool->threadIDs == NULL)
{
printf("malloc threadIDS fail...\n");
break;
}
// 对工作的线程ID进行初始化为0,若之后判断仍然为0,则说明该线程ID没有被占用
memset(pool->threadIDs,0,sizeof(pthread_t)*max);
// 初始化相关变量
pool->minNum = min;
pool->maxNum = max;
pool->busyNum = 0;
pool->liveNum = min; // 和最小个数相等
pool->exitNum = 0;

// 初始化互斥锁与条件变量
if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||
pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||
pthread_cond_init(&pool->notEmpty, NULL) != 0 ||
pthread_cond_init(&pool->notFull, NULL) != 0)
{
printf("mutex or condition init fail...\n");
break;
}

// 任务队列
pool->taskQ = malloc(sizeof(Task) * queueSize);
pool->queueCapacity = queueSize;
pool->queueSize = 0;
pool->queueFront = 0;
pool->queueRear = 0;

pool->shutdown = 0;

// 创建线程
pthread_create(&pool->managerID,NULL,manager,pool);
for(int i = 0;i<min;i++)
{
pthread_create(&pool->threadIDs[i],NULL,worker,pool);
}

return pool;
}while(0); // while(0)不会一直循环,但是在代码块中可以使用break

// 若跳出了while(0),则出现了异常
// 进行资源释放
if (pool && pool->threadIDs) free(pool->threadIDs);
if (pool && pool->taskQ) free(pool->taskQ);
if (pool) free(pool);

return NULL;
}

2.工作线程的任务函数

worker函数

  • 参数:

    void *类型,在本实例代码中,传入的参数为指向线程池的指针,Thread_Pool *pool

  • 返回值:

    void *类型

  • 业务逻辑

    任务函数,需要一直运行,因此在while(1)的循环中进行操作

    每一个工作线程均会执行一个任务函数,为了保持线程同步,因此在执行任务函数时,需要将线程池进行加锁操作。

    (1)判断任务队列是否为空以及当前是否执行线程销毁:若任务队列为空,则需要阻塞工作线程,等待生产者线程为任务队列中添加任务之后在继续操作。

    (2)工作线程诱导自杀:工作线程解除阻塞之后,若有空闲的工作线程存在,则需要对要销毁的工作线程个数进行判断,若需要销毁的工作线程个数pool->exitNum大于0,则先对调用该函数的工作线程执行解锁操作(pthread_cond_wait解除阻塞之后,得到CPU时间片的工作线程会先解锁后加锁),最后经过相关的条件判断执行该工作线程的自杀操作,该工作调用线程主动调用线程退出函数

    (3)判断线程池是否关闭销毁:工作线程解除阻塞之后,需要对销毁线程池标志位进行判断,若为真,则先对该工作线程进行解锁操作,在执行线程退出函数

    (4)工作线程正常执行任务:若(2)(3)的判断均为否,则从任务队列的头部取出任务,在执行。期间需要移动队列头节点,指向下一个任务。并且对任务队列中当前任务总数queueSize进行-1。若此前任务队列为满时,则唤醒阻塞的生产者线程,继续为任务队列中添加任务。

    image-20231225144527629

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
/*****工作的线程(消费者线程)任务函数*****/
// 在任务队列中取出任务并且进行消费
// 消费者消费产品之后需要唤醒阻塞的生产者
void* worker(void* arg)
{
Thread_Pool *pool = (Thread_Pool *)arg;

while(1)
{
pthread_mutex_lock(&pool->mutexPool);
// 当前任务队列为空
while(pool->queueSize == 0 && !pool->shutdown)
{
// 阻塞工作线程---此时工作线程等待生产者线程添加任务在继续
pthread_cond_wait(&pool->notEmpty,&pool->mutexPool);

// 解除阻塞(wait函数解除阻塞之后会先解锁后加锁),诱导工作线程自杀
if(pool->exitNum > 0)
{
pool->exitNum--;
if(pool->liveNum > pool->minNum)
{
// 活着的线程个数也需要-1 线程在wait函数中解除阻塞时,会自动加锁,因此此处不需要进行加锁操作
pool->liveNum--;
// 解锁
pthread_mutex_unlock(&pool->mutexPool);
// 终止调用线程---空闲的线程自己向下执行
threadExit(pool);
// pthread_exit(NULL);
}
}
}

// 判断线程池是否关闭(销毁)
if(pool->shutdown > 0)
{
pthread_mutex_unlock(&pool->mutexPool);
// 线程退出
threadExit(pool);
//pthread_exit(NULL);
}

// 工作线程进行消费
Task task;
// 从头部取任务
task.function = pool->taskQ[pool->queueFront].function;
task.arg = pool->taskQ[pool->queueFront].arg;
// 移动队列头节点---对于循环队列
pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;
// 任务总个数-1
pool->queueSize--;
// 唤醒阻塞在 队列满的条件变量(pool->notFull) 上的线程,此时已经消费了一个任务,队列不满了
// 可以继续添加任务了
pthread_cond_signal(&pool->notFull);

pthread_mutex_unlock(&pool->mutexPool);

// 输出告知系统当前线程开始执行任务
printf("thread %ld start working... \n",pthread_self());
// 对于忙线程个数进行+1
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum++;
pthread_mutex_unlock(&pool->mutexBusy);

// 执行任务
task.function(task.arg);
// 若传递的参数是一块堆内存则需要对其进行释放
free(task.arg);
task.arg = NULL;
// 输出告知系统当前线程已经执行完毕任务
printf("thread %ld end working... \n",pthread_self());

// 任务执行完之后,忙线程需要-1
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum--;
pthread_mutex_unlock(&pool->mutexBusy);
}
return NULL;
}

3.管理者线程任务函数

manager函数

  • 参数:

    void *类型,在本实例代码中,传入的参数为指向线程池的指针,Thread_Pool *pool

  • 返回值:

    void *类型

  • 业务逻辑:

    while循环中判断线程销毁的标志位是否为真

    每隔3s检测一次

    (1)添加工作线程:当工作线程忙不过来时,则需要继续添加工作线程,减轻系统任务压力

    (2)销毁工作线程:当任务数量比较少,此时有较多空闲的工作线程,并且其数量满足一定条件关系时,则需要进行工作线程的销毁工作,在实例代码中进行工作线程的销毁思路比较巧妙。通过唤醒阻塞在worker函数中等待任务队列中继续添加任务的工作线程诱导其自杀。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
/***** 管理者线程任务函数 *****/
// 用于创建线程与销毁多余空闲的线程
void* manager(void* arg)
{
Thread_Pool * pool = (Thread_Pool*)arg;
while(!pool->shutdown)
{
// 每隔3s检测一次
sleep(3);

// 取出线程池中任务的数量和当前线程存活的数量
// 需要加锁,避免数据混乱
pthread_mutex_lock(&pool->mutexPool);
int queueSize = pool->queueSize;
int liveNum = pool->liveNum;
pthread_mutex_unlock(&pool->mutexPool);

// 取出忙的线程数量
pthread_mutex_lock(&pool->mutexBusy);
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexBusy);

/***** 添加线程 ******/
// 当工作线程忙不过来的时,需要继续添加线程
// 任务的个数>线程存活的个数 && 存活的线程数 < 最大线程数
if(queueSize > liveNum && liveNum < pool->maxNum)
{
// 加锁 因为下面也操作了线程池的变量
pthread_mutex_lock(&pool->mutexPool);
int counter = 0;
for(int i = 0;i<pool->maxNum && counter < NUMBER && pool->liveNum < pool->maxNum;i++)
{
// 该线程ID没有被使用
if(pool->threadIDs[i] == 0)
{
// 创建新的线程
pthread_create(&pool->threadIDs[i],NULL,worker,pool);
counter++;
pool->liveNum++;
}
}
// 解锁
pthread_mutex_unlock(&pool->mutexPool);
}


/***** 销毁线程 ******/
// 忙的线程*2 < 存活的线程数 && 存活的线程>最小线程数
if(busyNum *2 < liveNum && liveNum > pool->liveNum)
{
// 操作线程池中的变量需要进行加锁
pthread_mutex_lock(&pool->mutexPool);
pool->exitNum = NUMBER;
pthread_mutex_unlock(&pool->mutexPool);

// 让空闲的工作线程自杀
for(int i=0;i<NUMBER;i++)
{
// 若此时多余的线程是空闲的则说明此时的任务队列为空
// 多余的线程此时阻塞在worker函数的pthread_cond_wait(&pool->notEmpty,&pool->mutexPool)位置
// 只需将其唤醒,使其在worker函数中向后执行并且自杀
pthread_cond_signal(&pool->notEmpty);
}
}

}
printf("Manager Thread,%ld exiting...............\n",pthread_self());
// 管理者线程退出
pthread_exit(NULL);
}

4.线程退出函数

threadExit函数

  • 参数:

    void *类型,在本实例代码中,传入的参数为指向线程池的指针,Thread_Pool *pool

  • 返回值:

    void 类型

  • 业务逻辑:

    该线程退出函数针对的是,线程池中工作的子线程

    在执行调用该函数的工作线程退出操作时,需要先对存储该工作线程ID的数组位置清除占用(重新赋值为0)

    之后执行pthread库中的pthread_exit(NULL)退出该线程,并且对该线程进行资源回收,调用pthread_join(tid,NULL)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**** 线程退出函数 ****/
// 存储退出线程id的数组位置需要进行清0
void threadExit(Thread_Pool* pool)
{
pthread_t tid = pthread_self();
for(int i = 0;i<pool->maxNum;i++)
{
// 找到当前调用该函数的线程id的数组位置,对其进行归零
if(pool->threadIDs[i] == tid)
{
pool->threadIDs[i] = 0;
printf("threadExit() called,%ld exiting...............\n",tid);
break;
}
}
// 当前线程退出
pthread_exit(NULL);
// 退出的工作线程资源回收
pthread_join(tid,NULL);
}

5.线程池添加任务函数

threadPoolAdd函数

  • 参数:

    void *类型,在本实例代码中,传入的参数为指向线程池的指针,Thread_Pool *pool

    任务函数的地址,在形参中使用void(*func)(void*)类型的函数指针去指向

    任务函数的参数void* arg类型

  • 返回值:

    void 类型

  • 业务逻辑:

    为线程池的任务队列中添加任务

    在执行添加任务期间,需要添加互斥锁,不能多个线程同时添加任务又取出任务

    (1)任务队列已满:需要对生产者线程进行阻塞,当消费者(工作线程)取出任务后可对生产者线程进行唤醒

    (2)线程池销毁标志位为真:解开互斥锁,返回

    (3)添加任务操作:循环队列队尾进行入队操作,添加完任务之后可以唤醒阻塞的工作线程继续工作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/***** 线程池添加任务 ****/
// 生产者生产产品之后需要唤醒阻塞的消费者
void threadPoolAdd(Thread_Pool* pool, void(*func)(void*), void* arg)
{
// 也需要锁住--避免有线程同时在任务队列中添加任务又在任务队列中取任务
pthread_mutex_lock(&pool->mutexPool);
// 若任务队列满
while(pool->queueSize == pool->queueCapacity && !pool->shutdown)
{
// 阻塞生产者线程
pthread_cond_wait(&pool->notFull,&pool->mutexPool);
}
// 线程池关闭
if(pool->shutdown)
{
pthread_mutex_unlock(&pool->mutexPool);
return;
}
// 线程池没有关闭 则继续添加任务 添加到队尾
pool->taskQ[pool->queueRear].function = func;
pool->taskQ[pool->queueRear].arg = arg;
// 队尾进行后移
pool->queueRear =(pool->queueRear + 1) % pool->queueCapacity;
// 任务队列中的任务个数+1
pool->queueSize++;

// 唤醒阻塞在 队列为空pool->notEmpty 条件变量上的线程 有活干了
pthread_cond_signal(&pool->notEmpty);

pthread_mutex_unlock(&pool->mutexPool);
}

6.销毁线程池函数

threadPoolDestroy函数

  • 参数:

    void *类型,在本实例代码中,传入的参数为指向线程池的指针,Thread_Pool *pool

  • 返回值:

    void 类型

  • 业务逻辑:

    设置销毁线程池标志位pool->shutdown为真,赋值为1

    (1)循环唤醒存活的消费者(工作线程):在worker函数中pool->shutdown > 0判断为真,则会执行代码块中的threadExit(pool)线程退出函数

    (2)阻塞回收管理者线程:在manager函数中pool->shutdown > 0判断为真,则会跳出循环执行pthread库中的线程退出函数pthread_exit(NULL),在threadPoolDestroy函数中使用pthread_join(pool->managerID,NULL)函数对管理者线程进行阻塞回收

    (3)释放内存:释放任务队列以及存储工作线程ID以及线程池的堆内存。

    (4)销毁变量:销毁条件变量以及互斥锁变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
int threadPoolDestroy(Thread_Pool* pool)
{
// pool线程池指向的是一块非有效的地址
if(pool == NULL)
{
return -1;
}

// 关闭线程池
pool->shutdown = 1;
// 唤醒存活的消费者线程
for(int i = 0;i<pool->liveNum;i++)
{
pthread_cond_signal(&pool->notEmpty);
}

// 阻塞回收管理者线程---等工作线程退出之后在对管理者线程进行资源回收
pthread_join(pool->managerID,NULL);

// 释放堆内存
if(pool->taskQ)
{
free(pool->taskQ);
}
if(pool->threadIDs)
{
free(pool->threadIDs);
}

// 销毁条件变量与互斥锁
pthread_mutex_destroy(&pool->mutexPool);
pthread_mutex_destroy(&pool->mutexBusy);
pthread_cond_destroy(&pool->notEmpty);
pthread_cond_destroy(&pool->notFull);

free(pool);
pool = NULL;
return 0;
}

六、测试代码

main.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#include <stdio.h>
#include "threadpool.h"
#include <unistd.h>
#include <malloc.h>

// 任务函数
void taskFunc(void* arg)
{
int num = *((int*)arg);
printf("thread %ld is working, number = %d\n",pthread_self(), num);
sleep(1);
}


int main() {
// 创建线程池 最少3个线程 最多10个线程 任务队列任务数容量100
Thread_Pool *pool = threadPoolCreate(1,10,100);

// 添加100个任务向任务队列中

for(int i = 0;i<100;++i)
{
int *num = (int *)malloc(sizeof(int));
*num = i + 1;
// 向线程池的任务队列中添加任务
threadPoolAdd(pool,taskFunc,num);
}

// 主线程休眠---保证工作线程进行工作
sleep(30);

// 销毁线程池
threadPoolDestroy(pool);
return 0;
}

CMakeLists.txt

1
2
3
4
5
6
7
8
9
10
11
12
cmake_minimum_required(VERSION 3.20)
project(untitled C)

set(CMAKE_C_STANDARD 99)

# 寻找线程库
find_package(Threads REQUIRED)

add_executable(untitled main.c threadpool.c threadpool.h)

# 链接线程库
target_link_libraries(untitled ${CMAKE_THREAD_LIBS_INIT})

七、运行结果

工作的10个消费者线程以及管理者线程均成功退出

image-20231225150734202