线程池C语言版本
教程文档:爱编程的大丙
视频教程:爱编程的大丙
一、线程池原理
线程池使得线程可以得到复用,在执行完一个任务之后,不进行销毁而是继续执行其他任务。
线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。
线程池的组成主要分为3个部分,这三部分配合工作就可以得到一个完整的线程池:
任务队列中的任务实际是回调函数,函数地址
1.任务队列,存储需要处理的任务,由工作的线程来处理这些任务
通过线程池提供的API
函数,将一个待处理的任务添加到任务队列,或者从任务队列中删除
已处理的任务会被从任务队列中删除
线程池的使用者,也就是调用线程池函数往任务队列中添加任务的线程就是生产者线程
2.工作的线程(任务队列任务的消费者) ,N
个
线程池中维护了一定数量的工作线程, 他们的作用是是不停的读任务队列, 从里边取出任务并处理
工作的线程相当于是任务队列的消费者角色
如果任务队列为空, 工作的线程将会被阻塞 (使用条件变量/信号量阻塞)
如果阻塞之后有了新的任务, 由生产者将阻塞解除, 工作线程开始工作
3.管理者线程(不处理任务队列中的任务),1
个
它的任务是周期性的对任务队列中的任务数量以及处于忙状态的工作线程个数进行检测
当任务过多的时候, 可以适当的创建一些新的工作线程
当任务过少的时候, 可以适当的销毁一些工作的线程

二、任务队列
任务结构体,成员有任务函数以及函数需要的参数
任务函数由有一个函数指针function
指向,并且指向的函数返回值为void *
,函数参数也为void *
void *
,任何类型的指针都可以直接赋值给它,无需进行强制类型转换
1 2 3 4 5 6
| typedef struct Task { void (*function)(void* arg); void* arg; }Task;
|
三、线程池定义
线程池结构体包含几个部分:
任务队列的定义
任务队列容量、当前队列中任务的个数,任务队列的首尾指针(在队尾加入任务,在队首取出任务)
当前队列中任务的个数,该变量若同时被多个线程操作则会导致数据混乱,因此需要加入互斥锁保证线程同步
管理者线程IDpthread_t managerID
管理者线程ID变量的类型为pthread_t
,在实例代码中,只创建了一个管理者线程,在线程池中的功能下:
(1)添加线程:当工作线程忙不来的时候,需要添加工作线程(但是每次添加的线程数需要对其进行相关约束,如:线程池中当前存活的线程数需要小于线程池中最大的线程数量)。
(2)销毁线程:当前的任务队列中的任务数量太少,不需要过多的工作线程时(节省资源),需要对工作的子线程进行销毁操作,可以定义一个变量为每次销毁的子线程数(实例代码中为NUMBER
,在threadpool.c
中被初始化,并且使用extern
外部引用至threadpool.h
中)。而销毁的方式可以选择诱导子线程自杀(可以分析得知,当任务队列为空时,这时会存在许多空闲的子线程,在工作线程调用的worker
函数中会通过pthread_cond_wait
函数将这些空闲的子线程进行阻塞,在生产者线程继续向任务队列中添加任务后,任务队列不为空时,生产者线程会通过pthread_cond_signal
函数以及对应的条件变量将阻塞的工作线程唤醒,继续接活干。其次在管理者线程每次休眠之后,进行检查时,发现还有很多空闲的工作线程并且空闲的数量满足一定数量上的判断时,则会考虑在管理者线程对应的manager
函数中通过pthread_cond_signal
函数以及对应的条件变量将阻塞的工作线程唤醒,并且设置相应的销毁工作线程标志位exitNum
对其赋值为每次销毁的子线程数量。在工作线程对于的worker
函数中解除工作线程的阻塞之后,则会对线程池的pool->exitNum
进行判断,若该值为真则会让空闲的工作线程自杀)。
工作线程IDpthread_t *threadIDs
工作线程ID定义为pthread_t
类型的一级指针,因为工作线程数量较多,在工作线程创建时,会使用malloc
函数创建一段连续的堆空间,使用工作线程ID指针指向其地址空间。创建的工作线程ID一级指针会当作数组进行使用,用于存储创建的工作线程ID号(但是工作线程ID一级指针指向的堆空间在创建之初需要对其进行初始化均赋值为0,在之后的操作中对每段堆空间进行判断若其值不为0,则表示已经有工作线程对该段堆空间进行占用。在每次工作线程退出时,也需要该工作线程占用的堆空间进行赋值为0,表示该工作线程结束了对该段堆空间的占用,方便后续创建的工作线程对其进行使用)。
最小工作线程数量
线程池所容纳的最小工作线程数量(线程池还在工作时,存活的工作线程数量需要大于等于最小的工作线程数量)。
最大工作线程数量
线程池所容纳的最大工作线程数量(线程池还在工作时,存活的工作线程数量需要小于等于最大的工作线程数量)。
忙的工作线程数量
用于表示线程池中当前正在工作的工作线程数量,每当工作线程在任务队列中取出任务并且执行时,该变量进行+1
操作。当任务执行完毕之后,则进行-1
操作。该变量的操作非常频繁,并且需要注意的是,不能多个工作线程对该变量同时进行操作,在对该变量进行操作时,需要加入互斥锁(在线程池中对于这种会频繁操作的变量,特意定义了一把独有的互斥锁pool->mutexBusy
)保证线程同步,避免数据混乱。
存活的工作线程的数量
用于表示线程池中当前存活的工作线程数量,当管理者线程需要对线程池中的线程进行添加或者销毁工作时,会对改变了进行操作,不能多个工作线程对该变量同时进行操作,在对该变量进行操作时,需要加入互斥锁(在实例代码中直接使用的是整个线程池的互斥锁)保证线程同步,避免数据混乱。
要销毁的工作线程个数
在管理者线程对应的manager函数中,首先,若需要管理者线程进行工作线程销毁时,则会对该变量进行操作,在操作该变量时,也需要添加互斥锁,保证线程同步(在该实例代码中虽然只有一个管理者线程,但是若存在多个管理者线程,则不能允许多个管理者线程对该变量同时进行操作,因此需要保证线程同步)。其次,因为本文的实例代码中,管理者线程销毁工作线程,是通过诱导工作线程自杀的方式,因此在工作线程对应的worker函数中也会对该变量进行操作,此时是当前自杀的工作子线程对该变量进行操作,也需要保证线程同步,添加互斥锁。
线程池的互斥锁mutexPool
锁整个线程池,对线程池中的任务队列以及可能造成数据混乱的其他变量进行同步操作
busyNum
变量的互斥锁
当前忙工作线程的数量,该变量变化次数多,因此单独为其创建一把互斥锁
判断任务队列是否已满的条件变量notFull
当生产者为任务队列添加任务导致队列满时,会使用pthread_cond_wait
函数通过线程池的锁pool->mutexPool
以及条件变量pool->notFull
阻塞生产者线程。而在工作线程对应的worker
函数中,当工作线程从任务队列中取出任务后,则会通过pthread_cond_signal(&pool->notFull)
唤醒阻塞的生产者线程继续为任务队列添加任务
销毁线程池标志位
若标志位shutdown
为1,则表示对线程池进行销毁,若标志位为0,则表示不销毁。
销毁线程池,需要将管理者线程与工作线程均进行退出并且回收,因此在后文会创建线程退出函数threadExit
(线程退出函数的逻辑后文会给出)
销毁线程池函数threadPoolDestroy
,当main.c
中主线程以及线程池中的线程完成任务后,会调用销毁线程池函数,函数体内,会将标志位shutdown
赋值为1
,并且唤醒所有阻塞的工作线程,进一步对销毁线程池标志位进行判断,若为真则执行线程退出函数(因为此时所有任务均完成,任务队列中已经没有其他任务,工作线程会被工作函数worker
中的pthread_cond_wait(&pool->notEmpty,&pool->mutexPool)
阻塞)。而针对于管理者线程,在对应的管理者函数内,会对标志位shutdown
进行判断,若该满足条件,管理者线程也会退出,并且在销毁线程池函数threadPoolDestroy
体内会对管理者线程调用pthread_join(pool->managerID,NULL)
函数进行管理者线程资源的阻塞回收。在完成上面的操作之后,则会将线程池创建的任务队列以及工作线程ID存储空间所开辟的堆空间进行释放,并且销毁所有的互斥锁以及条件变量。最后将线程池指针pool
所指向的动态内存空间也进行释放。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| struct Threadpool { Task *taskQ; int queueCapacity; int queueSize; int queueFront; int queueRear;
pthread_t managerID; pthread_t *threadIDs; int minNum; int maxNum; int busyNum; int liveNum; int exitNum; pthread_mutex_t mutexPool; pthread_mutex_t mutexBusy; pthread_cond_t notFull; pthread_cond_t notEmpty;
int shutdown;
};
|
四、头文件声明
threadpool.h
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
|
#ifndef UNTITLED_THREADPOOL_H #define UNTITLED_THREADPOOL_H #include <pthread.h>
extern const int NUMBER;
typedef struct Task { void (*function)(void *arg); void *arg; }Task;
struct Threadpool { Task *taskQ; int queueCapacity; int queueSize; int queueFront; int queueRear;
pthread_t managerID; pthread_t *threadIDs; int minNum; int maxNum; int busyNum; int liveNum; int exitNum; pthread_mutex_t mutexPool; pthread_mutex_t mutexBusy; pthread_cond_t notFull; pthread_cond_t notEmpty;
int shutdown;
};
typedef struct Threadpool Thread_Pool;
Thread_Pool *threadPoolCreate(int min,int max,int queueSize);
int threadPoolDestroy(Thread_Pool* pool);
void threadPoolAdd(Thread_Pool* pool, void(*func)(void*), void* arg);
int threadPoolBusyNum(Thread_Pool* pool);
int threadPoolAliveNum(Thread_Pool* pool);
_Noreturn void* worker(void* arg);
void* manager(void* arg);
void threadExit(Thread_Pool* pool);
#endif
|
五、源文件定义
threadpool.c
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345
|
#include <stdlib.h> #include <stdio.h> #include <string.h> #include <unistd.h> #include "threadpool.h"
const int NUMBER = 2;
Thread_Pool *threadPoolCreate(int min,int max,int queueSize) { Thread_Pool *pool; pool = malloc(sizeof(*pool)); do { if (pool == NULL) { printf("malloc threadpool fail...\n"); break; }
pool->threadIDs = malloc(sizeof(pthread_t) * max); if (pool->threadIDs == NULL) { printf("malloc threadIDS fail...\n"); break; } memset(pool->threadIDs,0,sizeof(pthread_t)*max); pool->minNum = min; pool->maxNum = max; pool->busyNum = 0; pool->liveNum = min; pool->exitNum = 0;
if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 || pthread_mutex_init(&pool->mutexBusy, NULL) != 0 || pthread_cond_init(&pool->notEmpty, NULL) != 0 || pthread_cond_init(&pool->notFull, NULL) != 0) { printf("mutex or condition init fail...\n"); break; }
pool->taskQ = malloc(sizeof(Task) * queueSize); pool->queueCapacity = queueSize; pool->queueSize = 0; pool->queueFront = 0; pool->queueRear = 0;
pool->shutdown = 0;
pthread_create(&pool->managerID,NULL,manager,pool); for(int i = 0;i<min;i++) { pthread_create(&pool->threadIDs[i],NULL,worker,pool); }
return pool; }while(0);
if (pool && pool->threadIDs) free(pool->threadIDs); if (pool && pool->taskQ) free(pool->taskQ); if (pool) free(pool);
return NULL; }
void* worker(void* arg) { Thread_Pool *pool = (Thread_Pool *)arg;
while(1) { pthread_mutex_lock(&pool->mutexPool); while(pool->queueSize == 0 && !pool->shutdown) { pthread_cond_wait(&pool->notEmpty,&pool->mutexPool);
if(pool->exitNum > 0) { pool->exitNum--; if(pool->liveNum > pool->minNum) { pool->liveNum--; pthread_mutex_unlock(&pool->mutexPool); threadExit(pool); } } }
if(pool->shutdown > 0) { pthread_mutex_unlock(&pool->mutexPool); threadExit(pool); }
Task task; task.function = pool->taskQ[pool->queueFront].function; task.arg = pool->taskQ[pool->queueFront].arg; pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity; pool->queueSize--; pthread_cond_signal(&pool->notFull);
pthread_mutex_unlock(&pool->mutexPool);
printf("thread %ld start working... \n",pthread_self()); pthread_mutex_lock(&pool->mutexBusy); pool->busyNum++; pthread_mutex_unlock(&pool->mutexBusy);
task.function(task.arg); free(task.arg); task.arg = NULL; printf("thread %ld end working... \n",pthread_self());
pthread_mutex_lock(&pool->mutexBusy); pool->busyNum--; pthread_mutex_unlock(&pool->mutexBusy); } return NULL; }
void* manager(void* arg) { Thread_Pool * pool = (Thread_Pool*)arg; while(!pool->shutdown) { sleep(3);
pthread_mutex_lock(&pool->mutexPool); int queueSize = pool->queueSize; int liveNum = pool->liveNum; pthread_mutex_unlock(&pool->mutexPool);
pthread_mutex_lock(&pool->mutexBusy); int busyNum = pool->busyNum; pthread_mutex_unlock(&pool->mutexBusy);
if(queueSize > liveNum && liveNum < pool->maxNum) { pthread_mutex_lock(&pool->mutexPool); int counter = 0; for(int i = 0;i<pool->maxNum && counter < NUMBER && pool->liveNum < pool->maxNum;i++) { if(pool->threadIDs[i] == 0) { pthread_create(&pool->threadIDs[i],NULL,worker,pool); counter++; pool->liveNum++; } } pthread_mutex_unlock(&pool->mutexPool); }
if(busyNum *2 < liveNum && liveNum > pool->liveNum) { pthread_mutex_lock(&pool->mutexPool); pool->exitNum = NUMBER; pthread_mutex_unlock(&pool->mutexPool);
for(int i=0;i<NUMBER;i++) { pthread_cond_signal(&pool->notEmpty); } }
} printf("Manager Thread,%ld exiting...............\n",pthread_self()); pthread_exit(NULL); }
void threadExit(Thread_Pool* pool) { pthread_t tid = pthread_self(); for(int i = 0;i<pool->maxNum;i++) { if(pool->threadIDs[i] == tid) { pool->threadIDs[i] = 0; printf("threadExit() called,%ld exiting...............\n",tid); break; } } pthread_exit(NULL); pthread_join(tid,NULL); }
void threadPoolAdd(Thread_Pool* pool, void(*func)(void*), void* arg) { pthread_mutex_lock(&pool->mutexPool); while(pool->queueSize == pool->queueCapacity && !pool->shutdown) { pthread_cond_wait(&pool->notFull,&pool->mutexPool); } if(pool->shutdown) { pthread_mutex_unlock(&pool->mutexPool); return; } pool->taskQ[pool->queueRear].function = func; pool->taskQ[pool->queueRear].arg = arg; pool->queueRear =(pool->queueRear + 1) % pool->queueCapacity; pool->queueSize++;
pthread_cond_signal(&pool->notEmpty);
pthread_mutex_unlock(&pool->mutexPool); }
int threadPoolBusyNum(Thread_Pool* pool) { pthread_mutex_lock(&pool->mutexPool); int busyNum = pool->busyNum; pthread_mutex_unlock(&pool->mutexPool); return busyNum; }
int threadPoolAliveNum(Thread_Pool* pool) { pthread_mutex_lock(&pool->mutexPool); int liveNum = pool->liveNum; pthread_mutex_unlock(&pool->mutexPool); return liveNum; }
int threadPoolDestroy(Thread_Pool* pool) { if(pool == NULL) { return -1; }
pool->shutdown = 1; for(int i = 0;i<pool->liveNum;i++) { pthread_cond_signal(&pool->notEmpty); }
pthread_join(pool->managerID,NULL);
if(pool->taskQ) { free(pool->taskQ); } if(pool->threadIDs) { free(pool->threadIDs); }
pthread_mutex_destroy(&pool->mutexPool); pthread_mutex_destroy(&pool->mutexBusy); pthread_cond_destroy(&pool->notEmpty); pthread_cond_destroy(&pool->notFull);
free(pool); pool = NULL; return 0; }
|
1.线程池创建函数
参数
线程池工作时最少工作线程数量min
线程池工作时最少多工作线程数量max
任务队列的容量queueSize
,于后文提到的任务队列中当前任务数pool->queueSize
不一样
返回值
为struct Threadpool
类型的指针,指向线程池
业务逻辑
将所有创建过程均可以放置在一个do whle(0)
的循环中,该循环只进行一次,使用这种语法的好处时,若创建步骤失败,则可以使用break
中断创建,并且在函数最后对之前创建成功所占用的内存空间进行释放。
(1)工作线程ID的存储:开辟连续内存的堆空间,对工作线程的ID进行存储,其空间大小为sizeof(pthread_t) * max
,创建之后对每一个线程ID存储位置赋值为0
(2)初始化互斥锁与条件变量:若每一次初始化函数返回的值不为0,则使用break
中断
(3)任务队列的创建:开辟动态内存创建queueSize
数量的任务存储空间,并且对任务队列相关的变量进行初始化
(4)管理者线程与工作线程的创建:在实例代码中仅创建一个管理者线程,并且工作线程在开始时,仅创建最少的工作线程数量。
(5)创建失败则回收资源:上述步骤中,若创建失败,则回收资源。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
| Thread_Pool *threadPoolCreate(int min,int max,int queueSize) { Thread_Pool *pool; pool = malloc(sizeof(*pool)); do { if (pool == NULL) { printf("malloc threadpool fail...\n"); break; }
pool->threadIDs = malloc(sizeof(pthread_t) * max); if (pool->threadIDs == NULL) { printf("malloc threadIDS fail...\n"); break; } memset(pool->threadIDs,0,sizeof(pthread_t)*max); pool->minNum = min; pool->maxNum = max; pool->busyNum = 0; pool->liveNum = min; pool->exitNum = 0;
if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 || pthread_mutex_init(&pool->mutexBusy, NULL) != 0 || pthread_cond_init(&pool->notEmpty, NULL) != 0 || pthread_cond_init(&pool->notFull, NULL) != 0) { printf("mutex or condition init fail...\n"); break; }
pool->taskQ = malloc(sizeof(Task) * queueSize); pool->queueCapacity = queueSize; pool->queueSize = 0; pool->queueFront = 0; pool->queueRear = 0;
pool->shutdown = 0;
pthread_create(&pool->managerID,NULL,manager,pool); for(int i = 0;i<min;i++) { pthread_create(&pool->threadIDs[i],NULL,worker,pool); }
return pool; }while(0);
if (pool && pool->threadIDs) free(pool->threadIDs); if (pool && pool->taskQ) free(pool->taskQ); if (pool) free(pool);
return NULL; }
|
2.工作线程的任务函数
worker
函数
参数:
void *
类型,在本实例代码中,传入的参数为指向线程池的指针,Thread_Pool *pool
返回值:
void *
类型
业务逻辑
任务函数,需要一直运行,因此在while(1)
的循环中进行操作
每一个工作线程均会执行一个任务函数,为了保持线程同步,因此在执行任务函数时,需要将线程池进行加锁操作。
(1)判断任务队列是否为空以及当前是否执行线程销毁:若任务队列为空,则需要阻塞工作线程,等待生产者线程为任务队列中添加任务之后在继续操作。
(2)工作线程诱导自杀:工作线程解除阻塞之后,若有空闲的工作线程存在,则需要对要销毁的工作线程个数进行判断,若需要销毁的工作线程个数pool->exitNum
大于0
,则先对调用该函数的工作线程执行解锁操作(pthread_cond_wait
解除阻塞之后,得到CPU
时间片的工作线程会先解锁后加锁),最后经过相关的条件判断执行该工作线程的自杀操作,该工作调用线程主动调用线程退出函数
(3)判断线程池是否关闭销毁:工作线程解除阻塞之后,需要对销毁线程池标志位进行判断,若为真,则先对该工作线程进行解锁操作,在执行线程退出函数
(4)工作线程正常执行任务:若(2)(3)的判断均为否,则从任务队列的头部取出任务,在执行。期间需要移动队列头节点,指向下一个任务。并且对任务队列中当前任务总数queueSize
进行-1
。若此前任务队列为满时,则唤醒阻塞的生产者线程,继续为任务队列中添加任务。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
|
void* worker(void* arg) { Thread_Pool *pool = (Thread_Pool *)arg;
while(1) { pthread_mutex_lock(&pool->mutexPool); while(pool->queueSize == 0 && !pool->shutdown) { pthread_cond_wait(&pool->notEmpty,&pool->mutexPool);
if(pool->exitNum > 0) { pool->exitNum--; if(pool->liveNum > pool->minNum) { pool->liveNum--; pthread_mutex_unlock(&pool->mutexPool); threadExit(pool); } } }
if(pool->shutdown > 0) { pthread_mutex_unlock(&pool->mutexPool); threadExit(pool); }
Task task; task.function = pool->taskQ[pool->queueFront].function; task.arg = pool->taskQ[pool->queueFront].arg; pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity; pool->queueSize--; pthread_cond_signal(&pool->notFull);
pthread_mutex_unlock(&pool->mutexPool);
printf("thread %ld start working... \n",pthread_self()); pthread_mutex_lock(&pool->mutexBusy); pool->busyNum++; pthread_mutex_unlock(&pool->mutexBusy);
task.function(task.arg); free(task.arg); task.arg = NULL; printf("thread %ld end working... \n",pthread_self());
pthread_mutex_lock(&pool->mutexBusy); pool->busyNum--; pthread_mutex_unlock(&pool->mutexBusy); } return NULL; }
|
3.管理者线程任务函数
manager
函数
参数:
void *
类型,在本实例代码中,传入的参数为指向线程池的指针,Thread_Pool *pool
返回值:
void *
类型
业务逻辑:
在while
循环中判断线程销毁的标志位是否为真
每隔3s检测一次
(1)添加工作线程:当工作线程忙不过来时,则需要继续添加工作线程,减轻系统任务压力
(2)销毁工作线程:当任务数量比较少,此时有较多空闲的工作线程,并且其数量满足一定条件关系时,则需要进行工作线程的销毁工作,在实例代码中进行工作线程的销毁思路比较巧妙。通过唤醒阻塞在worker
函数中等待任务队列中继续添加任务的工作线程诱导其自杀。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
|
void* manager(void* arg) { Thread_Pool * pool = (Thread_Pool*)arg; while(!pool->shutdown) { sleep(3);
pthread_mutex_lock(&pool->mutexPool); int queueSize = pool->queueSize; int liveNum = pool->liveNum; pthread_mutex_unlock(&pool->mutexPool);
pthread_mutex_lock(&pool->mutexBusy); int busyNum = pool->busyNum; pthread_mutex_unlock(&pool->mutexBusy);
if(queueSize > liveNum && liveNum < pool->maxNum) { pthread_mutex_lock(&pool->mutexPool); int counter = 0; for(int i = 0;i<pool->maxNum && counter < NUMBER && pool->liveNum < pool->maxNum;i++) { if(pool->threadIDs[i] == 0) { pthread_create(&pool->threadIDs[i],NULL,worker,pool); counter++; pool->liveNum++; } } pthread_mutex_unlock(&pool->mutexPool); }
if(busyNum *2 < liveNum && liveNum > pool->liveNum) { pthread_mutex_lock(&pool->mutexPool); pool->exitNum = NUMBER; pthread_mutex_unlock(&pool->mutexPool);
for(int i=0;i<NUMBER;i++) { pthread_cond_signal(&pool->notEmpty); } }
} printf("Manager Thread,%ld exiting...............\n",pthread_self()); pthread_exit(NULL); }
|
4.线程退出函数
threadExit
函数
参数:
void *
类型,在本实例代码中,传入的参数为指向线程池的指针,Thread_Pool *pool
返回值:
void
类型
业务逻辑:
该线程退出函数针对的是,线程池中工作的子线程
在执行调用该函数的工作线程退出操作时,需要先对存储该工作线程ID的数组位置清除占用(重新赋值为0)
之后执行pthread
库中的pthread_exit(NULL)
退出该线程,并且对该线程进行资源回收,调用pthread_join(tid,NULL)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
|
void threadExit(Thread_Pool* pool) { pthread_t tid = pthread_self(); for(int i = 0;i<pool->maxNum;i++) { if(pool->threadIDs[i] == tid) { pool->threadIDs[i] = 0; printf("threadExit() called,%ld exiting...............\n",tid); break; } } pthread_exit(NULL); pthread_join(tid,NULL); }
|
5.线程池添加任务函数
threadPoolAdd
函数
参数:
void *
类型,在本实例代码中,传入的参数为指向线程池的指针,Thread_Pool *pool
任务函数的地址,在形参中使用void(*func)(void*)
类型的函数指针去指向
任务函数的参数void* arg
类型
返回值:
void
类型
业务逻辑:
为线程池的任务队列中添加任务
在执行添加任务期间,需要添加互斥锁,不能多个线程同时添加任务又取出任务
(1)任务队列已满:需要对生产者线程进行阻塞,当消费者(工作线程)取出任务后可对生产者线程进行唤醒
(2)线程池销毁标志位为真:解开互斥锁,返回
(3)添加任务操作:循环队列队尾进行入队操作,添加完任务之后可以唤醒阻塞的工作线程继续工作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
|
void threadPoolAdd(Thread_Pool* pool, void(*func)(void*), void* arg) { pthread_mutex_lock(&pool->mutexPool); while(pool->queueSize == pool->queueCapacity && !pool->shutdown) { pthread_cond_wait(&pool->notFull,&pool->mutexPool); } if(pool->shutdown) { pthread_mutex_unlock(&pool->mutexPool); return; } pool->taskQ[pool->queueRear].function = func; pool->taskQ[pool->queueRear].arg = arg; pool->queueRear =(pool->queueRear + 1) % pool->queueCapacity; pool->queueSize++;
pthread_cond_signal(&pool->notEmpty);
pthread_mutex_unlock(&pool->mutexPool); }
|
6.销毁线程池函数
threadPoolDestroy
函数
参数:
void *
类型,在本实例代码中,传入的参数为指向线程池的指针,Thread_Pool *pool
返回值:
void
类型
业务逻辑:
设置销毁线程池标志位pool->shutdown
为真,赋值为1
(1)循环唤醒存活的消费者(工作线程):在worker
函数中pool->shutdown > 0
判断为真,则会执行代码块中的threadExit(pool)
线程退出函数
(2)阻塞回收管理者线程:在manager
函数中pool->shutdown > 0
判断为真,则会跳出循环执行pthread
库中的线程退出函数pthread_exit(NULL)
,在threadPoolDestroy
函数中使用pthread_join(pool->managerID,NULL)
函数对管理者线程进行阻塞回收
(3)释放内存:释放任务队列以及存储工作线程ID以及线程池的堆内存。
(4)销毁变量:销毁条件变量以及互斥锁变量
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| int threadPoolDestroy(Thread_Pool* pool) { if(pool == NULL) { return -1; }
pool->shutdown = 1; for(int i = 0;i<pool->liveNum;i++) { pthread_cond_signal(&pool->notEmpty); }
pthread_join(pool->managerID,NULL);
if(pool->taskQ) { free(pool->taskQ); } if(pool->threadIDs) { free(pool->threadIDs); }
pthread_mutex_destroy(&pool->mutexPool); pthread_mutex_destroy(&pool->mutexBusy); pthread_cond_destroy(&pool->notEmpty); pthread_cond_destroy(&pool->notFull);
free(pool); pool = NULL; return 0; }
|
六、测试代码
main.c
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| #include <stdio.h> #include "threadpool.h" #include <unistd.h> #include <malloc.h>
void taskFunc(void* arg) { int num = *((int*)arg); printf("thread %ld is working, number = %d\n",pthread_self(), num); sleep(1); }
int main() { Thread_Pool *pool = threadPoolCreate(1,10,100);
for(int i = 0;i<100;++i) { int *num = (int *)malloc(sizeof(int)); *num = i + 1; threadPoolAdd(pool,taskFunc,num); }
sleep(30);
threadPoolDestroy(pool); return 0; }
|
CMakeLists.txt
1 2 3 4 5 6 7 8 9 10 11 12
| cmake_minimum_required(VERSION 3.20) project(untitled C)
set(CMAKE_C_STANDARD 99)
# 寻找线程库 find_package(Threads REQUIRED)
add_executable(untitled main.c threadpool.c threadpool.h)
# 链接线程库 target_link_libraries(untitled ${CMAKE_THREAD_LIBS_INIT})
|
七、运行结果
工作的10个消费者线程以及管理者线程均成功退出
