UNIX环境编程-并发(11)

一、信号

1.信号的概念

​ 信号是软件层面的中断,信号的响应依赖于中断

signal)是一种用于异步通知进程某个事件已经发生的机制。当进程收到一个信号时,它可以有几种不同的反应,包括忽略该信号、执行默认的信号处理动作,或者调用一个用户定义的处理函数

(1)同步与异步

同步

在同步操作中,一个任务必须等待前一个任务完成后才能继续执行。这意味着您在等待任务完成时不能做任何其他事情

特点

  • 执行顺序是可预测的。
  • 容易编写和理解,因为操作按预期的顺序执行。
  • 可能会浪费CPU时间,因为它在等待外部资源(如磁盘IO、网络请求等)时可能会处于闲置状态

异步
在异步操作中,你可以在等待某个任务完成时继续执行其他任务。当那个任务完成后,系统通常会使用回调、事件或其他机制来通知你

特点

  • 执行顺序不是固定的,可能需要额外的工具或方法(如回调、promises、futures等)来处理。
  • 可以更充分地利用CPU,因为它可以在等待外部资源时执行其他任务。
  • 可能会更难编写和理解,特别是当涉及到大量的异步操作和回调时


2.signal()

ubuntu 终端中输入kill -l 可以查看各类信号

image-20230808112209488

(1)函数原型
1
2
3
4
5
6
7
#include <signal.h>

// 将一个 void (*func)(int) 函数指针类型 typedef 为 sighandler_t
typedef void (*sighandler_t)(int);

sighandler_t signal(int signum, sighandler_t handler);
// void (*signal(int signum,void (*handler)(int)))(int);

signal() 函数用于处理信号。这是一个基本的信号处理接口,它允许程序员为特定的信号定义一个处理函数

参数

  • signum 是要捕获或忽略的信号。
  • handler 是当指定的信号发生时要调用的函数的指针(函数指针,回调函数),或者是以下特定的值:
    • SIG_IGN:忽略该信号。
    • SIG_DFL:采取信号的默认动作。

函数返回值是之前关联到指定信号的处理函数的地址,或者是SIG_IGNSIG_DFL。如果出错,则返回 SIG_ERR


(2)程序实例

将文件描述符1号位置 每秒输入一个字符 * 即标准输出中每秒打印一个*号

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include <stdio.h>
#include <stdlib.h>
#include <signal.h>
#include <unistd.h>

int main()
{
int i;

for(i = 0;i<10;i++)
{
// 将文件描述符1号位置 每秒输入一个字符 * 即标准输出
write(1,"*",1);
sleep(1);
}
exit(0);
}

若此时在终端中键入ctrl + c则会打断打印输出,即信号SIGINT

  • SIGINT:中断信号(终端终端符)。当用户从键盘按下Ctrl+C时发送。通常用于中断进程
  • SIGQUIT:退出信号。和SIGINT类似,但会导致进程核心转储,Ctrl+\
忽略SIGINT信号

在程序中使用signal函数,忽略SIGINT信号,使得ctrl + c无法打断其输出打印

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#include <stdio.h>
#include <stdlib.h>
#include <signal.h>
#include <unistd.h>

int main()
{
int i;

signal(SIGINT,SIG_IGN);

for(i = 0;i<10;i++)
{
// 将文件描述符1号位置 每秒输入一个字符 * 即标准输出
write(1,"*",1);
sleep(1);
}
exit(0);
}

运行结果

image-20230808153642731

捕获SIGINT信号,执行其他任务

当程序捕获SIGINT信号时,则在终端打印输出一个!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <stdio.h>
#include <stdlib.h>
#include <signal.h>
#include <unistd.h>

static void int_handler(int s)
{
// 将文件描述符1号位置 输入一个字符 ! 即标准输出
write(1,"!",1);
}

int main()
{
int i;

// 捕获到SIGINT信号时,则使用回调函数int_handler
signal(SIGINT,int_handler);

for(i = 0;i<10;i++)
{
// 将文件描述符1号位置 每秒输入一个字符 * 即标准输出
write(1,"*",1);
sleep(1);
}
exit(0);
}

image-20230808154410899

当程序执行过程中,若一直键入ctrl + c,程序不会执行10s,一瞬间会输出完,出现如下情况

image-20230808155203918

重点:信号会打断阻塞的系统调用

上述的sleep函数就是被打断的系统调用

当一个进程在执行某个阻塞的系统调用(如read(), write(), accept(), sleep()等)时接收到某些信号,并且为该信号设置了处理函数,则该系统调用可能会被中断并提前返回。这是因为信号处理函数被执行,系统调用返回错误,并且errno被设置为EINTR,表示系统调用被一个信号中断

例如,考虑一个进程正在执行一个阻塞的read()调用,等待从文件或套接字中读取数据。如果在这期间该进程接收到一个信号,并且已经为该信号注册了处理函数,那么read()可能会返回-1,并且errno会被设置为EINTR

为了处理这种情况,通常的策略是在检测到系统调用因为EINTR而失败后重新执行该系统调用。这样可以确保系统调用能够完成其原始的任务。

这是一个简单的处理read()被信号中断的例子

1
2
3
4
5
6
7
8
9
10
11
12
ssize_t ret;
do {
ret = read(fd, buffer, sizeof(buffer));
if(ret < 0)
// 不是信号打断的,则是真的出错了
// 若是假的错误,则不会进入if语句,并且重新回到read函数位置,重新得到ret,若正常执行,则会跳出do-while语句
if(errno != EINTR)
{
perror("read()");
exit(1);
}
} while (ret < 0);

为了避免某些系统调用被中断的行为,你可以使用sigaction()代替signal()来注册信号处理器,并设置SA_RESTART标志。这会使得大多数被此信号中断的系统调用自动重新启动



3.信号的不可靠

信号的行为不可靠

信号机制在早期的UNIX系统中被认为是不可靠的,主要是因为以下原因:

  • 信号丢失:在早期的实现中,如果一个信号在其处理程序执行时再次被发送,那么这个信号可能会被忽略。也就是说,如果两次连续发送同一个信号,而在第一个信号的处理程序还没有执行完毕时,第二个信号就到达了,那么第二个信号可能不会被注意到。
  • 信号不排队:对于大多数信号来说,不论发送了多少次同样的信号,如果之前的信号尚未处理,则只会保留一次。也就是说,如果你发送了三次SIGUSR1信号,但在处理第一个信号之前,其他两个信号就到达了,那么这两个信号可能被视为一个。
  • 默认行为和信号处理的改变:当一个进程接收到一个它没有为其注册处理函数的信号时,它会执行该信号的默认行为(如终止)。但在早期的signal()实现中,当一个信号处理程序被调用时,该信号的行为会被重置为默认行为。这意味着,如果在处理某个信号时再次接收到同样的信号,那么可能会执行该信号的默认行为,而不是再次调用信号处理程序。
  • 非原子性:在信号处理函数和主程序之间,存在竞态条件,这可能导致不确定的行为。

由于上述原因,程序员必须在使用信号时特别小心。为了解决这些问题,后来的UNIX系统和POSIX标准引入了更为可靠的信号处理机制,如sigaction()函数和sigprocmask()函数等,它们提供了更加详细和可控的信号处理能力。

在现代的UNIX-like系统中,尽管许多早期的问题已经被解决,但仍然建议在设计信号处理时采取谨慎的态度,并充分了解特定平台上的信号语义和行为



4.可重入函数

重入:第一次调用没有结束,第二次调用就开始了,但是不会出错

在计算机编程中,特别是在多线程和信号处理的上下文中,”可重入”是一个重要的概念。一个函数如果是可重入的,那么它可以在它自身还未完成之前被安全地再次调用。这意味着多个线程可以同时或者在中断上下文中调用这样的函数,而不必担心会引发竞态条件或其他未定义的行为

所有的系统调用均为可重入的,一部分标准的库函数也是可重入的,如:memcpy()

以下是使函数可重入的一些关键属性:

  • 不使用全局和静态变量:全局和静态变量在函数的多次调用之间是持久的,所以如果一个函数修改了这样的变量,那么其它的调用可能会看到不一致的状态。
  • 不修改其参数:函数不应该修改其传入的参数,除非这些修改是调用者所期望的。
  • 不调用其他非可重入的函数:这是很直观的,因为如果一个可重入的函数调用了一个非可重入的函数,那么整个调用链就不能被视为是可重入的。
  • 使用局部变量代替其他形式的存储:局部变量在栈上分配,所以每次函数调用都会有其自己的一套局部变量。
  • 线程安全:虽然可重入性和线程安全性并不完全相同,但可重入函数在多线程环境中也是线程安全的。

在信号处理中,可重入性尤为重要,因为你无法预测何时会接收到一个信号。如果一个信号处理函数调用了一个非可重入的函数(例如许多标准库函数),那么可能会出现竞态条件,导致不确定的行为

例如,malloc()free() 这样的内存管理函数在多线程或信号处理上下文中通常不是可重入的,因为它们维护了一个全局的内存分配表。因此,如果在信号处理器中使用这些函数可能是危险的



5.信号的响应过程

视频教程

​ 信号从收到至响应有一个不可以避免的延迟

​ 思考:如何忽略掉一个信号?

​ 思考:标准信号为什么丢失?

​ 标准信号的响应没有严格的顺序



6.常用函数

(1)kill()

kill函数是用来发送信号给进程的。这个函数定义在<signal.h>头文件中

函数原型

1
2
3
#include <signal.h>

int kill(pid_t pid, int sig);

参数

pid:这是要接收信号的进程的进程ID。

  • 如果 pid > 0,则信号发送给该pid对应的进程。
  • 如果 pid == 0,则信号发送给当前进程和同一个进程组的所有进程(组内广播)。
  • 如果 pid == -1,则信号发送给除了init进程(其PID通常为1)之外的所有进程(全局广播)。调用进程必须有相应的权限。
  • 如果 pid < -1,则信号发送给进程组ID为-pid的所有进程。

sig:要发送的信号编号。例如,SIGKILLSIGTERM等。发送0信号可以用来检查一个进程是否存在(它不会导致进程终止或接收任何实际的信号

返回值

  • 成功:返回0
  • 失败:返回-1,并设置errno来表示错误原因

注意

检测进程存在性:当 sig 为 0 时,kill() 实际上并不发送任何信号。它只是用于检测指定 pid 的进程或进程组是否存在,以及调用进程是否有权限向其发送信号

程序实例

要终止PID12345的进程,你可以这样做:

1
2
3
4
5
#include <signal.h>

// ...

kill(12345, SIGKILL);

杀死一个进程通常应该更为优雅。许多程序首先发送SIGTERM信号来请求进程终止,给它机会进行清理工作。如果那不起作用,然后再发送SIGKILL

注意:直接使用kill需要谨慎,因为不当地发送信号可能导致不希望的副作用,比如数据丢失或进程不正常终止


(2)raise()

raise函数用于发送一个信号给当前执行的进程。它也定义在<signal.h>头文件中

函数原型

1
2
3
#include <signal.h>

int raise(int sig);

参数

sig:要发送的信号编号。例如,SIGABRTSIGINTSIGTERM

返回值

  • 成功:返回0
  • 失败:返回非0值

程序实例

你可能在一个程序中使用raise来生成一个中断信号:

1
2
3
4
5
6
7
8
9
#include <signal.h>
#include <stdio.h>

int main() {
printf("Raising the interrupt signal...\n");
raise(SIGINT); // Sends an interrupt signal to itself.
printf("After raise.\n");
return 0;
}

在这个例子中,程序会发送一个SIGINT信号给自己,模拟像通过键盘发送的Ctrl+C命令。这通常会导致进程终止,除非你有一个针对这个信号的处理函数。

使用raise函数可以在程序中触发信号处理程序,或者模拟外部条件导致的信号


(3)alarm()

这个函数用于为调用进程设置一个实时闹钟,当这个闹钟超时时,系统会向该进程发送一个SIGALRM信号。它定义在<unistd.h>头文件中

函数原型

1
2
3
#include <unistd.h>

unsigned int alarm(unsigned int seconds);

参数

seconds:设置的超时时间,单位为秒。当设置为0时,任何之前设置的闹钟都会被取消

返回值

  • 如果之前已经设置了一个闹钟,且它尚未超时,那么alarm函数会返回之前那个闹钟的剩余时间(秒)。否则,它返回0。

alarm设置的时间到达时,操作系统会向进程发送SIGALRM信号。默认情况下,这会终止进程。但你可以使用signalsigaction函数来定义自己的处理函数,从而在收到SIGALRM信号时执行特定的操作

  • SIGALRM默认行为:当进程接收到SIGALRM信号且没有为它设置处理程序时,默认的行为是终止进程
  • SIGALRM自定义处理:你可以使用signalsigaction函数为SIGALRM定义自己的处理程序,这样你就可以在接收到信号时执行特定的操作,而不是让进程终止

实例程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#include <stdio.h>
#include <unistd.h>
#include <signal.h>

void handle_alarm(int sig) {
printf("Alarm went off!\n");
}

int main() {
signal(SIGALRM, handle_alarm); // Set up the signal handler
printf("Setting an alarm for 5 seconds...\n");
alarm(5); // Set the alarm
pause(); // Wait for signals to arrive (like SIGALRM)
printf("Exiting program.\n");
return 0;
}

程序设置了一个5秒的闹钟,并使用pause函数等待信号的到来。当闹钟超时,会执行handle_alarm函数,打印出“Alarm went off!”。然后程序继续执行并退出

注意:alarm函数只能设置一个闹钟。如果在前一个闹钟超时前再次调用它,前一个闹钟会被新的值替代

应用场景
  • 定时操作:你可以使用alarmSIGALRM来在程序中执行定时任务。例如,如果你的程序在网络上等待数据,但你不希望它永远等待,你可以设置一个闹钟作为超时机制。
  • 资源限制:例如,当运行某些计算密集型任务时,你可能希望限制它们的执行时间
注意事项
  • 因为alarm只能设置一个闹钟,所以每次调用alarm都会重置之前的闹钟。
  • 信号机制不是实时的,所以当你设置一个很短的闹钟时,不一定能够精确地在你期望的时间内触发。
  • 在信号处理程序中,你应该避免调用可能会改变程序状态或不是异步信号安全的函数
定时循环程序程序实例

使用time()函数进行定时

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include <stdio.h>
#include <unistd.h>
#include <signal.h>
#include <time.h>
#include <stdint.h>

int main()
{
time_t end;
int64_t count = 0;

// time(NULL)可以获得当前的时间戳,单位为秒
end = time(NULL) + 5;

while(time(NULL) <= end)
{
count++;
}

printf("%ld\n",count);

exit(0);
}

该程序运行5s后,在终端打印输出,在该目录下新建out文件,将程序实际运行程序重定向输入至out

运行结果,实际运行时间比5s多一点,存在误差

1
2
3
4
5
(base) zxz@zxz-B660M-GAMING-X-AX-DDR4:~/Proj/CLionProj/UNIX/IO/signal$ time ./5 >> ./out 

real 0m5.317s
user 0m5.309s
sys 0m0.008s

使用alarm()函数进行定时

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#include <stdio.h>
#include <stdlib.h>
#include <signal.h>
#include <stdint.h>
#include <unistd.h>

static int loop = 1;

static void alrm_handler(int s)
{
loop = 0;
}

int main()
{
int64_t count = 0;

signal(SIGALRM,alrm_handler);

// 程序运行5s之后向进程发送SIGALRM信号
alarm(5);



while(loop)
count++;

// 当loop = 0时,即5s之后打印输出
printf("%ld\n",count);

exit(0);
}

在终端中使用指令time ./6 >> ./out运行程序,程序打印结果重定向至out文件中

程序结果

1
2
3
4
5
(base) zxz@zxz-B660M-GAMING-X-AX-DDR4:~/Proj/CLionProj/UNIX/IO/signal$ time ./6 >> ./out 

real 0m5.002s
user 0m4.998s
sys 0m0.004s

使用alarm()信号的方式去定时,相较于time()精度更高


(4)pause()

pause函数被用于使进程暂停执行,直到它接收到一个信号。一旦进程接收到信号,pause函数返回,然后进程可以继续执行。这个函数定义在<unistd.h>头文件中

pause函数会使调用它的进程休眠,直到该进程捕获到一个信号。无论信号是否被捕获或忽略,pause都会返回

函数原型

1
2
3
#include <unistd.h>

int pause(void);

返回值

  • pause函数永远不会成功返回。如果被中断,它将返回-1,并设置errnoEINTR

pause函数在某些场景下是有用的,尤其是当你希望进程在等待外部事件(通常是信号)发生时保持休眠


(5)abort()

abort函数是一个标准库函数,位于头文件 <stdlib.h> 中。它的主要功能是用于终止程序的执行。当调用abort函数时,程序会立即停止运行,并生成一个终止信号,通常会导致程序的终止

abort函数通常在以下情况下被调用:

  • 严重错误: 当程序遇到无法恢复的错误或异常情况时,可以调用abort来中止程序。例如,内存分配失败,或者发生了不应该出现的情况。

  • 调试: 在调试过程中,可以使用abort函数来强制终止程序并获取调试信息,以便进行分析


(6)system()

system函数是一个标准库函数,位于头文件 <stdlib.h> 中。它用于在程序中执行系统命令或外部程序,并等待命令执行完毕后继续程序的执行

system函数的原型如下:

1
int system(const char *command);

参数 command 是一个字符串,表示要执行的系统命令或外部程序。当调用system函数时,它会启动一个新的进程来执行指定的命令,然后等待命令执行完毕。函数的返回值表示命令执行的状态:

  • 如果命令执行成功,system函数返回一个正整数值(通常是0)。
  • 如果命令执行失败,或者无法执行命令,system函数返回一个非零值。

程序实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#include <stdio.h>
#include <stdlib.h>

int main() {
int commandResult;

// 调用 system 函数来执行一个系统命令
commandResult = system("ls -l");

// 根据返回值判断命令执行状态
if (commandResult == 0) {
printf("Command executed successfully.\n");
} else {
printf("Command execution failed.\n");
}

return 0;
}

在上述示例中,system("ls -l") 执行了一个简单的 ls -l 命令,该命令会列出当前目录的文件和文件夹的详细信息。程序根据system函数的返回值判断命令是否执行成功


(7)sleep()

sleep函数是一个标准库函数,位于头文件 <unistd.h> 中。它的作用是让程序在指定的时间内暂停(休眠)执行,以达到一定的延迟效果。sleep函数接受一个以秒为单位的整数参数,表示程序需要休眠的时间

sleep函数的原型如下:

1
unsigned int sleep(unsigned int seconds);

参数 seconds 是一个无符号整数,表示程序需要休眠的秒数。函数会使程序休眠指定的秒数,然后继续执行

需要注意的是,sleep函数的精度是以秒为单位,如果需要更精细的延迟,可以使用其他系统调用,如 usleepnanosleep,它们允许以微秒和纳秒为单位进行延迟

此外,sleep函数通常在单线程环境下使用,如果在多线程程序中使用,它可能会导致整个程序休眠,而不仅仅是调用线程。在多线程环境中,可以考虑使用线程专用的延迟函数来避免这种情况


(8)nanosleep()

naaosleep()提供比传统的 sleep 函数更高的时间分辨率,允许使用纳秒级别的精度来指定睡眠时间

1
2
3
#include <time.h>

int nanosleep(const struct timespec *req, struct timespec *rem);

参数:

  • req:指向 timespec 结构的指针,该结构指定了要暂停的时间长度。timespec 结构如下定义:
1
2
3
4
struct timespec {
time_t tv_sec; // 秒
long tv_nsec; // 纳秒
};
  • rem:如果非 NULL,则在函数返回时,此结构会被设置为未睡完的剩余时间。这通常在 nanosleep 被信号打断时发生,允许程序决定是否重新开始剩余的睡眠时间

返回值:

  • 成功:返回 0
  • 错误:返回 -1 并设置 errno 来指示错误类型

实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include <stdio.h>
#include <time.h>

int main() {
struct timespec ts;
ts.tv_sec = 2; // 秒
ts.tv_nsec = 500000000L; // 纳秒

// 调用 nanosleep
if (nanosleep(&ts, NULL) < 0) {
perror("nanosleep");
} else {
printf("Slept for 2.5 seconds\n");
}

return 0;
}

(9)usleep()

允许程序以微秒(百万分之一秒)的单位进行睡眠。然而,需要注意的是,usleep 函数是一个较老的函数,并且在最新的 POSIX 标准中已经被废弃,推荐使用更现代的 nanosleep 函数或其他高精度定时函数来替代它

1
2
3
#include <unistd.h>

int usleep(useconds_t usec);

参数:

  • usec:要暂停的时间,以微秒为单位。尽管参数类型是 useconds_t,这通常是一个无符号整数类型

行为与限制:

  • usleep 可以暂停程序的执行达到指定的微秒数。但其精确度和实际暂停时间可能受到系统调度策略和系统负载的影响。
  • 在一些系统实现中,usleep 可以接受最大值为 1,000,000 微秒(即1秒)。超过这个值的输入可能导致函数行为不正常。
  • usleep 的一个主要限制是它不能处理被信号中断的情况。如果 usleep 在睡眠期间被信号打断,它不会重新进入睡眠状态


7.令牌桶算法

(1) 模拟cp指令

模拟cp指令,将src文件,复制到dest文件中

1
cp src dest
mycopy1.c

通过fopen函数返回文件流的方式,去读写文件数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/*
* 模拟cp指令,将src文件复制到dest文件中
* */
#include <stdlib.h>
#include <stdio.h>

int main(int argc,char *argv[])
{
FILE *fps,*fpd; // FILE类型的指针
int ch;

// 以读的模式打开 argv[1] 即 src文件
// src文件必须要存在
fps = fopen(argv[1],"r");
if(fps == NULL)
{
perror("fopen()");
exit(1);
}

// 以写的模式打开 argv[2] 即 dest文件
// dest文件可以不存在,在程序运行的时候,进行创建
fpd = fopen(argv[2],"w");
if(fpd == NULL)
{
perror("fopen()");
exit(1);
}

while (1)
{
// 从源文件fps中读取数据
ch = fgetc(fps);
if(ch == EOF) // 读取到文件结尾
{
// 跳出读取
break;
}
// 将源文件中读取的数据写入到目标文件
fputc(ch,fpd);
}

// 关闭文件流
fclose(fps);
fclose(fpd);
exit(0);
}

创建src文件,并且写入内容,运行指令:

1
$ ./mycopy src dest

使用指令diff,判断两个文件的内容是否一致,如下,若终端无内容输出,则表示内容一致

1
diff src dest
mycopy2.c

通过open函数,获取文件描述符,通过文件描述符号,读写文件的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
#include <stdlib.h>
#include <stdio.h>

#include <fcntl.h>
#include <unistd.h>

#define BUFSIZE 1024

int main(int argc,char *argv[])
{
// 创建源文件以及目标文件的文件描述符
int sfd,dfd;
// 读写缓冲区
char buf[BUFSIZE];
int len,ret;

if(argc < 3)
{
fprintf(stderr,"Usage...\n");
exit(1);
}

// 打开文件返回文件描述符
// 源文件,以读的形式打开,源文件必须存在
sfd = open(argv[1],O_RDONLY);
if(sfd==-1)
{
perror("open()");
exit(1);
}

// 打开目标文件,以写的模式打开,如果文件不存在,则创建O_CREAT
// 而且若存在则需要清空进行重写截断O_TRUNC
dfd = open(argv[2],O_WRONLY|O_CREAT|O_TRUNC);
if(dfd == -1)
{
perror("open()");
close(sfd); // 关闭源文件,防止产生内存泄漏
exit(1);
}

while(1)
{
// 通过read,在源文件文件描述符sfd映射的文件中,读取数据,存储到缓冲区buf中,一次最大要读取的数据长度为BUFSIZE
// read,返回为实际读取的内容字节数量,内容长度
len = read(sfd,buf,BUFSIZE);
if(len == -1)
{
// 读取失败
perror("read()");
exit(1);
}
if(len == 0)
{
// 读取到文件末尾,跳出循环
break;
}

// 向目标文件中写入数据
// 通过write,从缓冲区buf中,获取字节数量为len的内容写入到,目标文件描述符dfd映射的文件中
// 返回,为实际写入的字节数
ret = write(dfd,buf,len);
if(ret == -1)
{
perror("write()");
break;
}
}
close(sfd);
close(dfd);
exit(0);
}

编译运行,发现存在bug

BUGmycopy2.c上面的程序存在bug,因为在向目标文件写入数据的时候,write函数可能会出错,假设我们需要写入10个字节,但是实际只写入了3个字节,那么ret返回值==3,也是非负值,此时并没有写入全部数据数据,下一次还会继续进行写入,但是第二次写入会将第一次写入的数据进行截断覆盖,因此做以下修正

mycopy3.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <unistd.h>

#define BUFSIZE 1024

int main(int argc,char *argv[])
{
// 创建源文件以及目标文件的文件描述符
int sfd,dfd;
// 读写缓冲区
char buf[BUFSIZE];
int len,ret,pos;

if(argc < 3)
{
fprintf(stderr,"Usage...\n");
exit(1);
}

// 打开文件返回文件描述符
// 源文件,以读的形式打开,源文件必须存在
sfd = open(argv[1],O_RDONLY);
if(sfd==-1)
{
perror("open()");
exit(1);
}

// 打开目标文件,以写的模式打开,如果文件不存在,则创建O_CREAT
// 而且若存在则需要清空进行重写截断O_TRUNC
dfd = open(argv[2],O_WRONLY|O_CREAT|O_TRUNC);
if(dfd == -1)
{
perror("open()");
close(sfd); // 关闭源文件,防止产生内存泄漏
exit(1);
}

while(1)
{
// 通过read,在源文件文件描述符sfd映射的文件中,读取数据,存储到缓冲区buf中,一次最大要读取的数据长度为BUFSIZE
// read,返回为实际读取的内容字节数量,内容长度
len = read(sfd,buf,BUFSIZE);
if(len == -1)
{
// 读取失败
perror("read()");
exit(1);
}
if(len == 0)
{
// 读取到文件末尾,跳出循环
break;
}

pos = 0;
// 当从源文件中读取的内容没有全部写入到目标文件时,就多次在缓冲区上次写到的地方写入
while (len > 0)
{
// 开始向目标文件写入数据
// 之后每次写入都从buf数组中的buf+pos位置写入
// ret 为 write函数 实际向目标文件中写入的字节数
ret = write(dfd,buf+pos,len);
if(ret == -1)
{
perror("write()");
exit(1);
}
pos += ret;
len -= ret;
}
}

close(sfd);
close(dfd);
exit(0);
}
mycopy4.c

因为write与read,open函数都是系统调用,存在假错的情况,因此需要对假错的情况进行判断

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>

#define BUFSIZE 1024

int main(int argc,char *argv[])
{
// 创建源文件以及目标文件的文件描述符
int sfd,dfd;
// 读写缓冲区
char buf[BUFSIZE];
int len,ret,pos;

if(argc < 3)
{
fprintf(stderr,"Usage...\n");
exit(1);
}

// 打开文件返回文件描述符
// 源文件,以读的形式打开,源文件必须存在
sfd = open(argv[1],O_RDONLY);
do {
if(sfd<0)
{
// 不是假错
if(errno != EINTR)
{
perror("open()");
exit(1);
}
}
}while(sfd < 0);

// 打开目标文件,以写的模式打开,如果文件不存在,则创建O_CREAT
// 而且若存在则需要清空进行重写截断O_TRUNC
dfd = open(argv[2],O_WRONLY|O_CREAT|O_TRUNC);
do {
if(dfd < -1)
{
// 不是假错
if(errno != EINTR)
{
perror("open()");
close(sfd); // 关闭源文件,防止产生内存泄漏
exit(1);
}
}
}while(dfd < 0);

while(1)
{
// 通过read,在源文件文件描述符sfd映射的文件中,读取数据,存储到缓冲区buf中,一次最大要读取的数据长度为BUFSIZE
// read,返回为实际读取的内容字节数量,内容长度
while((len = read(sfd,buf,BUFSIZE)) < 0)
{
// 假错,重新读取
if(errno == EINTR)
continue;
// 读取失败
perror("read()");
exit(1);
}
if(len == 0)
break; // 读取到文件末尾,跳出循环


pos = 0;
// 当从源文件中读取的内容没有全部写入到目标文件时,就多次在缓冲区上次写到的地方写入
while (len > 0)
{
// 开始向目标文件写入数据
// 之后每次写入都从buf数组中的buf+pos位置写入
// ret 为 write函数 实际向目标文件中写入的字节数
ret = write(dfd,buf+pos,len);
if(ret < 0)
{
// 假错误
if(errno == EINTR)
continue;
perror("write()");
exit(1);
}
pos += ret;
len -= ret;
}
}

close(sfd);
close(dfd);
exit(0);
}

(2) 模拟cat指令

在mycopy4.c的基础上进行修改,只需要将目标文件描述符,改为标准输出即可,这样冲源文件中读取的数据,就会直接输出到终端上,标准输出的文件描述符为1

mycat1.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>

#define BUFSIZE 1024

int main(int argc,char *argv[])
{
// 创建源文件以及目标文件的文件描述符
int sfd,dfd = 1;
// 读写缓冲区
char buf[BUFSIZE];
int len,ret,pos;

if(argc < 2)
{
fprintf(stderr,"Usage...\n");
exit(1);
}

// 打开文件返回文件描述符
// 源文件,以读的形式打开,源文件必须存在
sfd = open(argv[1],O_RDONLY);
do {
if(sfd<0)
{
// 不是假错
if(errno != EINTR)
{
perror("open()");
exit(1);
}
}
}while(sfd < 0);


while(1)
{
// 通过read,在源文件文件描述符sfd映射的文件中,读取数据,存储到缓冲区buf中,一次最大要读取的数据长度为BUFSIZE
// read,返回为实际读取的内容字节数量,内容长度
while((len = read(sfd,buf,BUFSIZE)) < 0)
{
// 假错,重新读取
if(errno == EINTR)
continue;
// 读取失败
perror("read()");
exit(1);
}
if(len == 0)
break; // 读取到文件末尾,跳出循环


pos = 0;
// 当从源文件中读取的内容没有全部写入到目标文件时,就多次在缓冲区上次写到的地方写入
while (len > 0)
{
// 开始向目标文件写入数据
// 之后每次写入都从buf数组中的buf+pos位置写入
// ret 为 write函数 实际向目标文件中写入的字节数
ret = write(dfd,buf+pos,len);
if(ret < 0)
{
// 假错误
if(errno == EINTR)
continue;
perror("write()");
exit(1);
}
pos += ret;
len -= ret;
}
}

close(sfd);
exit(0);
}

编译运行,如下:

image-20240507173717937

mycat2.c–漏桶实现

mycat1.c中是将源文件的内容,一瞬间,全部输出到终端上进行显示,但是如果需要对输出数据速度进行控制呢,将内容每秒10字节的输出,就可以进行流控(流量控制),slowcat.c

漏桶(Leaky Bucket)是一种常见的流量控制算法,用于平滑和限制进入系统的数据流量。其基本思想是:无论流量的峰值如何,都以固定的速率将数据发送到目的地,当数据流过快时,将溢出到“桶”中,而当数据发送速率过快时,将丢弃溢出的数据

通过进程中信号的机制,使用alarm()函数进程定时

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
/*
* 控制每秒传输的字符数量为10
* */
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <signal.h>

#define CPS 10 // 每秒传输的字符数量为10
#define BUFSIZE CPS

static volatile int loop = 0; // volatile 变为寄存器变量

// 信号处理函数
static void alarm_handler(int s)
{
alarm(1); // 在处理函数中也进行定时
loop = 1;
}

int main(int argc,char *argv[])
{
// 创建源文件以及目标文件的文件描述符
int sfd,dfd = 1;
// 读写缓冲区
char buf[BUFSIZE];
int len = 0;
int ret = 0;
int pos = 0;

if(argc < 2)
{
fprintf(stderr,"Usage...\n");
exit(1);
}

// 捕捉信号,执行alarm_handler函数
signal(SIGALRM,alarm_handler);
alarm(1); // 通过alarm函数进行定时,当进程运行1s,则会向进程发送信号SIGALRM

// 打开文件返回文件描述符
// 源文件,以读的形式打开,源文件必须存在
sfd = open(argv[1],O_RDONLY);
do {
if(sfd<0)
{
// 不是假错
if(errno != EINTR)
{
perror("open()");
exit(1);
}
}
}while(sfd < 0);


while(1)
{

while(!loop)
pause(); // 阻塞的系统调用,进程暂停,等待信号到来
loop = 0;

// 通过read,在源文件文件描述符sfd映射的文件中,读取数据,存储到缓冲区buf中,一次最大要读取的数据长度为BUFSIZE
// read,返回为实际读取的内容字节数量,内容长度
while((len = read(sfd,buf,BUFSIZE)) < 0)
{
// 假错,重新读取
if(errno == EINTR)
continue;
// 读取失败
perror("read()");
exit(1);
}
if(len == 0)
break; // 读取到文件末尾,跳出循环


pos = 0;
// 当从源文件中读取的内容没有全部写入到目标文件时,就多次在缓冲区上次写到的地方写入
while (len > 0)
{
// 开始向目标文件写入数据
// 之后每次写入都从buf数组中的buf+pos位置写入
// ret 为 write函数 实际向目标文件中写入的字节数
ret = write(dfd,buf+pos,len);
if(ret < 0)
{
// 假错误
if(errno == EINTR)
continue;
perror("write()");
exit(1);
}
pos += ret;
len -= ret;
}
}

close(sfd);
exit(0);
}
mycat3.c–令牌桶实现

mycat2.c中可以理解为,程序有权限在1s中传输10个字节的数据,但是若在这1s内没有数据来(读取的文件中没有数据),程序不能进行传输数据,只能等待数据的到来。这样的效率很底下,换种方式,若在这1s中没有数据到来,程序也不进行干等,而是将1s传输10字节数据的权限进行积攒,等到有大量数据到来的时候,传输积攒的权限数量x10字节的内容。

需要注意一点:积攒权限的数量,存在上限,积攒的权限的数量不能无限大

如下,在将漏桶改为令牌桶的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
/*
* 控制每秒传输的字符数量为10
* */
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <signal.h>

#define CPS 10 // 每秒传输的字符数量为10
#define BUFSIZE CPS
#define BURST 100 // 令牌的上限(积攒权限数量的上限)

// 令牌(权限)
// static volatile int token = 0; // volatile 变为寄存器变量
// sig_atomic_t 信号原子类型,保证这个类型的变量的操作代码,底层一定是一条指令完成的
static volatile sig_atomic_t token = 0;

// 信号处理函数
static void alarm_handler(int s)
{
alarm(1); // 在处理函数中也进行定时
token++;
if(token > BURST)
token = BURST; // token不能大于上限
}

int main(int argc,char *argv[])
{
// 创建源文件以及目标文件的文件描述符
int sfd,dfd = 1;
// 读写缓冲区
char buf[BUFSIZE];
int len = 0;
int ret = 0;
int pos = 0;

if(argc < 2)
{
fprintf(stderr,"Usage...\n");
exit(1);
}

// 捕捉信号,执行alarm_handler函数
signal(SIGALRM,alarm_handler);
alarm(1); // 通过alarm函数进行定时,当进程运行1s,则会向进程发送信号SIGALRM

// 打开文件返回文件描述符
// 源文件,以读的形式打开,源文件必须存在
sfd = open(argv[1],O_RDONLY);
do {
if(sfd<0)
{
// 不是假错
if(errno != EINTR)
{
perror("open()");
exit(1);
}
}
}while(sfd < 0);


while(1)
{

while(token <= 0) // 此时没有令牌 (没有权限在这个时刻,向目的地发送10字节的数据)
pause(); // 阻塞的系统调用,进程暂停,等待信号到来

// 有权限了,但是传输一次数据,需要减少一次权限
token--; // 这一个操作不能保证原子,因此需要将int 类型改为信号原子类型

// 通过read,在源文件文件描述符sfd映射的文件中,读取数据,存储到缓冲区buf中,一次最大要读取的数据长度为BUFSIZE
// read,返回为实际读取的内容字节数量,内容长度
// 令牌桶重点:--------------------------------------------------------------------------
while((len = read(sfd,buf,BUFSIZE)) < 0)
{
// 假错,重新读取
if(errno == EINTR)
continue;
// 读取失败
perror("read()");
exit(1);
}
if(len == 0)
break; // 读取到文件末尾,跳出循环


pos = 0;
// 当从源文件中读取的内容没有全部写入到目标文件时,就多次在缓冲区上次写到的地方写入
while (len > 0)
{
// 开始向目标文件写入数据
// 之后每次写入都从buf数组中的buf+pos位置写入
// ret 为 write函数 实际向目标文件中写入的字节数
ret = write(dfd,buf+pos,len);
if(ret < 0)
{
// 假错误
if(errno == EINTR)
continue;
perror("write()");
exit(1);
}
pos += ret;
len -= ret;
}
}

close(sfd);
exit(0);
}

解析:在mycat3.c中的令牌桶重点,即read函数调用处,若此次读取文件中没有数据,那么由于read函数是一个阻塞的系统调用,进程就会被阻塞在这个地方,但是由于之前设定了定时alarm(1),经过1s,后会有信号打断,系统调用函数read(),并且执行alarm_handler函数,对令牌数量进行加一操作,但是程序在read函数那个while循环中,判断发现此次打断read函数调用为EINTR假错,则会重新调用read函数,若此时读取的文件中还是没有数据,则会继续阻塞在此处,如此往复进行令牌数量的积攒,直到读取的文件中存在内容才会打破阻塞,若此时存在大量的数据,程序可能会在1s内消耗多个令牌权限,若在1s内消耗的令牌数量为3,则是一次性处理了10字节,连续处理了3次


(3) 令牌桶的实现

令牌桶(Token Bucket)是一种流量整形(Traffic Shaping)和流量限制(Rate Limiting)的机制,广泛用于网络带宽管理、服务器请求处理等场景。该算法通过控制数据传输的速率和突发性来确保网络服务的质量和公平性。令牌桶算法的核心思想是使用一个虚拟的“桶”,桶中存放着“令牌”(token),数据包的传输需要消耗令牌

令牌桶算法提供了一种有效的方式来控制和管理数据流,以确保网络资源的合理分配和系统服务的高可用性

令牌桶算法由一个存放令牌(tokens)的桶和一个固定的填充速率组成。数据包发送之前必须先从桶中取得足够的令牌,每个数据包可能需要一个或多个令牌。

  • 桶的大小:决定了在任何给定时间点,桶中可以累积的最大令牌数。这个参数可以影响突发(burst)流量的大小
  • 令牌填充速率:决定了令牌进入桶中的速率,通常以令牌/秒计量。这个速率限制了长期的平均发送速率

工作原理

  • 令牌的生成:令牌以固定的速率被添加到桶中。桶有一个容量上限,超过这个上限的令牌将会被丢弃
  • 数据包的发送:每个要发送的数据包必须先获取一定数量的令牌才能被发送。如果桶中有足够的令牌,数据包就可以立即发送并且相应数量的令牌从桶中移除;如果桶中令牌不足,数据包需要等待直到有足够的令牌
  • 突发流量的控制:由于桶可以存储令牌,因此在令牌积累时允许一定程度的突发传输。这是通过在一段时间不使用令牌,然后使用积累的令牌来一次性发送多个数据包来实现的

工作流程

  • 桶以一定的固定速率(r)获得令牌。
  • 桶中最多可以存储固定数量(b)的令牌,以允许一定程度的突发传输。
  • 当一个数据包到达时,如果桶中有足够的令牌,则从桶中移除相应数量的令牌,并允许数据包发送。
  • 如果桶中的令牌不足,则根据具体实现,数据包可能被丢弃或排队等待。

应用场景;

  • 网络带宽管理:通过限制网络流量的速率,确保网络资源被公平使用,防止某一应用或用户占用过多带宽。
  • 服务器请求处理:在服务器或API服务中,限制请求的速率来保护后端服务不被过载,提高系统的稳定性和可用性。
  • 多媒体传输:在视频流和音频流传输中控制数据的发送速率,以适应不同网络条件和缓冲需求

优点

  • 灵活性:令牌桶算法允许一定程度的突发流量,比较灵活地应对突增的数据传输需求。
  • 平滑网络流量:通过控制数据发送的平均速率,帮助平滑网络流量波动,减少拥塞。
  • 适应性:能够适应不同的网络环境和流量模式,通过调整令牌生成速率和桶的大小来满足不同的需求。
实例程序alarm实现

通过令牌桶,实现流量控制,main函数中是实现了cat指令的功能,但是使用令牌桶实现流控的能力

mytbf.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#ifndef TOKENBUCKET_MYTBF_H
#define TOKENBUCKET_MYTBF_H

#define MYTBF_MAX 1024 // 令牌桶数组大小

typedef void mytbf_t;

// cps 每个令牌可以传输的内容字节数量 burst 桶内的令牌数量上限*每个令牌可以传输的内容字节数量
mytbf_t *mytbf_init(int cps,int burst);

// 从令牌桶中取得令牌,int 第二个参数是想取多少
// 返回值,为真正取得了多少
int mytbf_fetchtoken(mytbf_t *,int );

// 将没有用完的令牌数量还到令牌桶中, int 第二个参数是想还回去的数量
// 返回值,为真正还了多少
int mytbf_returntoken(mytbf_t *,int);

// 销毁令牌桶
int mytbf_destroy(mytbf_t *);

#endif //TOKENBUCKET_MYTBF_H

mytbf.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <signal.h>

#include "mytbf.h"

typedef void (*sighandler_t)(int);

static __sighandler_t alarm_handler_save;

/*令牌桶类型数组,对于文件可以有MYTBF_MAX中传输速率选择*/
static struct mytbf_st* job[MYTBF_MAX];

static int inited = 0;

struct mytbf_st
{
int cps;
int burst;
int token;
int pos;
};

/*信号处理函数*/
static void alarm_handler(int s)
{
alarm(1); // 继续定时1s,时间到发送信号 SIGALRM
for (int i = 0; i < MYTBF_MAX; i++)
{
// 当前令牌桶数组位置存在数据
if(job[i] != NULL)
{
// 为i位置的令牌桶装载令牌
job[i]->token += job[i]->cps;
// 但是令牌桶所积累的令牌数量需要维持在令牌桶的上限
if(job[i]->token > job[i]->burst)
job[i]->token = job[i]->burst;
}
}
}

/*模块卸载*/
static void module_unload(void)
{
// 对SIGALRM的行为进行恢复
signal(SIGALRM, alarm_handler_save);
alarm(0); // 关闭时钟信号
// 释放令牌桶数组中所有的空间
for (int i = 0; i < MYTBF_MAX; i++)
free(job[i]);
}

/*模块加载函数*/
static void module_load(void)
{
// 为信号SIGALRM,注册行为 alarm_handler,捕获到信号SIGALRM时,会执行函数alarm_handler
alarm_handler_save = signal(SIGALRM, alarm_handler);
alarm(1); // 定时1s,时间到发送信号 SIGALRM

// 钩子函数,程序结束前最后执行
atexit(module_unload);
}


static int min(int a, int b)
{
if(a < b)
return a;
return b;
}

/*在令牌桶数组中找到空位置*/
static int get_free_pos(void)
{
for(int i = 0; i < MYTBF_MAX; i++)
{
if(job[i] == NULL)
return i; // 返回空位置的下标
}

return -1;
}

// cps 每个令牌可以传输的内容字节数量 burst 桶内的令牌数量上限*每个令牌可以传输的内容字节数量
mytbf_t *mytbf_init(int cps,int burst)
{
struct mytbf_st *me;

// 重点:在令牌桶初始化的时候,定时发送信号,每秒给令牌桶加令牌
if(!inited)
{
module_load(); // 给令牌桶每秒装载令牌。这个函数只有在第一次调用mytbf_init时候,才会执行到
inited = 1; // 因为alarm只能调用一个闹钟,每次调用都会重置之前的闹钟
}

// 在令牌桶数组中找到空位置
int pos = get_free_pos();
if(pos < 0)
return NULL;

// 为当前的令牌桶开辟空间
me = malloc(sizeof(*me));
if(me == NULL)
return NULL;

me->token = 0; // 当前令牌桶,初始令牌为0
me->cps = cps; // 每次可以获得的令牌数量为cps
me->burst = burst; // 令牌桶中可以积攒的令牌数量上限为burst
me->pos = pos;
job[pos] = me;

return me;

}

/*
* 从令牌桶中取得令牌,int 第二个参数是想取多少
* */
// 返回值,为真正取得了多少令牌
int mytbf_fetchtoken(mytbf_t *ptr,int size)
{
struct mytbf_st *me = ptr;
int n;

if(size < 0)
// EINVAL(Invalid Argument)是另一个常见的错误代码,表示传递给系统调用的一个或多个参数是无效的
return -EINVAL; // 这样就可以使用strerror(errno)进行报错

while(me->token <= 0)
pause(); // 等待有令牌,在继续向下执行

// 取小值
n = min(me->token,size);
me->token -= n; // 令牌数量减少

return n;
}

/*
* 将没有用完的令牌数量还到令牌桶中, int 第二个参数是想还回去的数量
* */
// 返回值,为真正还了多少令牌
int mytbf_returntoken(mytbf_t *ptr,int size)
{
struct mytbf_st *me = ptr;
int n;

if(size < 0)
// EINVAL(Invalid Argument)是另一个常见的错误代码,表示传递给系统调用的一个或多个参数是无效的
return -EINVAL; // 这样就可以使用strerror(errno)进行报错

me->token += size; // 归还了,当前令牌桶的令牌数量增加
// 但是还是要约束在上限内
if(me->token > me->burst)
me->token = me->burst;

return size;
}

/*
* 销毁令牌桶
* */
int mytbf_destroy(mytbf_t *ptr)
{
// 将传入的令牌桶销毁
struct mytbf_st *me = ptr;
// 将当前令牌桶占用的令牌桶数组位置赋值为NULL
job[me->pos] = NULL;
// 释放创建传入的令牌桶开辟的动态空间
free(ptr);
}

main.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>

#include "mytbf.h"

#define CPS 10 // 每秒传输的字节数量,一次取得令牌数量(一次积攒的令牌数量)
#define BUFSIZE 1024
#define BURST 100 // 令牌桶中积攒令牌数量上限

int main(int argc,char *argv[])
{
int sfd,dfd = 1;
int len = 0;
int ret = 0;
int pos = 0;
int size = 0;
char buf[BUFSIZE];

mytbf_t *tbf;

if(argc < 2)
{
fprintf(stderr,"Usage:%s <src_file>\n",argv[0]);
exit(1);
}

/*1.创建令牌桶*/
// CPS表示每次可以获得令牌数量,BURST表示令牌桶中可以积攒令牌数量上限
// CPS 与 BURST 均为流控的参数
tbf = mytbf_init(CPS,BURST);
if(tbf == NULL)
{
fprintf(stderr, "mytbf_init() failed!\n");
exit(1);
}
/************************************************************/


/*2.打开需要查看的文件*/
do{
sfd = open(argv[1], O_RDONLY);
if(sfd < 0)
{
// 不是假错,报错,退出
if(errno != EINTR)
{
perror("open()");
exit(1);
}
}
}while(sfd < 0);
/************************************************************/


while (1)
{
/*3.取令牌,想要取得BUFSIZE个令牌*/
// size 为实际取得的令牌数量
size = mytbf_fetchtoken(tbf, BUFSIZE);
if(size < 0)
{
fprintf(stderr, "mytbf_fetchtoken():%s\n", strerror(-size));
exit(1);
}
/************************************************************/


/*4.通过获取的令牌,从源文件中,进行数据读取*/
// 成功取得令牌,有权限读取数据,因为只有size个令牌,因此只能读取的数据为size个字节
while((len = read(sfd,buf,size)) < 0)
{
// 假错
if(errno == EINTR)
continue;
perror("read()");
exit(1);
}
if(len == 0)
break; // 读取到文件尾
/************************************************************/


/*5.令牌归还*/
// 当前取得的token没有消耗完,进行归还
// 假设说,在文件末尾最后只有三个字节的数据,但是我这次可以取得的令牌数量为10
// 因此对于这最后剩余的三个字节,只需要消耗三个令牌,还剩余7个令牌需要归还到令牌桶中
if((size - len) > 0)
mytbf_returntoken(tbf,size-len); // 归还剩余令牌
/************************************************************/


/*6.将读取的内容写到目标的文件描述符映射的文件中,即标准输出,终端上*/
pos = 0;
while(len > 0)
{
ret = write(dfd, buf + pos, len);
if(ret < 0)
{
// 假错误
if(errno == EINTR)
continue;
perror("write()");
exit(1);
}
pos += ret;
len -= ret;
}
/************************************************************/
}

// 关闭文件描述符
close(sfd);

// 销毁令牌桶
mytbf_destroy(tbf);

exit(0);
}

CMakeLists.txt

1
2
3
4
5
6
cmake_minimum_required(VERSION 3.19)
project(TokenBucket C)

set(CMAKE_C_STANDARD 99)

add_executable(TokenBucket main.c mytbf.c)
实例程序setitimer实现

mytbf.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#ifndef _MYTBF_H_
#define _MYTBF_H_

#define MYTBF_MAX 1024 // 令牌桶数组大小

typedef void mytbf_t;

/*令牌桶初始化*/
mytbf_t *mytbf_init(int cps, int burst);

/*获取令牌*/
int mytbf_fetchtoken(mytbf_t *, int);

/*归还令牌*/
int mytbf_returntoken(mytbf_t *, int);

/*销毁令牌桶*/
int mytbf_destroy(mytbf_t *);

#endif

mytbf.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <signal.h>
#include <sys/time.h>

#include "mytbf.h"

/*令牌桶类型数组,对于文件可以有MYTBF_MAX中传输速率选择*/
static struct mytbf_st* job[MYTBF_MAX];

static int inited = 0;

/*sigaction函数需要的结构体参数,用于存储程序开始前初始的信号行为*/
static struct sigaction alarm_sa_save;

struct mytbf_st
{
int cps; // 每次可以获得的令牌数量
int burst; // 桶里面可获得令牌数量上限
int token; // 桶里面当前的令牌数量
int pos; // 当前桶位于job中的索引位置
};

/*信号处理函数*/
static void alarm_action(int s, siginfo_t *infop, void *unused)
{
// 判断信号是由内核直接发送的,而不是由用户空间的程序发送
// 如果不是由内核发出的,则函数什么也不做
if(infop->si_code != SI_KERNEL)
return ;

for (int i = 0; i < MYTBF_MAX; i++)
{
if(job[i] != NULL)
{
job[i]->token += job[i]->cps;
if(job[i]->token > job[i]->burst)
job[i]->token = job[i]->burst;
}
}

}

/*模块卸载*/
static void module_unload(void)
{
struct itimerval itv;

// 对SIGALRM的行为进行恢复
sigaction(SIGALRM, &alarm_sa_save, NULL);

// 关闭定时
itv.it_interval.tv_sec = 0;
itv.it_interval.tv_usec = 0;
itv.it_value.tv_sec = 0;
itv.it_value.tv_usec = 0;
setitimer(ITIMER_REAL, &itv, NULL);

// 释放定时器空间
for (int i = 0; i < MYTBF_MAX; i++)
free(job[i]);
}

/*模块加载函数*/
static void module_load(void)
{
struct sigaction sa;
struct itimerval itv;

// 为信号SIGALRM,注册行为 alarm_action,捕获到信号SIGALRM时,会执行函数alarm_action
sa.sa_sigaction = alarm_action;
sigemptyset(&sa.sa_mask);
sa.sa_flags = SA_SIGINFO; // // 使用 SA_SIGINFO 标志可以使程序获得关于信号更详细的信息
// 信号初始行为存储到结构体 alarm_sa_save 用于程序退出时恢复
sigaction(SIGALRM, &sa, &alarm_sa_save);

itv.it_interval.tv_sec = 1;
itv.it_interval.tv_usec = 0;
itv.it_value.tv_sec = 1;
itv.it_value.tv_usec = 0;
setitimer(ITIMER_REAL, &itv, NULL);

// 钩子函数,程序结束前最后执行
atexit(module_unload);
}

static int min(int a, int b)
{
if(a < b)
return a;
return b;
}

/*在令牌桶数组中找到空位置*/
static int get_free_pos(void)
{
for(int i = 0; i < MYTBF_MAX; i++)
{
if(job[i] == NULL)
return i;
}

return -1;
}

// cps 么此可以获得令牌数量 burst 桶内的令牌数量上限
mytbf_t *mytbf_init(int cps, int burst)
{
struct mytbf_st *me;
if(!inited)
{
module_load();
inited = 1;
}



int pos = get_free_pos();
if(pos < 0)
return NULL;

me = malloc(sizeof(*me));
if(me == NULL)
return NULL;

me->token = 0;
me->cps = cps;
me->burst = burst; // 令牌桶中可以积攒的令牌数量上限为burst
me->pos = pos;
job[pos] = me;

return me;
}


/*
* 从令牌桶中取得令牌,int 第二个参数是想取多少
* */
// 返回值,为真正取得了多少令牌
int mytbf_fetchtoken(mytbf_t *ptr, int size)
{
struct mytbf_st *me = ptr;
int n;

if(size <= 0)
// EINVAL(Invalid Argument)是另一个常见的错误代码,表示传递给系统调用的一个或多个参数是无效的
return -EINVAL; // 这样就可以使用strerror(errno)进行报错

while(me->token <= 0)
pause(); // 等待有令牌,在继续向下执行

n = min(me->token, size);
me->token -= n; // 令牌数量减少

return n;
}

/*
* 将没有用完的令牌数量还到令牌桶中, int 第二个参数是想还回去的数量
* */
// 返回值,为真正还了多少令牌
int mytbf_returntoken(mytbf_t *ptr, int size)
{
struct mytbf_st *me = ptr;

if(size <= 0)
// EINVAL(Invalid Argument)是另一个常见的错误代码,表示传递给系统调用的一个或多个参数是无效的
return -EINVAL; // 这样就可以使用strerror(errno)进行报错

me->token += size; // 归还了,当前令牌桶的令牌数量增加
// 但是还是要约束在上限内
if(me->token > me->burst)
me->token = me->burst;

return size;
}


/*
* 销毁令牌桶
* */
int mytbf_destroy(mytbf_t *ptr)
{

struct mytbf_st *me = ptr;
job[me->pos] = NULL;
free(ptr);
}

main.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>

#include "mytbf.h"

#define CPS 10
#define BUFSIZE 1024
#define BURST 100


int main(int argc, char **argv)
{
int sfd, dfd = 1;
int len = 0;
int ret = 0;
int pos = 0;
int size = 0;
char buf[BUFSIZE];
mytbf_t *tbf;

if(argc < 2)
{
fprintf(stderr, "Usage:%s <src_file>\n",argv[0]);
exit(1);
}

tbf = mytbf_init(CPS, BURST);
if(tbf == NULL)
{
fprintf(stderr, "mytbf_init() failed!\n");
exit(1);
}

do
{
sfd = open(argv[1], O_RDONLY);
if(sfd < 0)
{
if(errno != EINTR)
{
perror("open()");
exit(1);
}
}
}while (sfd < 0);


while(1)
{
size = mytbf_fetchtoken(tbf, BUFSIZE);
if(size < 0)
{
fprintf(stderr, "mytbf_fetchtoken():%s\n", strerror(-size));
exit(1);
}

while((len = read(sfd, buf, size)) < 0)
{
if(errno == EINTR)
continue;
perror("read()");
break;
}

if(len == 0)
break;

if(size - len > 0)
mytbf_returntoken(tbf, size - len);

pos = 0;
while(len > 0)
{
// printf("pos=%d len=%d\n", pos, len);
ret = write(dfd, buf + pos, len);
// printf("ret=%d\n", ret);
if(ret < 0)
{
if(errno == EINTR)
continue;
perror("write()");
exit(1);
}
pos += ret;
len -= ret;
}

}

close(sfd);

mytbf_destroy(tbf);

exit(0);
}


8.setitimer定时函数

setitimer 是用于设置间隔计时器的函数。与之相关的是 getitimer,用于获取当前设置的计时器的值

该函数时间精度很高,而且误差不会累积,因此setitimeralarm函数更好

函数原型

1
2
3
#include <sys/time.h>

int setitimer(int which, const struct itimerval *new_value, struct itimerval *old_value);

参数:

  • which: 指定要设置的计时器的类型,通常可以是:
    • ITIMER_REAL: 实时计时器,当计时器超时时,将发送 SIGALRM 信号。
    • ITIMER_VIRTUAL: 虚拟计时器,仅在进程执行时减少。
    • ITIMER_PROF: 类似于 ITIMER_VIRTUAL,但还包括了当系统执行 behalf 时的时间。
  • new_value: 指定新计时器的间隔和值。
  • old_value: 如果不为NULL,那么在调用之前计时器的当前值将存储在此处

itimerval 结构体的定义大致如下:

1
2
3
4
struct itimerval {
struct timeval it_interval; // 计时器重启时的新值 next value
struct timeval it_value; // 计时器的当前值 current value
};

it_value递减为0时,发信号。并且it_interval会原子化赋值给it_value,重新开始计时

其中,timeval 结构如下:

1
2
3
4
struct timeval {
long tv_sec; // 秒
long tv_usec; // 微秒
};

返回值

  • 0成功,-1表示失败

使用 setitimer 时,还需要处理可能由计时器超时产生的信号,如 SIGALRM

注意事项

  • 定时器类型限制
    • 每种类型的定时器(ITIMER_REALITIMER_VIRTUALITIMER_PROF)在任何时刻都只能有一个活动实例。因此,对于每种类型的定时器,后续的 setitimer 调用会重置该类型的定时器设置
    • 如果你调用 setitimer 设置了一个 ITIMER_REAL 定时器,然后再次调用 setitimer 设置同类型的定时器,第二个调用会覆盖第一个调用的设置

实例

mycat2.c漏桶实现的alarm函数替换为setitimer函数,alarm只能设置一个一次性的定时器,即一旦定时器到期,就必须重新设置。而setitimer 允许设置周期性的定时器。通过设定 it_interval 结构的值,setitimer 可以在定时器到期后自动重置,无需手动再次调用。这对于需要定期执行任务的应用程序非常方便,如定期检查、更新状态或执行维护任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
/*
* 控制每秒传输的字符数量为10
* 将alarm函数替换为setitimer
* */
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <signal.h>
#include <sys/time.h>

#define CPS 10 // 每秒传输的字符数量为10
#define BUFSIZE CPS

static volatile int loop = 0; // volatile 变为寄存器变量

// 信号处理函数
static void alarm_handler(int s)
{
// alarm(1); // 在处理函数中也进行定时,alarm只能设置一个一次性定时器
// 因为setitimer函数允许周期性定时
loop = 1;
}

int main(int argc,char *argv[])
{
// 创建源文件以及目标文件的文件描述符
int sfd,dfd = 1;
// 读写缓冲区
char buf[BUFSIZE];
int len = 0;
int ret = 0;
int pos = 0;

if(argc < 2)
{
fprintf(stderr,"Usage...\n");
exit(1);
}

// 捕捉信号,执行alarm_handler函数
signal(SIGALRM,alarm_handler);
// alarm(1); // 通过alarm函数进行定时,当进程运行1s,则会向进程发送信号SIGALRM
// 使用setitimer函数替换alarm
// 参数1: ITIMER_REAL 实时计时器,以实际时间递减,当定时器到期时会发送 SIGALRM 信号
struct itimerval itv;
itv.it_interval.tv_sec = 1; // 下次定时器重新启动的间隔时间
itv.it_interval.tv_usec = 0;
itv.it_value.tv_sec = 1; // 定时器到期的时间
itv.it_value.tv_usec = 0;
// itv结构体两个字段分别为当前的定时时间,以及下次的定时时间都设置为1s,定时器就可以每间隔1s给进程发送SIGALRM信号
if(setitimer(ITIMER_REAL,&itv,NULL) < 0)
{
perror("setitimer()");
exit(1);
}


// 打开文件返回文件描述符
// 源文件,以读的形式打开,源文件必须存在
sfd = open(argv[1],O_RDONLY);
do {
if(sfd<0)
{
// 不是假错
if(errno != EINTR)
{
perror("open()");
exit(1);
}
}
}while(sfd < 0);


while(1)
{

while(!loop)
pause(); // 阻塞的系统调用,进程暂停,等待信号到来
loop = 0;

// 通过read,在源文件文件描述符sfd映射的文件中,读取数据,存储到缓冲区buf中,一次最大要读取的数据长度为BUFSIZE
// read,返回为实际读取的内容字节数量,内容长度
while((len = read(sfd,buf,BUFSIZE)) < 0)
{
// 假错,重新读取
if(errno == EINTR)
continue;
// 读取失败
perror("read()");
exit(1);
}
if(len == 0)
break; // 读取到文件末尾,跳出循环


pos = 0;
// 当从源文件中读取的内容没有全部写入到目标文件时,就多次在缓冲区上次写到的地方写入
while (len > 0)
{
// 开始向目标文件写入数据
// 之后每次写入都从buf数组中的buf+pos位置写入
// ret 为 write函数 实际向目标文件中写入的字节数
ret = write(dfd,buf+pos,len);
if(ret < 0)
{
// 假错误
if(errno == EINTR)
continue;
perror("write()");
exit(1);
}
pos += ret;
len -= ret;
}
}

close(sfd);
exit(0);
}


9.多任务计时器anytime实现

使用单一计时器,构造一组函数,实现任意数量的计时器

使用alarm函数,进行定时,会存在一个问题,如果多次调用alarm函数,在别的alarm调用还没有信号到来的期间,继续调用alarm函数,那么之前的定时会进行重置,以最后这次调用的定时间为准,如下,三次调用在第一次调用,10s计时还没有结束,就调用alarm(5),那么定时重置,这时5s定时开始,10s失效,而在5s计时还没有结束,就调用alarm(2),那么定时重置,这时2s定时开始,5s失效

1
2
3
alarm(10);
alarm(5);
alarm(2);

这一节的任务是:通过alarm函数或者setitimer函数,进行如下伪码计时,那么程序,在程序开始之后的2s时,发送信号,进程执行函数f2;在程序开始之后的5s时,发送信号,进程执行函数f1;最后,在程序开始之后的7s时,发送信号,进程执行函数f1;

1
2
3
定时器15,f1,aaa
定时器22,f2,bbb
定时器37,f1,ccc

思路:构建如下三个任务结构体,结构体字段有sec表示任务计时,func表示任务处理函数,arg表示任务参数,执行过程如下图:

image-20240508103746808

Anytimer1.c

Anytimer1.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#ifndef ANYTIMER_ANYTIMER_H
#define ANYTIMER_ANYTIMER_H

#define JOB_MAX 1024 // 最大的定时任务数量

// 如此,任何以 at_jobfunc_t 类型声明的函数,均是返回值类型为void,接受传入参数类型为void *的函数
typedef void at_jobfunc_t(void *);

/*设置可重复使用的闹钟数组*/
int at_addjob(int sec, at_jobfunc_t *jobp, void *arg);
/*
* return >= 0 成功,返回任务ID
* == -EINVAL 失败,参数非法
* == -ENOSPC 失败,数组满
* == -ENOMEM 失败,内存空间不足
* */


/*关闭闹钟*/
int at_canceljob(int id);
/*
* return == 0 成功,指定任务已取消
* == -EINVAL 失败,参数非法
* == -EBUSY 失败,指定任务已完成
* == -ECANCELED 失败,指定任务重复取消
* */


/*任务资源回收,类似于进程的wait函数,收尸*/
int at_wait(int id);
/*
* return == 0 成功任务成功释放
* == -EINVAL 失败,参数非法
* */

/*添加周期的定时任务*/
int at_addjob_repeat(int sec, at_jobfunc_t *jobp, void *arg);

/******************其他可实现的功能函数******************/
// 暂停一个任务
// int at_pausejob();

// 重新启动一个任务
// int at_resumejob();

#endif //ANYTIMER_ANYTIMER_H

Anytimer.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>
#include <signal.h>
#include <sys/time.h>
#include <errno.h>
#include <wait.h>

#include "anytimer.h"

/*枚举表示定时器的几种状态*/
enum
{
STATE_RUNNING = 1,
STATE_CANCELED,
STATE_OVER
};

/*定时器信息结构体*/
struct at_job_st{
int job_state;
int sec;
int time_remain; // 任务定时剩余的时间,倒计时

int repeat; // 是否周期任务
at_jobfunc_t *jobp; // 定时器时间到发送信号,指向的任务函数
void *arg; // 任务函数的参数
};

/*定时器数组,总共可以创建的定时器数量为 JOB_MAX*/
static struct at_job_st *job[JOB_MAX];
static int inited = 0;
static struct sigaction alrm_sa_save; // 用于存储,进程中信号的初始行为信息,用于程序退出时的行为还原

/*定时器信号行为函数*/
static void alrm_action(int s,siginfo_t *infop,void *unused)
{
// 判断信号是由内核直接发送的,而不是由用户空间的程序发送
// 如果不是由内核发出的,则函数什么也不做
if(infop->si_code != SI_KERNEL)
return;

// 遍历定时器数组,处理定时器任务
for(int i=0;i<JOB_MAX;i++)
{
if(job[i]!=NULL && job[i]->job_state == STATE_RUNNING)
{
job[i]->time_remain--; // 倒计时--
// 当倒计时为0,才开始执行主程序中传入定时器的任务
if(job[i]->time_remain == 0)
{
// 执行定时器任务
job[i]->jobp(job[i]->arg);
// 查看任务是否重复执行
if(job[i]->repeat == 1)
job[i]->time_remain = job[i]->sec;
else
// 不重复执行任务,则定时器状态为结束
job[i]->job_state = STATE_OVER;
}
}
}
}

/*定时器数组中找位置*/
static int get_free_pos(void)
{
for(int i = 0;i<JOB_MAX;i++)
{
if(job[i] ==NULL)
return i;
}
return -1;
}


static void module_unload(void)
{
// 关闭定时器
struct itimerval itv;
itv.it_interval.tv_sec = 0;
itv.it_interval.tv_usec = 0;
itv.it_value.tv_sec = 0;
itv.it_value.tv_usec = 0;
if(setitimer(ITIMER_REAL,&itv,NULL) < 0)
{
perror("setitimer()");
exit(1);
}

// 信号行为还原
if(sigaction(SIGALRM,&alrm_sa_save,NULL) < 0)
{
perror("sigaction()");
exit(1);
}
}


/*模块加载,开启定时器*/
static void module_load(void)
{
struct sigaction sa;
struct itimerval itv;

sa.sa_sigaction = alrm_action; // 设置信号的行为函数
sigemptyset(&sa.sa_mask); // 将阻塞信号集设置为空
sa.sa_flags = SA_SIGINFO; // 使用 SA_SIGINFO 标志可以使程序获得关于信号更详细的信息
// 将初始行为存储到 alrm_sa_save 用于还原
if(sigaction(SIGALRM,&sa,&alrm_sa_save) < 0)
{
perror("aigaction()");
exit(1);
}

// 设置itv字段信息
// 1秒后触发,之后每1秒触发一次
itv.it_interval.tv_sec = 1; // 这样设置就可以周期性的定时
itv.it_interval.tv_usec = 0;
itv.it_value.tv_sec = 1;
itv.it_value.tv_usec = 0;
if(setitimer(ITIMER_REAL,&itv,NULL) < 0)
{
perror("setitimer()");
exit(1);
}

// 钩子函数,程序退出时,模块卸载
atexit(module_unload);
}


/*添加非周期的定时任务*/
/*
* return >= 0 成功,返回任务ID
* == -EINVAL 失败,参数非法
* == -ENOSPC 失败,数组满
* == -ENOMEM 失败,内存空间不足
*
* 参数: sec 定时时间
* jobp 信号行为函数
* arg 行为函数参数
* */
int at_addjob(int sec, at_jobfunc_t *jobp, void *arg)
{
int pos;
struct at_job_st *me;

// 返回无效参数
if(sec < 0)
return -EINVAL;

if(!inited)
{
module_load(); // 加载模块
inited = 1;
}

// 在定时器数组中找位置
pos = get_free_pos();
if(pos < 0)
{
// 返回数组满
return -ENOSPC;
}

// 动态开辟定时器空间
me = malloc(sizeof(*me));
if(me == NULL)
return -ENOMEM;

me->job_state = STATE_RUNNING; // 定时器状态初始化为运行态
me->sec = sec; // 初始化定时器定时时间
me->time_remain = me->sec;
me->jobp = jobp; // 定时器定时结束发送信号,的信号处理行为函数
me->arg = arg;
me->repeat = 0; // 是否周期定时,初始为否

// 占用定时器数组位置
job[pos] = me;

return pos;
}


/*关闭闹钟*/
/*
* return == 0 成功,指定任务已取消
* == -EINVAL 失败,参数非法
* == -EBUSY 失败,指定任务已完成
* == -ECANCELED 失败,指定任务重复取消
* */
int at_canceljob(int id)
{
if(id < 0 || id >= JOB_MAX || job[id] == NULL)
// 无效参数
return -EINVAL;
if(job[id]->job_state == STATE_CANCELED)
// 指定任务重复取消
return -ECANCELED;
if(job[id]->job_state == STATE_OVER)
// 取消的任务已完成
return -EBUSY;

job[id]->job_state = STATE_CANCELED;

return 0;
}

/*添加周期的定时任务*/
int at_addjob_repeat(int sec, at_jobfunc_t *jobp, void *arg)
{
int pos;
struct at_job_st *me;

if(sec < 0)
return -EINVAL;

if (!inited)
{
module_load(); // 模块加载
inited = 1;
}

// 需要位置
pos = get_free_pos();
if (pos < 0)
return -ENOSPC;


me = malloc(sizeof(*me));
if (me == NULL)
// 数组满
return -ENOMEM;

me->job_state = STATE_RUNNING; // 定时器状态初始化为运行态
me->sec = sec; // 初始化定时器定时时间
me->time_remain = me->sec;
me->jobp = jobp; // 定时器定时结束发送信号,的信号处理行为函数
me->arg = arg;
me->repeat = 1; // 是否周期定时,初始为是

// 占用定时器数组位置
job[pos] = me;

return pos;
}


/*任务资源回收,类似于进程的wait函数,收尸*/
/*
* return == 0 成功任务成功释放
* == -EINVAL 失败,参数非法
* */
int at_waitjob(int id)
{
if(id < 0 || id >= JOB_MAX || job[id] == NULL)
// 无效参数
return -EINVAL;

if(job[id]->repeat == 1)
// 当前任务还在周期执行
return -EBUSY;

// 若当前定时器任务还是处于运行态度
while (job[id]->job_state == STATE_RUNNING)
// pause 函数使调用它的程序挂起,直到捕捉到一个信号并且相应的信号处理函数返回
// 若定时任务仍处于运行态,则进程等待定时结束的信号近来
pause();

// 当定时器任务已经结束就可以对该定时器任务的资源进行回收
if(job[id]->job_state == STATE_CANCELED || job[id]->job_state == STATE_OVER)
{
free(job[id]);
job[id] = NULL;
}

return 0;
}

main.c

jobalarm函数中,添加了三个定时任务,当运行job1,job2,job3

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <wait.h>
#include <sys/types.h>
#include <unistd.h>

#include "anytimer.h"

static void f1(void *s)
{
printf("f1():%s\n", (char *)s);
fflush(NULL);
}

static void f2(void *s)
{
printf("f2():%s\n", (char *)s);
fflush(NULL);
}

/*周期定时任务*/
static void repeat(void)
{
int job1;

puts("Begin!");
// 每2s调用一次
job1 = at_addjob_repeat(2, f1, "aaa");
if(job1 < 0)
{
fprintf(stderr, "at_addjob():%s\n", strerror(-job1));
exit(1);
}
while(1)
{
write(1, ".", 1);
sleep(1);
}
}

/*非周期定时任务*/
static void jobalarm(void)
{
int job1, job2, job3;

puts("Begin!");
job1 = at_addjob(5, f1, "aaa");
if(job1 < 0)
{
fprintf(stderr, "at_addjob():%s\n", strerror(-job1));
exit(1);
}

job2 = at_addjob(2, f2, "bbb");
if(job2 < 0)
{
fprintf(stderr, "at_addjob():%s\n", strerror(-job2));
exit(1);
}
job3 = at_addjob(7, f1, "ccc");
if(job3 < 0)
{
fprintf(stderr, "at_addjob():%s\n", strerror(-job3));
exit(1);
}

while(1)
{
write(1, ".", 1);
sleep(1);
}
}

int main(void)
{
// repeat();
jobalarm();

return 0;
}

CMakeLists.txt

1
2
3
4
5
6
cmake_minimum_required(VERSION 3.19)
project(AnyTimer C)

set(CMAKE_C_STANDARD 99)

add_executable(AnyTimer main.c anytimer.h anytimer.c)

编译运行

image-20240508193146420

Anytimer2.c 简版

anytimer.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#ifndef ANYTIMER2_ANYTIMER_H
#define ANYTIMER2_ANYTIMER_H

#define BUFSIZE 1024
typedef void anytm_func(void *);

/*设置可以重复使用的闹钟数组*/
int anytimer_alarm(int,anytm_func *,const void *);

/*关闭闹钟*/
void anytimer_close(int n);

/*关闭全部闹钟*/
void anytimer_destroy(void);

#endif //ANYTIMER2_ANYTIMER_H

anytimer.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>
#include <signal.h>
#include <sys/time.h>
#include <errno.h>
#include <wait.h>

#include "anytimer.h"

// 闹钟系统初始标志位
static int init_mod;
struct sigaction oldact; // 用于存储程序初始的信号行为

/*定时器结构体*/
struct alarm_st
{
int token; // 令牌
anytm_func *anyfunc; // 闹钟任务函数
void *str; // 参数
};

/*定时器结构体数组*/
static struct alarm_st *alarm_arr[BUFSIZE];


/*关闭闹钟系统*/
/*
* 1.将闹钟关闭
* 2.信号行为进行初始还原
* */
static void __arr_destroy(void)
{
// 信号行为进行初始还原
sigaction(SIGALRM,&oldact,NULL);
// 关闭定时器
struct itimerval itv;
itv.it_interval.tv_sec = 0;
itv.it_interval.tv_usec = 0;
itv.it_value.tv_sec = 0;
itv.it_value.tv_usec = 0;
setitimer(ITIMER_REAL,&itv,NULL);
}


/*判断定时器结构体数组中,是否有存在的定时器*/
static void __ifarr_NULL(void)
{
int i;
for(i = 0;i<BUFSIZE && alarm_arr[i] == NULL;i++);
if(i == BUFSIZE)
anytimer_destroy(); // 若不存在运行的定时器则将系统关闭
}


/*定时器信号行为函数*/
static void alarm_handler(int s)
{
int i;
for(i = 0;i<BUFSIZE;i++)
{
if(alarm_arr[i] != NULL)
{
// 令牌数量-1(或者倒计时-1)
alarm_arr[i]->token--;
if(alarm_arr[i]->token <= 0)
{
// 执行定时器任务
alarm_arr[i]->anyfunc(alarm_arr[i]->str);
// 任务执行结束,释放该定时器动态空间
free(alarm_arr[i]);
// 定时器结构体数组标记为NULL,该位置没有被占用
alarm_arr[i] = NULL;
}
}
}
__ifarr_NULL();
}


/*定时器系统初始化*/
static void __alarm_init(void)
{
struct itimerval newtime;
// 定时器信号,程序开始后1s出发,之后每隔1s发送一次
newtime.it_interval.tv_sec = 1;
newtime.it_interval.tv_usec = 0;
newtime.it_value.tv_sec = 1;
newtime.it_value.tv_usec = 0;

struct sigaction act;
act.sa_handler = alarm_handler; // SIGALRM 信号行为函数
sigemptyset(&act.sa_mask); // 将阻塞信号集设置为空
act.sa_flags = 0; // 不需要其他设置

// 设置定时器
setitimer(ITIMER_REAL,&newtime,NULL);
// 定义信号行为
sigaction(SIGALRM,&act,&oldact);
init_mod = 1;
}


/*添加定时器任务*/
/* 参数:
* int sec : 定时时长
* anytm_func *anytm : 定时器任务函数
* const void *p : 任务函数参数
* 返回值:
* >0 定时器ID
* <0 ERROR
* */
int anytimer_alarm(int sec,anytm_func *anytm,const void *p)
{
if(0 == init_mod)
__alarm_init(); // 定时器系统仅仅初始化一次

// 查找定时器结构体数组的空位置
int i;
for(i=0;i<BUFSIZE;i++)
if(alarm_arr[i] == NULL)
break;
if(i>=BUFSIZE)
// 没有空位置
return -1;

struct alarm_st *me = NULL;
me = malloc(sizeof(*me));
if(me == NULL)
{
perror("malloc()");
return -1;
}

me->token = sec;
me->anyfunc = anytm;
me->str = (void*)p;
alarm_arr[i] = me;
return i;
}


/*定时器关闭*/
// 传入的定时器ID
void anytimer_close(int n)
{
// 清除相应定时器,没有考虑原子性
if(alarm_arr[n] != NULL)
{
free(alarm_arr[n]);
alarm_arr[n] = NULL;
}
__ifarr_NULL();
}


/*所有定时器都关闭*/
void anytimer_destroy(void)
{
__arr_destroy();
for(int i = 0; i < 1024; i++) {
if(alarm_arr[i] != NULL) {
free(alarm_arr[i]);
alarm_arr[i] = NULL;
}
}
}

main.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#include <stdio.h>
#include <stdlib.h>
#include <wait.h>
#include <sys/types.h>
#include <unistd.h>

#include "anytimer.h"

static void any1(void *s)
{
printf("%s", (char *)s);
fflush(NULL);
}

int main() {
int i = 0;
int alarm_arr_id;
// 创建多个定时任务
anytimer_alarm(2, any1, "hello2");
anytimer_alarm(4, any1, "hello4");
alarm_arr_id = anytimer_alarm(6, any1, "hello6");
anytimer_alarm(8, any1, "hello8");
anytimer_alarm(10, any1, "hello10");
anytimer_alarm(12, any1, "hello12");

while(1)
{
write(1, ".", 1);
sleep(1);
i++;
//关闭第三次alarm,不执行hello6输出
if(i == 5)
anytimer_close(alarm_arr_id);
//第四次alarm后,全关闭alarm数组
if(i == 9)
anytimer_destroy();
}
return 0;
}

CMakeLists.txt

1
2
3
4
5
6
cmake_minimum_required(VERSION 3.19)
project(Anytimer2 C)

set(CMAKE_C_STANDARD 99)

add_executable(Anytimer2 main.c anytimer.h anytimer.c)

编译运行如下:

image-20240508204002685



10.信号集

信号是一种中断机制,用于通知进程某些特定事件的发生。信号集是一组信号的集合,用于表示多个信号

在C语言中,您可以使用以下库函数和数据结构来处理信号集:

  • sigset_t:这是一个数据结构,用于表示信号集。
  • sigemptyset(sigset_t *set):初始化信号集为空集。
  • sigfillset(sigset_t *set):将所有信号添加到信号集。
  • sigaddset(sigset_t *set, int signum):将指定信号添加到信号集。
  • sigdelset(sigset_t *set, int signum):从信号集中删除指定信号。
  • sigismember(const sigset_t *set, int signum):检查指定信号是否在信号集中
(1)sigprocmask函数

该函数无法决定信号什么时候来,但是可以决定信号何时被响应

sigprocmask函数用于检查或更改进程的信号屏蔽字。这可以用于阻塞或解阻某些信号,使得在某些代码段执行时,这些信号不会被传递到进程

函数原型

1
int sigprocmask(int how, const sigset_t *set, sigset_t *oldset);

参数:

  • how:这决定了如何更改当前的信号屏蔽字。它可以是以下之一:
    • SIG_BLOCK:将set中的信号添加到当前的信号屏蔽字中。
    • SIG_UNBLOCK:从当前的信号屏蔽字中删除set中的信号。
    • SIG_SETMASK:将当前信号屏蔽字设置为 set 指定的值。
  • set:是一个指向信号集的指针,该信号集指定了要进行上述操作的信号。如果set为NULL,则how的值不重要,信号屏蔽字不会被更改。
  • oldsetoldset 参数是一个指向 sigset_t 类型的变量的指针,用于存储修改前的信号屏蔽字。如果不需要这个信息,可以将其设置为 NULL

返回值:

  • 如果成功,则返回0
  • 如果出错,则返回-1,并设置errno以指示错误。

程序实例1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#include <stdio.h>
#include <stdlib.h>
#include <signal.h>
#include <unistd.h>



int main()
{
int i,j;

// 创建一个信号的集合
sigset_t set,oset,saveset;


// 将指定信号添加到信号集中
sigaddset(&set,SIGINT);

// 对set中的信号进行解除阻塞(这是由于该程序的目的是为了先阻塞后解除阻塞,需要保证在进入循环之前信号时解除阻塞的),saveset用于存储修改前的信号屏蔽字
sigprocmask(SIG_UNBLOCK,&set,&saveset);

for(j = 0;j<1000;j++)
{
/// 将set中的信号添加到当前的屏蔽字中,阻塞set中的信号,这些信号不会被传递到进程,oset用于存储修改前的信号屏蔽字
sigprocmask(SIG_BLOCK,&set,&oset);
for(i = 0;i<5;i++)
{
// 将文件描述符1号位置 每秒输入一个字符 * 即标准输出
write(1,"*",1);
sleep(1);
}
write(1,"\n",1);
// 设置当前屏蔽字为oset这样,就会解除之前对于set中信号的阻塞
// 按照程序的逻辑在换行的时候会对SIGINT作出响应,当对终端键入ctrl + c时,会打断终端运行
sigprocmask(SIG_SETMASK,&oset,NULL);
}

// 对set中的信号状态进行恢复
sigprocmask(SIG_SETMASK,&saveset,NULL);

exit(0);
}

该程序运行之后,在终端时入ctrl+c不会立刻进行响应信号,在打印*的过程中打印!号,而是会在换行之后作出响应,因为这个时候解除了对于信号的阻塞

注意编写信号有关的程序,需要保证信号在全局的状态是不变的,即运行程序后,信号的状态不变,因此在使用函数对某信号进行阻塞时,应该先对其状态进行保存,在使用运行程序结束后,对信号的状态进行恢复

运行结果

1
2
3
4
5
(base) zxz@zxz-B660M-GAMING-X-AX-DDR4:~/Proj/CLionProj/UNIX/IO/signal$ ./7
**^C*^C**
!***^C**
!*****

程序实例2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
#include <stdio.h>
#include <stdlib.h>
#include <signal.h>
#include <unistd.h>

static void int_handler(int s)
{
// // 将文件描述符1号位置 输入一个字符 ! 即标准输出
write(1,"!",1);
}

int main()
{
int i,j;

// 创建一个信号的集合
sigset_t set,oset,saveset;

// 捕获到SIGINT信号时,则使用回调函数int_handler
// 这样就会对该信号的主要作用进行忽略
signal(SIGINT,int_handler);

// 将指定信号添加到信号集中
// 在终端键入 ctrl +c ---> SIGINT
// 在终端键入 ctrl +\---> SIGQUIT
sigaddset(&set,SIGINT | SIGQUIT);

// 对set中的信号进行解除阻塞(这是由于该程序的目的是为了先阻塞后解除阻塞,需要保证在进入循环之前信号时解除阻塞的),将set中信号的状态存储到saveset中,便于程序运行结束之后的恢复
sigprocmask(SIG_UNBLOCK,&set,&saveset);

for(j = 0;j<1000;j++)
{
/// 将set中的信号添加到当前的屏蔽字中,阻塞set中的信号,这些信号不会被传递到进程,并将set中信号状态保存到oset中
sigprocmask(SIG_BLOCK,&set,&oset);
for(i = 0;i<5;i++)
{
// 将文件描述符1号位置 每秒输入一个字符 * 即标准输出
write(1,"*",1);
sleep(1);
}
write(1,"\n",1);
// 设置当前屏蔽字为oset这样,就会解除之前对于set中信号的阻塞
// 按照程序的逻辑在新起一行,开始进行打印的时候键入 ctrl+\ 均会使得程序终止运行
// 使用 sigprocmask 函数恢复之前保存的信号屏蔽字 oset,这样在下一次外层循环开始之前,SIGINT 信号将不再被屏蔽
sigprocmask(SIG_SETMASK,&oset,NULL);
}

// 对set中的信号状态进行恢复
// 在外层循环结束后,使用 sigprocmask 函数恢复程序开始时保存的原始信号屏蔽字 saveset
sigprocmask(SIG_SETMASK,&saveset,NULL);

exit(0);
}


11.信号屏蔽字/Pending集的处理

视频教程



12.扩展

(1)sigsuspend()

sigsuspendUNIX和类UNIX系统中C语言的一个系统调用函数,用于暂时替换进程的信号屏蔽字并暂停进程执行(sigsuspend() 会使进程进入阻塞等待状态,直到接收到一个未被屏蔽的信号),直到接收到一个信号。一旦进程接收到一个信号,sigsuspend将返回,并恢复进程的原始信号屏蔽字

这个函数在某些场景中非常有用,特别是当进程希望等待一个信号但不希望被其他信号中断时

函数原型

1
int sigsuspend(const sigset_t *mask);

参数:

  • mask:指向要设置的新信号屏蔽字的信号集。

返回值:

  • sigsuspend总是返回-1,并设置errnoEINTR,表示由信号中断

该函数的工作流程如下:

  • 替换信号屏蔽集sigsuspend()mask 替换进程的当前信号屏蔽集。信号屏蔽集是一组信号,它们在进程运行期间被屏蔽,不会立即传递给进程
  • 等待信号:函数使进程进入等待状态,直到接收到一个未屏蔽的信号。该信号将被传递给进程,并触发相应的信号处理程序或默认行为
  • 恢复信号屏蔽集:在信号处理程序返回或默认行为执行完毕后,sigsuspend() 恢复进程的原始信号屏蔽集
  • 返回sigsuspend() 函数返回 -1,并设置 errno 表示错误类型。通常,它在接收到一个未屏蔽的信号后返回

(2)sigaction()

sigaction() 是一个在 Unix-like 系统中用于检查或修改信号行为的函数。它被设计用来取代早期的 signal() 函数,因为它提供了更多的功能和更明确的语义。

信号是 UNIX 系统中进程间通讯的方式之一。当进程接收到一个信号时,它可以选择忽略该信号、捕获该信号并执行相应的处理程序,或者按照默认方式处理该信号(例如,SIGTERM 信号的默认处理方式是终止进程)

函数原型

1
2
3
#include <signal.h>

int sigaction(int signum, const struct sigaction *act, struct sigaction *oldact);

参数:

  • signum: 需要检查或修改行为的信号。
  • act: 如果不为 NULL,则它指向一个定义新行为的 sigaction 结构体。
  • oldact: 如果不为 NULL,则在调用之后,这个结构体会被设置为信号的旧行为

sigaction 结构体的定义可能如下:

1
2
3
4
5
6
7
struct sigaction {
void (*sa_handler)(int);
void (*sa_sigaction)(int, siginfo_t *, void *);
sigset_t sa_mask;
int sa_flags;
void (*sa_restorer)(void);
};

其中:

  • sa_handler: 对于信号的反应(例如:SIG_IGN, SIG_DFL, 或者一个函数指针)。
  • sa_sigaction: 一个替代 sa_handler 的处理函数,但它提供更多信息。
  • sa_mask: 在处理该信号时,额外需要被阻塞的信号集。
  • sa_flags: 修改行为的标志。
  • sa_restorer: 以前用于内部恢复操作,现在已经不再使用

一些常见的 sa_flags 值:

  • SA_SIGINFO: 使用 sa_sigaction 而不是 sa_handler
  • SA_RESTART: 使某些被信号中断的系统调用可重启。
  • SA_NOCLDSTOP: 如果信号是 SIGCHLD,则只有当子进程退出时才会收到,而不是当它停止时

程序实例:使用 sigaction 来设置一个处理 SIGINT 信号(通常是 Ctrl+C 产生的)的处理函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#include <stdio.h>
#include <signal.h>
#include <unistd.h>

void handle_sigint(int sig) {
printf("\nCaught signal %d\n", sig);
}

int main() {
struct sigaction sa;
sa.sa_handler = handle_sigint;
sa.sa_flags = SA_RESTART;
sigaction(SIGINT, &sa, NULL);

while(1) {
printf("Running...\n");
sleep(1);
}

return 0;
}

这段代码定义了一个处理函数 handle_sigint 来捕获 SIGINT 信号,并使用 sigaction 函数将其设为 SIGINT 信号的处理程序




二、线程

1.线程的概念

线程通俗理解为一个正在运行的函数

在之前的进程就是容器用于承载多个线程

线程,在计算机科学中通常指的是线程的执行单元。在操作系统中,一个程序可以有多个线程,每个线程可以并行(或伪并行在单核CPU上)执行。线程比进程更轻量级,多个线程共享同一个地址空间和资源(线程通信相对于进程简单),但每个线程有其自己的指令指针、堆栈

一个进程中至少有一个线程

线程的优点

  • 效率:线程的创建和销毁比进程要快得多,切换线程的成本也比切换进程要小。
  • 资源共享:因为线程之间共享相同的地址空间,所以一个线程可以访问另一个线程的数据。这使得数据交换变得容易,但也意味着需要同步机制来避免资源竞争和数据不一致。
  • 响应速度:在多线程程序中,即使一个线程被阻塞(例如等待I/O操作),其他线程仍然可以继续执行。
  • 利用多核:现代的多核处理器可以同时执行多个线程,这使得多线程程序可以真正地并行执行,从而提高了性能

线程缺点

  • 同步和死锁:当多个线程试图访问共享资源时,可能会发生资源竞争,这需要使用同步机制(如互斥锁、信号量等)来解决。但不恰当的使用同步机制可能导致死锁。
  • 数据不一致:多个线程并发修改共享数据可能导致数据不一致。
  • 调试困难:多线程程序的行为可能是不确定的,因此调试可能比单线程程序更加困难。
  • 设计复杂性:编写和维护多线程代码需要更高的专业知识和经验

在ubuntu可以使用命令:ps axm 查看线程

1
$ ps axm
(1)线程标识符
1
pthread_t thread_id;

(2)pthread_equal()

pthread_equal是一个用于比较两个线程ID是否相同的函数

函数原型

1
int pthread_equal(pthread_t t1, pthread_t t2);

参数:

  • t1t2:要比较的线程ID

返回值:

  • 如果两个线程ID相同,则返回非零值。
  • 如果两个线程ID不同,则返回0

(3)pthread_self()

pthread_selfPOSIX 线程库中的一个函数,用于获取调用线程的线程ID。这个函数在你需要在某个线程内部知道它自己的线程ID时特别有用

函数原型

1
pthread_t pthread_self(void);

返回值:

  • 调用线程的线程ID

(4)sched_yield()

sched_yield() 函数是一个用于控制线程调度的重要函数。该函数的作用是让出调用线程的当前CPU时间片,给其他线程或进程运行的机会。这可以帮助实现更加公平的线程调度,特别是在多线程环境中,当线程需要等待某些资源释放或条件满足时,主动让出CPU,从而提高整体系统的效率和响应性。

1
2
3
#include <sched.h>

int sched_yield(void);

返回值:

  • sched_yield() 总是返回 0

行为用途:

当线程调用 sched_yield() 后,它会告诉操作系统的调度器,当前线程愿意放弃其剩余的时间片,使调度器可以选择另一个线程或进程来执行。这通常在以下情况下使用:

  • 改善响应性:在计算密集型任务中,线程可以定期调用 sched_yield(),以避免独占CPU资源过长时间,从而允许其他同等优先级的线程有机会运行,提高系统的响应性
  • 等待资源:当线程需要等待一个条件变量、锁或其他资源时,而这些资源当前不可用,线程可以调用 sched_yield()减少忙等(busy-waiting)带来的CPU资源浪费
  • 协同运行:在某些协作型多线程应用程序中,线程间需要频繁交换数据或状态信息。在这些情况下,通过 sched_yield() 可以帮助实现更平滑的协同执行

(5)线程状态与CPU占用

在多线程编程中,线程可以处于不同的状态:

  • 运行(Running):线程正在 CPU 上执行。
  • 就绪(Runnable):线程已准备好运行,正在等待 CPU 时间。
  • 阻塞(Blocked):线程因等待某种资源(如 I/O 操作完成、获取锁、等待其他事件如条件变量)而暂停执行。
  • 休眠(Sleeping):线程主动放弃了 CPU,通常是调用了 sleep() 或类似函数。
  • 终止(Terminated):线程已完成执行或被终止。

当线程处于阻塞或休眠状态时,它不会占用 CPU 时间。操作系统调度器会将 CPU 分配给其他处于就绪状态的线程。这样的调度策略使得 CPU 资源可以有效地在多个线程之间共享,提高了系统的整体效率和响应性。

线程挂起的影响:

  • 资源使用:挂起的线程不占用 CPU 周期,但仍然会占用一定的内存资源,因为线程的上下文(如栈、寄存器状态、线程局部存储)需要保留,以便线程可以在后续被唤醒并继续执行
  • 系统性能:合理的线程挂起可以帮助系统更好地管理资源,避免无谓的 CPU 占用,尤其是在线程需要等待外部事件或数据时。通过让线程挂起,系统可以将有限的 CPU 资源分配给其他可能正在进行计算的线程,从而优化整体性能


2.线程创建

(1)pthread_create()

pthread_createPOSIX线程库中的一个函数,用于创建一个新的线程。这个函数允许开发者定义一个函数,这个函数将会在新创建的线程中运行

函数原型

1
int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);

参数:

  • thread:指向 pthread_t 类型的指针,用于存储新创建线程的线程ID。
  • attr:指向 pthread_attr_t 类型的指针,用于设置线程属性(如堆栈大小)。如果传递 NULL,则会使用默认属性(一般默认就可以解决80%的问题)。
  • start_routine:指针,指向新线程启动时需要调用的函数。这个函数的返回类型必须为 void *,并且接受一个 void * 参数。
  • arg:传递给 start_routine 的参数

返回值:

  • 如果线程成功创建,返回0
  • 如果发生错误,返回一个非0的错误代码(error number

程序实例

create1.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <pthread.h>
#include <string.h>


// 指针函数 return NULL 指针
static void *func(void *p)
{
puts("Thread is working!");
return NULL;
}



int main()
{
int err;
pthread_t tid;

puts("Begin!");

err = pthread_create(&tid,NULL,func,NULL);

// 0表示成功 非0则失败
if(err)
{
fprintf(stderr,"pthread_create():%s\n", strerror(err));
exit(1);
}

puts("End!");

exit(0);
}

CMakeLists.txt

1
2
3
4
5
6
7
8
cmake_minimum_required(VERSION 3.16)
project(Proj_1) # 工程名字可以与可执行程序名不一样
add_executable(create1 create1.c)

# 查找pthread库
find_package(Threads REQUIRED)
# 链接pthread库到你的目标
target_link_libraries(create1 Threads::Threads)

运行之后

1
2
3
(base) zxz@zxz-B660M-GAMING-X-AX-DDR4:~/Proj/CLionProj/UNIX/IO/thread/create1/build$ ./create1 
Begin!
End!

最后运行程序之后并没有显示调用线程函数中的输出内容,(线程的调用取决于调度器策略)这是因为线程还没有来得及调用,进程已经通过exit(0)结束


(2)pthread_once()

该函数确保某个指定的初始化函数在一个程序中只被执行一次,即使多个线程尝试执行它。这个函数特别适用于多线程环境中的一次性初始化任务,例如初始化全局变量、设置环境或配置单例模式

1
2
3
#include <pthread.h>

int pthread_once(pthread_once_t *once_control, void (*init_routine)(void));

参数:

  • **once_control**:指向 pthread_once_t 类型的变量的指针,该变量必须使用 PTHREAD_ONCE_INIT 初始化。这个变量跟踪初始化函数是否已被调用
  • **init_routine**:指向要执行的初始化函数的指针。这个函数没有参数和返回值。pthread_once 会确保这个函数只被执行一次

返回:

  • 返回 0 表示成功。
  • 返回非零值表示发生错误

实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#include <pthread.h>
#include <stdio.h>

// 定义全局 once 控制变量
pthread_once_t once = PTHREAD_ONCE_INIT;

// 初始化函数
void initialize() {
printf("Initialization done.\n");
}

// 线程函数
void* thread_function(void* arg) {
// 使用 pthread_once 确保初始化函数只被执行一次
pthread_once(&once, initialize);
printf("Thread %ld running.\n", (long) arg);
return NULL;
}

int main() {
pthread_t threads[4];

// 创建多个线程
for (long i = 0; i < 4; i++) {
pthread_create(&threads[i], NULL, thread_function, (void*)i);
}

// 等待所有线程完成
for (int i = 0; i < 4; i++) {
pthread_join(threads[i], NULL);
}

return 0;
}


3.线程终止

(1)终止方式
  • 线程从启动历程返回.返回值就是线程的退出码
  • 线程可以被统同一进程中的其他线程取消
  • 线程调用pthread_exit()函数

(2)pthread_exit()

pthread_exit函数用于从调用线程中退出,并提供一个退出值,该值可由另一个线程通过pthread_join获取。当一个线程调用pthread_exit时,线程的清理处理程序将被执行,其存储空间(例如栈)会被回收

函数原型

1
void pthread_exit(void *retval);

参数:

  • retval: 是一个指向任意数据类型的指针,表示线程的退出值。如果你不需要返回特定的值,可以简单地传递NULL

说明:

  • 调用pthread_exit将不会关闭整个进程,只会关闭调用它的线程。
  • 如果主线程调用了pthread_exit而没有结束程序,那么其他线程仍然可以继续执行。
  • 如果从主函数返回,其行为类似于调用exit(整个进程都会结束,所有线程都会被关闭)。但是,如果主线程调用pthread_exit,进程不会结束,其他线程可以继续执行

(3)pthread_join()

该函数类似于进程中的wait()函数,用于给线程进行收尸

pthread_joinPOSIX 线程库中的一个函数,用于阻塞调用线程,直到指定的线程终止。这个函数还允许调用线程检索目标线程的退出状态

函数原型

1
int pthread_join(pthread_t thread, void **retval);

参数

  • thread:是要等待的线程的标识符。
  • retval:是一个指向指针的指针,用于获取线程的退出值。如果不关心退出值,可以设置为 NULL

返回值

  • 如果成功,返回 0
  • 如果出错,返回一个非零错误代码。

注意事项

  • 如果目标线程已经终止,但还没有被join,那么它被称为“僵尸线程”。pthread_join 的一个主要用途是从系统中清除僵尸线程。
  • 一个线程只能被 join 一次。一旦一个线程被成功地 join,它的 ID 就可以被再次使用。
  • 线程不是必须被 join。例如,如果你不关心线程的退出状态,你可以选择不 join

程序实例

create1.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <pthread.h>
#include <string.h>
#include <unistd.h>


// 指针函数 return NULL 指针
static void *func(void *p)
{
puts("Thread is working!");
// 线程结束
pthread_exit(NULL);
}



int main()
{
int err;
void *thread_exit_status;
pthread_t tid;

puts("Begin!");

err = pthread_create(&tid,NULL,func,NULL);

// 0表示成功 非0则失败
if(err)
{
// perror("pthread_creat()");
fprintf(stderr,"pthread_create():%s\n", strerror(err));
exit(1);
}


// 等待线程结束,对线程进行资源的回收
pthread_join(tid,thread_exit_status);
printf("Thread exit status: %ld\n", (long)thread_exit_status);
puts("End!");

// 进程结束
exit(0);
}

(4)栈的清理(线程中的钩子函数)

在多线程编程中,线程可能在执行期间的任何时间点被取消。当线程被取消时,可能会需要执行一些清理任务,如释放资源、解锁互斥锁等。POSIX线程库提供了pthread_cleanup_pushpthread_cleanup_pop函数,允许你指定在线程退出或被取消时执行的清理任务

pthread_cleanup_push 函数

该函数将清理处理程序(一个函数)推入当前线程的清理处理程序堆栈。

1
void pthread_cleanup_push(void (*routine) (void *), void *arg);
  • routine: 这是一个指向清理函数的指针。当线程退出或被取消时,这个函数将被调用。
  • arg: 这是传递给清理函数的参数

pthread_cleanup_pop 函数

该函数从当前线程的清理处理程序堆栈中弹出最近推入的清理处理程序

1
void pthread_cleanup_pop(int execute);
  • execute: 如果非零,清理函数将被执行;否则,清理函数不会被执行
程序实例

create2.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <pthread.h>
#include <string.h>
#include <unistd.h>


static void cleanup_func(void *p)
{
puts((char *) p);
}

// 指针函数 return NULL 指针
static void *func(void *p)
{
puts("Thread is working!");

// 压入栈(挂上钩子)
pthread_cleanup_push(cleanup_func, "cleanup:1");
pthread_cleanup_push(cleanup_func, "cleanup:2");
pthread_cleanup_push(cleanup_func, "cleanup:3");

puts("push over!");


pthread_cleanup_pop(1);
pthread_cleanup_pop(1);
pthread_cleanup_pop(1);

// 线程结束
pthread_exit(NULL);
}



int main()
{
int err;
pthread_t tid;

puts("Begin!");

err = pthread_create(&tid,NULL,func,NULL);

// 0表示成功 非0则失败
if(err)
{
fprintf(stderr,"pthread_create():%s\n", strerror(err));
exit(1);
}


// 等待线程结束,对线程进行资源的回收
pthread_join(tid,NULL);
puts("End!");

// 进程结束
exit(0);
}

CMakeLists.txt

1
2
3
4
5
6
cmake_minimum_required(VERSION 3.16)
project(Proj_1) # 工程名字可以与可执行程序名不一样
add_executable(create2 create2.c)

find_package(Threads REQUIRED)
target_link_libraries(create2 Threads::Threads)

运行结果

1
2
3
4
5
6
7
8
9
(base) zxz@zxz-B660M-GAMING-X-AX-DDR4:~/Proj/CLionProj/UNIX/IO/thread/create2/build$ ./create2 
Begin!
Thread is working!
push over!
cleanup:3
cleanup:2
cleanup:1
End!

需要注意的是,pthread_cleanup_pushpthread_cleanup_pop必须在同一个代码块中,并且对于每个push都必须有一个匹配的pop,pthread_cleanup_pop函数即使放到static void *func(void *p)函数中的pthread_exit(NULL)之后也可以,但是必须要有,这两个所谓的函数实质上是宏



4.线程取消

(1)pthread_cancel()

pthread_cancel 函数用于请求取消一个线程。需要注意的是,“请求”取消并不意味着线程立即终止。相反,目标线程决定在哪些点上检查取消请求,这些点称为“取消点”

函数原型

1
int pthread_cancel(pthread_t thread);

参数

  • thread:要取消的线程的标识符

返回值

  • 如果成功,返回 0
  • 如果出错,返回一个非零错误代码

注意事项:

  • 取消一个线程并不立即终止它,但是会设置一个请求来取消线程。线程在取消点检查这个请求。
  • 常见的取消点包括一些系统调用,如open(),read(), write(), sleep()pthread_testcancel()等。
  • 可以使用pthread_setcancelstate()pthread_setcanceltype()函数来控制线程的取消行为
取消状态
  • 两种:允许与不允许
允许取消
  • 分为异步cancel,推迟cancel(默认)–>推迟至cancel点在响应

  • cancel点(取消点): POSIX定义的cancel点,都是可能引发阻塞的系统调用(注意事项的第二点)

pthread_setcancelstate()
  • 设置是否允许取消
pthread_setcanceltype()
  • 设置取消方式
pthread_testcancel()
  • 该函数什么都不做,就是一个取消点

(2)线程分离

在多线程编程中,线程分离(Detached threads)是一个重要的概念。当一个线程结束运行并退出时,其退出状态需要被另一个线程收集,这通常是通过 pthread_join 函数完成的。如果一个线程没有被其他线程join,那么它的结束状态就会被留在系统中,这种线程被称为“僵尸线程”。

为了避免这种情况,我们可以创建一个分离的线程。一个分离的线程在结束时会自动清理其资源,不需要其他线程来join

如何创建一个分离线程?
  • 使用 pthread_attr_tpthread_attr_init

可以在创建线程之前设置线程属性使其为分离的

1
2
3
4
5
6
7
8
9
pthread_t tid;
pthread_attr_t attr;

pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);

pthread_create(&tid, &attr, thread_function, NULL);

pthread_attr_destroy(&attr); // Don't forget to destroy the attribute
  • 使用 pthread_detach

如果线程已经创建了,你可以使用 pthread_detach 函数来将它设置为分离的

1
2
3
4
pthread_t tid;

pthread_create(&tid, NULL, thread_function, NULL);
pthread_detach(tid);

注意

  • 分离的线程不能被其他线程join
  • 你不应该分离一个线程并且也试图join它。这种行为是未定义的。
  • 如果你知道一个线程不会被join,最好将其设置为分离的,以确保资源得到适当的清理。
程序实例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#include <stdio.h>
#include <pthread.h>

void* thread_function(void* arg) {
printf("Inside the detached thread\n");
// 执行完该函数的工作后,对线程进行终止
pthread_exit(NULL);
}

int main() {
pthread_t tid;

pthread_create(&tid, NULL, thread_function, NULL);
// 上述被创建的线程以及被分离,不需要通过join进行收尸,会自动清理自己
pthread_detach(tid);

printf("Main thread continuing...\n");
sleep(2); // Allow the detached thread to finish
return 0;
}

在这个例子中,创建了一个分离的线程,它会自动清理自己,主线程只是继续运行并在最后休眠了一段时间以等待分离的线程完成



5.线程竞争

线程竞争,通常称为竞争条件(Race Condition),发生在两个或更多线程并发访问共享资源,并尝试读、写或修改这些资源,导致执行结果变得不可预测。这是因为线程的调度方式是不确定的,所以你不能预知哪个线程首先访问资源或在什么时候进行访问

竞争条件可能导致各种问题,包括:

  • 数据不一致:共享数据可能处于不一致的状态。
  • 程序崩溃:如果一个线程期望某些数据保持不变,但另一个线程修改了它,可能导致不可预测的行为。
  • 性能问题:例如,如果两个线程都尝试更新相同的数据,可能需要重试,从而降低效率

解决方法

  • 互斥量 (Mutexes):这是一种同步原语,可以确保一次只有一个线程可以访问某些资源或代码段。例如,pthread_mutex_tPOSIX线程库中的一个常见互斥量。
  • 读-写锁 (Read-Write Locks):允许多个线程同时读共享资源,但一次只有一个线程可以写。
  • 信号量 (Semaphores):是一个更通用的同步工具,它可以控制同时访问某个资源的线程数。
  • 原子操作 (Atomic Operations):这些操作在单个操作中完成,不会被其他线程中断。这些操作通常由硬件支持,并可确保数据的完整性。

防止竞争条件的关键是识别并保护对共享资源的所有访问,并确保一次只有一个线程可以执行修改。


(1)程序实例

该程序目的,创建多个线程同时对一个文件进行读写,最后每次运行程序文件中的数字应该要比运行前大20

在程序的同级别目录下创建out文件,终端指令echo 1 > ./out向该文件中输入数字1

primer0.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>

#define THRNUM 20
#define FNAME "../out"
#define LINESIZE 1024

// 线程函数
static void *thr_add(void *p)
{
FILE *fp;
char linebuf[LINESIZE];

// 打开文件
fp = fopen(FNAME,"r+");
if(fp ==NULL)
{
perror("fopen()");
exit(1);
}


// 读取文件中的一行
fgets(linebuf,LINESIZE,fp);
// 定位到文件开始的位置进行覆盖写入
fseek(fp,0,SEEK_SET);
// 此处的sleep可以将错误进行放大,因为若机器为单核的可能程序运行后就是预期的结果
sleep(1);
// 将l1infbuf字符串中的转为整形+1 在写入文件流fp中
fprintf(fp,"%d\n",atoi(linebuf)+1);
// 关闭
fclose(fp);

// 关闭线程
pthread_exit(NULL);


}

int main()
{
int i,err;
pthread_t tid[THRNUM];

for(i = 0;i<THRNUM;i++)
{
err = pthread_create(tid+i,NULL,thr_add,NULL);
// 线程创建失败非0,即为真
if(err)
{
fprintf(stderr,"pthread_create():%s\n",strerror(err));
// 异常退出主进程
exit(1);
}

}


// 回收线程资源(收尸)
for(i = 0;i<THRNUM;i++)
{
pthread_join(tid[i],NULL);
}

exit(0);
}

CMakeLists.txt

1
2
3
4
5
6
cmake_minimum_required(VERSION 3.16)
project(Proj_1) # 工程名字可以与可执行程序名不一样
add_executable(primer0 primer0.c)

find_package(Threads REQUIRED)
target_link_libraries(primer0 Threads::Threads)

运行结果:

最后文件中的值为2,是因为出现了线程竞争,相当于20个线程对文件中的内容取20次,都为1在对文件重复写了20次都为2

1
2
3
(base) zxz@zxz-B660M-GAMING-X-AX-DDR4:~/Proj/CLionProj/UNIX/IO/thread/create3/build$ ./primer0 
(base) zxz@zxz-B660M-GAMING-X-AX-DDR4:~/Proj/CLionProj/UNIX/IO/thread/create3/build$ cat ../out
2

(2)线程同步
互斥量

互斥量(Mutex, 是 Mutual Exclusion 的缩写)是多线程编程中的一个同步原语,它用于保护对共享资源的并发访问,从而避免竞争条件。一个互斥量在任意时刻都只能被一个线程锁定。如果其他线程试图再次锁定它,它们会被阻塞,直到第一个线程释放互斥量

POSIX 线程(pthreads)库中,互斥量的类型是 pthread_mutex_t

使用互斥量基本步骤

  • 初始化互斥量(静态):
1
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
  • 锁定互斥量: 当你需要保护一段代码或资源时,可以尝试锁定互斥量。
1
pthread_mutex_lock(&mutex);
  • 访问受保护的资源或代码

  • 解锁互斥量: 访问完成后,你应该立即释放互斥量。

1
pthread_mutex_unlock(&mutex);
  • 销毁互斥量: 当不再需要互斥量时,应确保销毁它。
1
pthread_mutex_destroy(&mutex);
pthread_mutex_init()

这个函数用于初始化互斥量

1
int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr);

参数

  • mutex:指向要初始化的互斥量。
  • attr:指定互斥量属性,如果设为NULL,则使用默认属性。

返回值

  • 成功:返回0
  • 错误:返回错误号
pthread_mutex_destroy()

这个函数用于销毁一个已经不再需要的互斥量

1
2
int pthread_mutex_destroy(pthread_mutex_t *mutex);

参数

  • mutex:指向要销毁的互斥量。

返回值

  • 成功:返回0
  • 错误:返回错误号
pthread_mutex_lock()

尝试锁定一个互斥量。如果互斥量已经被其他线程锁定,该调用会阻塞,直到互斥量可用为止

1
2
int pthread_mutex_lock(pthread_mutex_t *mutex);

参数

  • mutex:指向要锁定的互斥量。

返回值

  • 成功:返回0
  • 错误:返回错误号
pthread_mutex_trylock()

尝试锁定一个互斥量。如果互斥量已经被其他线程锁定,该调用会立即返回一个EBUSY错误,而不是阻塞

1
2
int pthread_mutex_trylock(pthread_mutex_t *mutex);

参数

  • mutex:指向要尝试锁定的互斥量。

返回值

  • 成功:返回0
  • 错误:返回错误号
pthread_mutex_unlock()

解锁一个互斥量

1
2
int pthread_mutex_unlock(pthread_mutex_t *mutex);

参数

  • mutex:指向要解锁的互斥量。

返回值

  • 成功:返回0
  • 错误:返回错误号
注意

这些函数使你能够在多线程环境中安全地对共享资源进行操作,从而避免产生竞争条件。但要记住,当你使用互斥量时,应确保在所有的代码路径上,每次锁定都有相应的解锁,以避免死锁和其他同步问题


(3)程序实例(互斥量)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>

#define THRNUM 20
#define FNAME "../out"
#define LINESIZE 1024

// 初始化互斥量
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

// 线程函数
static void *thr_add(void *p)
{
FILE *fp;
char linebuf[LINESIZE];

// 打开文件
fp = fopen(FNAME,"r+");
if(fp ==NULL)
{
perror("fopen()");
exit(1);
}

// 多个线程无法同时进行操作的资源,锁定互斥量
pthread_mutex_lock(&mutex);
// 读取文件中的一行
fgets(linebuf,LINESIZE,fp);
// 定位到文件开始的位置进行覆盖写入
fseek(fp,0,SEEK_SET);
sleep(1);
// 将linfbuf字符串中的转为整形+1 在写入文件流fp中
fprintf(fp,"%d\n",atoi(linebuf)+1);
// 关闭
fclose(fp);
// 解锁
pthread_mutex_unlock(&mutex);

// 关闭线程
pthread_exit(NULL);


}

int main()
{
int i,err;
pthread_t tid[THRNUM];

for(i = 0;i<THRNUM;i++)
{
err = pthread_create(tid+i,NULL,thr_add,NULL);
// 线程创建失败非0,即为真
if(err)
{
fprintf(stderr,"pthread_create():%s\n",strerror(err));
// 异常退出主进程
exit(1);
}

}


// 回收线程资源(收尸)
for(i = 0;i<THRNUM;i++)
{
pthread_join(tid[i],NULL);
}

// 互斥量销毁
pthread_mutex_destroy(&mutex);

exit(0);
}

运行结果:

休眠一段事件后,正确的结果

1
2
3
base) zxz@zxz-B660M-GAMING-X-AX-DDR4:~/Proj/CLionProj/UNIX/IO/thread/create3/build$ ./primer0 
(base) zxz@zxz-B660M-GAMING-X-AX-DDR4:~/Proj/CLionProj/UNIX/IO/thread/create3/build$ cat ../out
21


6.线程池

视频教程

线程池是多线程编程中的一种设计模式,它用于为待执行的任务提供已经启动的线程,而不是为每个任务启动一个新线程。这种方法的好处是可以减少创建和销毁线程的开销,并限制系统中线程的数量,这样可以预防资源的过度使用和系统的过载

线程池通常由以下几个核心组件组成:

  • 任务队列:存储待执行的任务。
  • 工作者线程:线程池中的线程,它们不断从任务队列中取出任务并执行。
  • 线程池管理器:负责添加、暂停或终止工作者线程

线程池的工作流程通常如下:

  • 初始化线程池并创建一定数量的工作者线程。
  • 当一个新任务到达时,将其添加到任务队列。
  • 工作者线程从任务队列中取出任务并执行。完成后,它会再次检查队列以获取下一个任务。
  • 当任务队列为空,线程可以选择休眠等待,或根据具体的线程池策略进行其他操作
(1)C语言中实现线程池

C语言本身并不直接支持线程池,但你可以使用pthread库手动实现一个,或使用现有的第三方库,如libthreadpool

如果你决定自己实现线程池,这里有一些建议的步骤:

  • 定义数据结构:为线程池、工作者线程和任务定义数据结构。
  • 初始化线程池:根据指定的大小创建工作者线程,并初始化任务队列。
  • 提交任务:提供一个函数,使用户能够将任务添加到队列。
  • 工作者线程的主函数:此函数应从任务队列中提取任务并执行它们。
  • 清理和关闭:提供一个函数来停止所有工作者线程并清理线程池。

虽然从头实现一个线程池是一个很好的学习经验,但如果你要在生产环境中使用线程池,建议使用已经存在且经过良好测试的第三方库,因为线程同步和管理都是比较复杂的主题,容易出错



7.线程令牌桶

在线程令牌桶系统中,和进程不同,不通过alarm或者setitimer函数定时并且发送信号,让信号行为函数执行,而是通过让积攒令牌桶系统中各个令牌桶的令牌数的子线程进行休眠固定时间。

创建子线程,对令牌桶中的令牌进行操作(每1s增加CPS)

查询法–存在忙等

该令牌桶系统只有一个子线程负责,为令牌桶数组中的所有令牌桶,每间隔1s,为存在的每个令牌桶积攒令牌数

mytbf.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#ifndef THREADTOKENBUCKET_MYTBF_H
#define THREADTOKENBUCKET_MYTBF_H

#define MYTBF_MAX 1024 // 令牌桶数组大小

typedef void mytbf_t;

// cps 每个令牌可以传输的内容字节数量 burst 桶内的令牌数量上限*每个令牌可以传输的内容字节数量
mytbf_t *mytbf_init(int cps,int burst);

// 从令牌桶中取得令牌,int 第二个参数是想取多少
// 返回值,为真正取得了多少
int mytbf_fetchtoken(mytbf_t *,int );

// 将没有用完的令牌数量还到令牌桶中, int 第二个参数是想还回去的数量
// 返回值,为真正还了多少
int mytbf_returntoken(mytbf_t *,int);

// 销毁令牌桶
int mytbf_destory(mytbf_t *);

#endif //THREADTOKENBUCKET_MYTBF_H

mytbf.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <pthread.h>
#include <string.h>

#include "mytbf.h"

/*令牌桶类型数组,对于文件可以有MYTBF_MAX中传输速率选择*/
static struct mytbf_st* job[MYTBF_MAX]; // 这是多个进程共享的资源,需要作线程同步,避免数据混乱
// 线程互斥量的初始化
static pthread_mutex_t mut_job = PTHREAD_MUTEX_INITIALIZER;
// 子进程ID
static pthread_t tid_alarm;
// 定义全局 once 控制变量,这个变量跟踪初始化函数是否已被调用
static pthread_once_t init_once = PTHREAD_ONCE_INIT;

/*一个令牌桶的信息结构体*/
struct mytbf_st
{
int cps; // 令牌桶每次可以积攒的令牌数
int burst; // 一个令牌桶中积攒的令牌数上限
int token; // 一个令牌桶的实时令牌数
int pos; // 该令牌桶位于令牌桶系统中的令牌桶数组中的位置索引
pthread_mutex_t mut; // 每一个令牌桶通过互斥量保证token的同步
};

/*信号处理函数*/
static void *thr_alarm(void *p)
{
// 这个子线程一直运行
while(1)
{
// 访问令牌桶系统的共享资源,枷锁
pthread_mutex_lock(&mut_job);
for (int i = 0; i < MYTBF_MAX; i++)
{
// 当前令牌桶数组位置存在数据
if(job[i] != NULL)
{
// 操作该令牌桶的令牌数,枷锁
pthread_mutex_lock(&job[i]->mut);
// 为i位置的令牌桶装载令牌
job[i]->token += job[i]->cps;
// 但是令牌桶所积累的令牌数量需要维持在令牌桶的上限
if(job[i]->token > job[i]->burst)
job[i]->token = job[i]->burst;
pthread_mutex_unlock(&job[i]->mut);
}
}
pthread_mutex_unlock(&mut_job);
sleep(1); // 1秒钟进行一次
}

}

/*模块卸载*/
static void module_unload(void)
{
pthread_cancel(tid_alarm); // 子线程取消
pthread_join(tid_alarm,NULL); // 子线程取消

// 枷锁--保持线程同步
pthread_mutex_lock(&mut_job);
for (int i = 0; i < MYTBF_MAX; i++)
{
if(job[i] != NULL)
{
mytbf_destory(job[i]);
}
}
pthread_mutex_unlock(&mut_job);

pthread_mutex_destroy(&mut_job); // 销毁令牌桶系统的互斥量
}

/*模块加载函数*/
static void module_load(void)
{
int err;
// 该子线程在整个令牌桶系统中,只创建一次
// 创建子线程,每休息1s,为令牌桶积攒CPS数量的令牌数
err = pthread_create(&tid_alarm,NULL,thr_alarm,NULL);
if(err)
{
fprintf(stderr,"pthread_create():%s\n",strerror(err));
exit(1);
}

// 钩子函数,程序结束前最后执行
atexit(module_unload);
}


static int min(int a, int b)
{
if(a < b)
return a;
return b;
}


/*在令牌桶数组中找到空位置*/
// 这个函数本身没有加线程锁,但是调用的时候应该枷锁
// 因为令牌桶数组为共享资源,需要保持线程同步,所以设置为unlocked
static int get_free_pos_unlocked(void)
{
for(int i = 0; i < MYTBF_MAX; i++)
{
if(job[i] == NULL)
return i; // 返回空位置的下标
}

return -1;
}

// cps 每个令牌可以传输的内容字节数量 burst 桶内的令牌数量上限*每个令牌可以传输的内容字节数量
mytbf_t *mytbf_init(int cps,int burst)
{
struct mytbf_st *me;

// 模块加载函数,即使多个线程执行,但是其只会在第一个调用的线程中执行一次
pthread_once(&init_once,module_load);

// 为当前的令牌桶开辟空间
me = malloc(sizeof(*me));
if(me == NULL)
return NULL;

me->token = 0; // 当前令牌桶,初始令牌为0
me->cps = cps; // 每次可以获得的令牌数量为cps
me->burst = burst; // 令牌桶中可以积攒的令牌数量上限为burst
pthread_mutex_init(&me->mut,NULL); // 关于令牌桶中令牌的互斥量初始化

// 访问共享资源,枷锁保持线程同步
pthread_mutex_lock(&mut_job);
int pos = get_free_pos_unlocked();
if(pos < 0)
{
pthread_mutex_unlock(&mut_job); // 失败,解锁退出
free(me);
return NULL;
}

me->pos = pos;
job[pos] = me;
// 解锁
pthread_mutex_unlock(&mut_job);

return me;
}


/*
* 从令牌桶中取得令牌,int 第二个参数是想取多少
* */
// 返回值,为真正取得了多少令牌
int mytbf_fetchtoken(mytbf_t *ptr,int size)
{
struct mytbf_st *me = ptr;
int n;

if(size < 0)
// EINVAL(Invalid Argument)是另一个常见的错误代码,表示传递给系统调用的一个或多个参数是无效的
return -EINVAL; // 这样就可以使用strerror(errno)进行报错

// 访问该令牌桶中的令牌,但是可能有其他线程此时正准备归还令牌,因此需要保证线程同步
// 枷锁
pthread_mutex_lock(&me->mut);
/*************************查询法:忙等********************************/
while(me->token <= 0) // 这个地方是查询法,存在忙等
{
pthread_mutex_unlock(&me->mut);
sched_yield(); // 调用线程主动让出CPU时间片,减少忙等带来的CPU资源浪费
pthread_mutex_lock(&me->mut);
}
/*************************查询法:忙等********************************/

// 取小值
n = min(me->token,size);
me->token -= n; // 令牌数量减少
// 解锁
pthread_mutex_unlock(&me->mut);

return n;
}


/*
* 将没有用完的令牌数量还到令牌桶中, int 第二个参数是想还回去的数量
* */
// 返回值,为真正还了多少令牌
int mytbf_returntoken(mytbf_t *ptr,int size)
{
struct mytbf_st *me = ptr;

if(size < 0)
// EINVAL(Invalid Argument)是另一个常见的错误代码,表示传递给系统调用的一个或多个参数是无效的
return -EINVAL; // 这样就可以使用strerror(errno)进行报错

// 访问该令牌桶中的令牌,但是可能有其他线程此时正准备归还令牌,因此需要保证线程同步
// 枷锁
pthread_mutex_lock(&me->mut);
me->token += size; // 归还了,当前令牌桶的令牌数量增加
// 但是还是要约束在上限内
if(me->token > me->burst)
me->token = me->burst;
pthread_mutex_unlock(&me->mut);

return size;
}


/*
* 销毁令牌桶
* */
int mytbf_destory(mytbf_t *ptr)
{
// 将传入的令牌桶销毁
struct mytbf_st *me = ptr;

// 将当前令牌桶占用的令牌桶数组位置赋值为NULL,共享资源,枷锁
pthread_mutex_lock(&mut_job);
job[me->pos] = NULL;
pthread_mutex_unlock(&mut_job);

// 销毁该令牌桶中有关令牌的互斥量
pthread_mutex_destroy(&me->mut);
// 释放创建传入的令牌桶开辟的动态空间
free(ptr);

return 0;
}

main.c

main函数中通过令牌桶实现模拟cat指令中的流量控制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>

#include "mytbf.h"

#define CPS 10 // 每秒传输的字节数量,一次取得令牌数量(一次积攒的令牌数量)
#define BUFSIZE 1024
#define BURST 100 // 令牌桶中积攒令牌数量上限

int main(int argc,char *argv[])
{
int sfd,dfd = 1;
int len = 0;
int ret = 0;
int pos = 0;
int size = 0;
char buf[BUFSIZE];
mytbf_t *tbf;

if(argc < 2)
{
fprintf(stderr,"Usage:%s <src_file>\n",argv[0]);
exit(1);
}

/*1.创建令牌桶*/
// CPS表示每次可以获得令牌数量,BURST表示令牌桶中可以积攒令牌数量上限
tbf = mytbf_init(CPS,BURST);
if(tbf == NULL)
{
fprintf(stderr, "mytbf_init() failed!\n");
exit(1);
}
/************************************************************/


/*2.打开需要查看的文件*/
do{
sfd = open(argv[1], O_RDONLY);
if(sfd < 0)
{
// 不是假错,报错,退出
if(errno != EINTR)
{
perror("open()");
exit(1);
}
}
}while(sfd < 0);
/************************************************************/


while (1)
{
/*3.取令牌,想要取得BUFSIZE个令牌*/
// size 为实际取得的令牌数量
size = mytbf_fetchtoken(tbf, BUFSIZE);
if(size < 0)
{
fprintf(stderr, "mytbf_fetchtoken():%s\n", strerror(-size));
exit(1);
}
/************************************************************/


/*4.通过获取的令牌,从源文件中,进行数据读取*/
// 成功取得令牌,有权限读取数据,因为只有size个令牌,因此只能读取的数据为size个字节
while((len = read(sfd,buf,size)) < 0)
{
// 假错
if(errno == EINTR)
continue;
perror("read()");
exit(1);
}
if(len == 0)
break; // 读取到文件尾
/************************************************************/


/*5.令牌归还*/
// 当前取得的token没有消耗完,进行归还
// 假设说,在文件末尾最后只有三个字节的数据,但是我这次可以取得的令牌数量为10
// 因此对于这最后剩余的三个字节,只需要消耗三个令牌,还剩余7个令牌需要归还到令牌桶中
if((size - len) > 0)
mytbf_returntoken(tbf,size-len); // 归还剩余令牌
/************************************************************/


/*6.将读取的内容写到目标的文件描述符映射的文件中,即标准输出,终端上*/
pos = 0;
while(len > 0)
{
ret = write(dfd, buf + pos, len);
if(ret < 0)
{
// 假错误
if(errno == EINTR)
continue;
perror("write()");
exit(1);
}
pos += ret;
len -= ret;
}
/************************************************************/
}

// 关闭文件描述符
close(sfd);

// 销毁令牌桶
mytbf_destory(tbf);

exit(0);
}

CMakeLists.txt

1
2
3
4
5
6
7
8
9
10
11
12
cmake_minimum_required(VERSION 3.19)
project(ThreadTokenBucket C)

set(CMAKE_C_STANDARD 99)

# 找线程库
find_package(Threads REQUIRED)

add_executable(ThreadTokenBucket main.c mytbf.c mytbf.h)

# 链接线程库
target_link_libraries(ThreadTokenBucket ${CMAKE_THREAD_LIBS_INIT})

编译运行,发现程序在以每间隔1s,以CPS字节数量的速度输出想查看文件内容

我是在虚拟机上面运行的程序,处理器数量为2,每个处理器的核心数为2,虚拟的CPU为2x2=4。使用编译的程序,显示一个内容比较多的文件时。使用htop在另外一个终端,查看ubuntu的cpu占用率,如下图:

image-20240509150654244

系统的第2个虚拟CPU占用率为100%,这就是因为忙等的存在使得CPU占用率过高,接下来将使用条件变量通知法解决这个问题

信号量通知法–处理忙等

mytbf.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#ifndef THREADCONDTOKENBUCKET_MYTBF_H
#define THREADCONDTOKENBUCKET_MYTBF_H

#define MYTBF_MAX 1024 // 令牌桶数组大小

typedef void mytbf_t; // 令牌桶属性 mytbf_t

// cps 每个令牌可以传输的内容字节数量 burst 桶内的令牌数量上限*每个令牌可以传输的内容字节数量
mytbf_t *mytbf_init(int cps,int burst);

// 从令牌桶中取得令牌,int 第二个参数是想取多少
// 返回值,为真正取得了多少
int mytbf_fetchtoken(mytbf_t *,int );

// 将没有用完的令牌数量还到令牌桶中, int 第二个参数是想还回去的数量
// 返回值,为真正还了多少
int mytbf_returntoken(mytbf_t *,int);

// 销毁令牌桶
int mytbf_destory(mytbf_t *);

#endif //THREADCONDTOKENBUCKET_MYTBF_H

mytbf.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <pthread.h>
#include <string.h>

#include "mytbf.h"

/*令牌桶类型数组,对于文件可以有MYTBF_MAX中传输速率选择*/
static struct mytbf_st* job[MYTBF_MAX]; // 这是多个进程共享的资源,需要作线程同步,避免数据混乱
// 线程互斥量的初始化
static pthread_mutex_t mut_job = PTHREAD_MUTEX_INITIALIZER;
// 子进程ID
static pthread_t tid_alarm;
// 定义全局 once 控制变量,这个变量跟踪初始化函数是否已被调用
static pthread_once_t init_once = PTHREAD_ONCE_INIT;

/*一个令牌桶的信息结构体*/
struct mytbf_st
{
int cps; // 令牌桶每次可以积攒的令牌数
int burst; // 一个令牌桶中积攒的令牌数上限
int token; // 一个令牌桶的实时令牌数
int pos; // 该令牌桶位于令牌桶系统中的令牌桶数组中的位置索引
pthread_mutex_t mut; // 每一个令牌桶通过互斥量保证token的同步
pthread_cond_t cond; // 条件变量--用于通知令牌桶令牌数量的变化
};


/*信号处理函数*/
static void *thr_alarm(void *p)
{
// 这个子线程一直运行
while(1)
{
// 访问令牌桶系统的共享资源,枷锁
pthread_mutex_lock(&mut_job);
for (int i = 0; i < MYTBF_MAX; i++)
{
// 当前令牌桶数组位置存在数据
if(job[i] != NULL)
{
// 操作该令牌桶的令牌数,枷锁
pthread_mutex_lock(&job[i]->mut);
// 为i位置的令牌桶装载令牌
job[i]->token += job[i]->cps;
// 但是令牌桶所积累的令牌数量需要维持在令牌桶的上限
if(job[i]->token > job[i]->burst)
job[i]->token = job[i]->burst;
// for 循环内给令牌桶数组中的所有令牌桶均加了token
// 需要唤醒所有的阻塞(挂起)的线程,去唤醒,此时令牌桶中的令牌>0,可以消费了
pthread_cond_broadcast(&job[i]->cond);
pthread_mutex_unlock(&job[i]->mut);
}
}
pthread_mutex_unlock(&mut_job);
sleep(1); // 1秒钟进行一次
}
}


/*模块卸载*/
static void module_unload(void)
{
pthread_cancel(tid_alarm); // 子线程取消
pthread_join(tid_alarm,NULL); // 子线程取消

// 枷锁--保持线程同步
pthread_mutex_lock(&mut_job);
for (int i = 0; i < MYTBF_MAX; i++)
{
if(job[i] != NULL)
{
mytbf_destory(job[i]);
}
}
pthread_mutex_unlock(&mut_job);

pthread_mutex_destroy(&mut_job); // 销毁令牌桶系统的互斥量
}


/*模块加载函数*/
static void module_load(void)
{
int err;
// 该子线程在整个令牌桶系统中,只创建一次
// 创建子线程,每休息1s,为令牌桶积攒CPS数量的令牌数
err = pthread_create(&tid_alarm,NULL,thr_alarm,NULL);
if(err)
{
fprintf(stderr,"pthread_create():%s\n",strerror(err));
exit(1);
}

// 钩子函数,程序结束前最后执行
atexit(module_unload);
}


static int min(int a, int b)
{
if(a < b)
return a;
return b;
}


/*在令牌桶数组中找到空位置*/
// 这个函数本身没有加线程锁,但是调用的时候应该枷锁
// 因为令牌桶数组为共享资源,需要保持线程同步,所以设置为unlocked
static int get_free_pos_unlocked(void)
{
for(int i = 0; i < MYTBF_MAX; i++)
{
if(job[i] == NULL)
return i; // 返回空位置的下标
}

return -1;
}


// cps 每个令牌可以传输的内容字节数量 burst 桶内的令牌数量上限*每个令牌可以传输的内容字节数量
mytbf_t *mytbf_init(int cps,int burst)
{
struct mytbf_st *me;

// 模块加载函数,即使多个线程执行,但是其只会在第一个调用的线程中执行一次
pthread_once(&init_once,module_load);

// 为当前的令牌桶开辟空间
me = malloc(sizeof(*me));
if(me == NULL)
return NULL;

me->token = 0; // 当前令牌桶,初始令牌为0
me->cps = cps; // 每次可以获得的令牌数量为cps
me->burst = burst; // 令牌桶中可以积攒的令牌数量上限为burst
pthread_mutex_init(&me->mut,NULL); // 关于令牌桶中令牌的互斥量初始化
pthread_cond_init(&me->cond,NULL); // 初始化条件变量

// 访问共享资源,枷锁保持线程同步
pthread_mutex_lock(&mut_job);
int pos = get_free_pos_unlocked();
if(pos < 0)
{
pthread_mutex_unlock(&mut_job); // 失败,解锁退出
free(me);
return NULL;
}

me->pos = pos;
job[pos] = me;
// 解锁
pthread_mutex_unlock(&mut_job);

return me;
}


/*
* 从令牌桶中取得令牌,int 第二个参数是想取多少
* */
// 返回值,为真正取得了多少令牌
int mytbf_fetchtoken(mytbf_t *ptr,int size)
{
struct mytbf_st *me = ptr;
int n;

if(size < 0)
// EINVAL(Invalid Argument)是另一个常见的错误代码,表示传递给系统调用的一个或多个参数是无效的
return -EINVAL; // 这样就可以使用strerror(errno)进行报错

// 访问该令牌桶中的令牌,但是可能有其他线程此时正准备归还令牌,因此需要保证线程同步
// 枷锁
pthread_mutex_lock(&me->mut);
while(me->token <= 0)
{
// 这个地方是查询法,存在忙等
// pthread_mutex_unlock(&me->mut);
// sched_yield(); // 调用线程主动让出CPU时间片,减少忙等带来的CPU资源浪费
// pthread_mutex_lock(&me->mut);

// 将查询忙等使用条件变量进行通知
// 如果没有条件变量通知,线程会阻塞在此处,挂起不会占用CPU很多资源
// 直到 thr_alarm 与 mytbf_returntoken 函数中使用pthread_cond_broadcast 通知条件变量
// 唤醒阻塞在这里的所有线程,并且抢锁,枷锁,查看token值
pthread_cond_wait(&me->cond,&me->mut);
}

// 取小值
n = min(me->token,size);
me->token -= n; // 令牌数量减少
// 解锁
pthread_mutex_unlock(&me->mut);

return n;
}


/*
* 将没有用完的令牌数量还到令牌桶中, int 第二个参数是想还回去的数量
* */
// 返回值,为真正还了多少令牌
int mytbf_returntoken(mytbf_t *ptr,int size)
{
struct mytbf_st *me = ptr;

if(size < 0)
// EINVAL(Invalid Argument)是另一个常见的错误代码,表示传递给系统调用的一个或多个参数是无效的
return -EINVAL; // 这样就可以使用strerror(errno)进行报错

// 访问该令牌桶中的令牌,但是可能有其他线程此时正准备归还令牌,因此需要保证线程同步
// 枷锁
pthread_mutex_lock(&me->mut);
me->token += size; // 归还了,当前令牌桶的令牌数量增加
// 但是还是要约束在上限内
if(me->token > me->burst)
me->token = me->burst;

// 这个时候也需要唤醒阻塞的线程,因为此时的token>0,也可以进行消费了
pthread_cond_broadcast(&me->cond);
pthread_mutex_unlock(&me->mut);

return size;
}


/*
* 销毁令牌桶
* */
int mytbf_destory(mytbf_t *ptr)
{
// 将传入的令牌桶销毁
struct mytbf_st *me = ptr;

// 将当前令牌桶占用的令牌桶数组位置赋值为NULL,共享资源,枷锁
pthread_mutex_lock(&mut_job);
job[me->pos] = NULL;
pthread_mutex_unlock(&mut_job);

// 销毁该令牌桶中有关令牌的互斥量
pthread_mutex_destroy(&me->mut);
// 条件变量也进行销毁
pthread_cond_destroy(&me->cond);
// 释放创建传入的令牌桶开辟的动态空间
free(ptr);

return 0;
}

main.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>

#include "mytbf.h"

#define CPS 10
#define BUFSIZE 1024
#define BURST 100


int main(int argc, char **argv)
{
int sfd, dfd = 1;
int len = 0;
int ret = 0;
int pos = 0;
int size = 0;
char buf[BUFSIZE];
mytbf_t *tbf;

if(argc < 2)
{
fprintf(stderr, "Usage:%s <src_file>\n",argv[0]);
exit(1);
}

tbf = mytbf_init(CPS, BURST);
if(tbf == NULL)
{
fprintf(stderr, "mytbf_init() failed!\n");
exit(1);
}

do
{
sfd = open(argv[1], O_RDONLY);
if(sfd < 0)
{
if(errno != EINTR)
{
perror("open()");
exit(1);
}
}
}while (sfd < 0);


while(1)
{
size = mytbf_fetchtoken(tbf, BUFSIZE);
if(size < 0)
{
fprintf(stderr, "mytbf_fetchtoken():%s\n", strerror(-size));
exit(1);
}

while((len = read(sfd, buf, size)) < 0)
{
if(errno == EINTR)
continue;
perror("read()");
break;
}

if(len == 0)
break;

if(size - len > 0)
mytbf_returntoken(tbf, size - len);

pos = 0;
while(len > 0)
{
// printf("pos=%d len=%d\n", pos, len);
ret = write(dfd, buf + pos, len);
// printf("ret=%d\n", ret);
if(ret < 0)
{
if(errno == EINTR)
continue;
perror("write()");
exit(1);
}
pos += ret;
len -= ret;
}

}

close(sfd);

mytbf_destory(tbf);

exit(0);
}

CMakeLists.txt

1
2
3
4
5
6
7
8
9
10
11
12
cmake_minimum_required(VERSION 3.19)
project(ThreadTokenBucket C)

set(CMAKE_C_STANDARD 99)

# 找线程库
find_package(Threads REQUIRED)

add_executable(ThreadCondTokenBucket main.c mytbf.c mytbf.h)

# 链接线程库
target_link_libraries(ThreadCondTokenBucket ${CMAKE_THREAD_LIBS_INIT})

编译运行,发现程序在以每间隔1s,以CPS字节数量的速度输出想查看文件内容

我是在虚拟机上面运行的程序,处理器数量为2,每个处理器的核心数为2,虚拟的CPU为2x2=4。使用编译的程序,显示一个内容比较多的文件时。使用htop在另外一个终端,查看ubuntu的cpu占用率,如下图:

image-20240509151036875

发现此时相比于之前的查询方法,使用信号变量的通知法,解决了忙等的问题,第二个CPU的占用率基本没有



8.条件变量

条件变量是多线程编程中用于线程同步的另一种机制。它允许线程阻塞等待,直到某个特定条件成为真。这是一种有用的机制,特别是在涉及生产者-消费者问题或其他需要多个线程协作的场景中

POSIX 线程(pthreads)中,条件变量的类型是 pthread_cond_t

(1)相关函数
pthread_cond_init()

pthread_cond_init函数用于初始化一个条件变量

1
int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr);

参数:

  • cond: 指向要初始化的条件变量的指针。
  • attr: 指向条件变量属性对象的指针。如果你想使用默认的属性,可以设置为NULL。

返回值:

  • 0: 成功。
  • 非0: 错误码,表示发生了错误
pthread_cond_destroy()

pthread_cond_destroy函数用于销毁一个条件变量,并释放与之相关的任何资源。销毁一个已被其他线程阻塞(等待)的条件变量将导致未定义的行为,因此在销毁条件变量之前,确保没有线程正在等待该条件变量是很重要的

1
int pthread_cond_destroy(pthread_cond_t *cond);

参数:

  • cond: 指向要销毁的条件变量的指针。

返回值:

  • 0: 成功。
  • 非0: 错误码,表示发生了错误
pthread_cond_wait()

pthread_cond_wait函数在多线程编程中用于阻塞当前线程(将这个线程挂起),直到另一个线程发出信号通知这个条件变量(直到某个条件变量被特定地通知或广播),一旦条件变量被通知,线程被唤醒。通常与一个互斥量(mutex)一起使用,以确保数据的同步访问

线程被挂起,就会处于非运行状态,不会占用CPU资源

1
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);

参数:

  • cond: 指向条件变量的指针。
  • mutex: 在调用pthread_cond_wait之前必须被锁定的互斥量。pthread_cond_wait会自动释放此互斥量,允许其他线程操作,然后使当前线程进入等待状态。直到条件变量被其他线程发出信号或广播。一旦收到信号或广播,pthread_cond_wait再次锁定互斥量(再次将互斥锁上锁)并返回。

返回值:

  • 0: 成功。
  • 非0: 错误码
pthread_cond_signal()

pthread_cond_signal函数用于从一个或多个等待给定条件变量的线程中唤醒一个线程。它是条件变量操作的核心,使得多线程间可以进行精确的同步操作

1
int pthread_cond_signal(pthread_cond_t *cond);

参数:

  • cond: 指向条件变量的指针。

返回值:

  • 0: 成功。
  • 非0: 错误码
pthread_cond_broadcast()

pthread_cond_broadcast函数用于唤醒所有正在给定条件变量上等待的线程。与pthread_cond_signal不同,pthread_cond_signal只唤醒一个等待的线程,而pthread_cond_broadcast则唤醒所有等待的线程

1
int pthread_cond_broadcast(pthread_cond_t *cond);

参数:

  • cond: 指向条件变量的指针。

返回值:

  • 0: 成功。
  • 非0: 错误码

(2)程序实例

producer线程添加元素到缓冲区,而consumer线程从缓冲区中取出元素。条件变量确保当缓冲区满时,生产者会等待,而当缓冲区为空时,消费者会等待

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
#include <stdio.h>
#include <pthread.h>

#define BUFFER_SIZE 10

static int buffer[BUFFER_SIZE];
static int count = 0;

// 初始化互斥量与条件变量
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

static void* producer(void* arg) {
for (int i = 0; i < 100; i++) {
pthread_mutex_lock(&mutex);
// 当生产车间buf满时,无法在进行生产,生产者等待,对互斥量进行解锁,阻塞当前线程
while (count == BUFFER_SIZE) {
pthread_cond_wait(&cond, &mutex);
}
buffer[count++] = i;
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
}

// 线程结束
pthread_exit(NULL);
}

static void* consumer(void* arg) {
for (int i = 0; i < 100; i++) {
pthread_mutex_lock(&mutex);
while (count == 0) {
pthread_cond_wait(&cond, &mutex);
}
printf("Consumer: %d\n", buffer[--count]);
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
}

// 线程结束
pthread_exit(NULL);
}

int main() {
pthread_t prod, cons;

// 创建线程
pthread_create(&prod, NULL, producer, NULL);
pthread_create(&cons, NULL, consumer, NULL);

// 回收线程资源
pthread_join(prod, NULL);
pthread_join(cons, NULL);
return 0;
}


9.线程-信号量

信号量(semaphores)是一个同步原语,经常用于限制对资源的并发访问(可以解决资源有上限的资源共享问题),或者作为多个线程或进程间的同步手段。在线程编程中,信号量可以用来解决读者-写者问题、生产者-消费者问题等

POSIX线程(pthreads)中,信号量由sem_t类型表示,并通过一系列的函数进行操作

  • sem_init() - 初始化一个未命名的信号量
1
int sem_init(sem_t *sem, int pshared, unsigned int value);
  • sem_wait() - 减少信号量的值。如果信号量的值大于0,则减少其值并立即返回。如果信号量的值为0,则线程会被阻塞,直到信号量值变为大于0
1
int sem_wait(sem_t *sem);
  • sem_trywait() - 尝试减少信号量的值。如果信号量的值为0,则立即返回错误,否则减少其值
1
int sem_trywait(sem_t *sem);
  • sem_post() - 增加信号量的值。增加后,如果有其他线程因为sem_wait被阻塞在这个信号量上,那么其中一个线程会被唤醒
1
int sem_post(sem_t *sem);
  • sem_destroy() - 销毁一个未命名的信号量,释放其相关资源
1
int sem_destroy(sem_t *sem);
  • 还有其他函数,如sem_getvalue()(获取当前信号量值)和命名信号量相关的函数(如sem_open(), sem_close(), sem_unlink()等)

(1)相关函数
sem_init()

初始化一个未命名的信号量

1
int sem_init(sem_t *sem, int pshared, unsigned int value);

参数:

  • sem:这是指向要初始化的信号量对象的指针
  • pshared:这个参数决定了信号量是在同一进程的线程间共享(设置为0),还是在多个进程间共享(设置为非0值)。大多数情况下,我们只在一个进程的线程之间使用信号量,因此将其设置为0
  • value:这是信号量的初始值。例如,如果你想使用信号量作为一个互斥锁,你可以将初始值设置为1

返回值:

  • 0:表示成功。
  • -1:表示出错,此时错误码存放在errno
sem_wait()

用于降低信号量的值。如果信号量的当前值大于0,它会被减少(通常是减少1)并且函数立即返回。如果信号量的当前值为0,调用此函数的线程将被阻塞,直到信号量值变为大于0

1
int sem_wait(sem_t *sem);

参数:

  • sem:指向信号量对象的指针。

返回值:

  • 0:表示成功。
  • -1:表示出错,此时错误码存放在errno
sem_trywait()

这个函数尝试降低信号量的值,但与 sem_wait() 不同的是,如果信号量当前的值为0,它不会阻塞调用线程,而是立即返回一个错误

1
int sem_trywait(sem_t *sem);

参数:

  • sem:指向信号量对象的指针。

返回值:

  • 0:表示成功,信号量的值被成功减少。
  • -1:表示无法降低信号量的值(例如,当它已经为0时)或其他错误发生。此时,错误码存放在errno中。常见的错误码是EAGAIN,表示信号量的值当前为0
sem_post()

它的作用是增加信号量的值。当信号量的值增加时,如果有线程因调用 sem_wait()sem_trywait() 而阻塞在这个信号量上,那么其中一个线程将被唤醒并继续执行

1
int sem_post(sem_t *sem);

参数:

  • sem:指向信号量对象的指针。

返回值:

  • 0:表示成功。
  • -1:表示出错,此时错误码存放在 errno

(2)实例程序
总资源数为1

如果生产者和消费者线程使用的信号量对应的总资源数为1,那么不管线程有多少个,可以工作的线程只有一个,其余线程由于拿不到资源,都被迫阻塞了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <semaphore.h>
#include <pthread.h>

// 链表的节点
struct Node
{
int number;
struct Node* next;
};

// 生产者线程信号量
sem_t psem;
// 消费者线程信号量
sem_t csem;

// 互斥锁变量
pthread_mutex_t mutex;
// 指向头结点的指针

main.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>
//信号量头文件
#include <semaphore.h>

/*
使用信号量
编写生产者和消费者模型
总资源数为1的情况
*/

//生产者的信号量
sem_t semp;
//消费者信号量
sem_t semc;

//创建一个互斥锁
pthread_mutex_t mutex;

//链表节点类型
struct Node
{
int number;
struct Node* next;
};

//头节点
struct Node *head = NULL;

//生产者回调函数
void *producer(void *arg)
{
while (1)
{
//检测生产者是否有资源
sem_wait(&semp);//资源>0就不会被阻塞---生产者线程会占用一个资源-1

//创建新节点
struct Node *newnode = malloc(sizeof(struct Node));
//初始化节点
newnode->number = rand() % 1000;
newnode->next = head;
head = newnode;
printf("生产者,id:%ld , number:%d\n", (pthread_self() % 100), newnode->number);
//通知消费者消费---消费者资源+1
sem_post(&semc);
sleep(rand() % 3);//休眠控制生产速度
}
return NULL;
}

//消费者回调函数
void *consumer(void *arg)
{
while (1)
{
//消费者尝试消费,若有资源则消费者资源-1
sem_wait(&semc);

struct Node *node = head;
printf("消费者,id:%ld , number:%d\n", (pthread_self() % 100), node->number);
head = head->next;
free(node);
//通知生产者生产
sem_post(&semp);
sleep(rand() % 3);
}
return NULL;
}


int main()
{
//初始化互斥锁
pthread_mutex_init(&mutex, NULL);
//初始化信号变量
//1.生产者
sem_init(&semp, 0, 1);//生产者总共的资源数为1--->则可同时工作的线程为1个
//2.消费者 -> 资源初始化为0 消费则线程启动就阻塞
sem_init(&semc, 0, 0);


//创建生产者消费者线程ID
pthread_t t1[5], t2[5];
//创建生产者线程
for (int i = 0; i < 5; i++)
{
pthread_create(&t1[i], NULL, producer, NULL);
}
//创建消费者线程
for (int i = 0; i < 5; i++)
{
pthread_create(&t2[i], NULL, consumer, NULL);
}

//线程资源回收
for (int i = 0; i < 5; i++)
{
pthread_join(t1[i], NULL);
pthread_join(t2[i], NULL);
}

//销毁资源释放
pthread_mutex_destroy(&mutex);
//信号量的销毁
sem_destroy(&semp);
sem_destroy(&semc);

return 0;
}

通过测试代码可以得到如下结论:如果生产者和消费者使用的信号量总资源数为1,那么不会出现生产者线程和消费者线程同时访问共享资源的情况,不管生产者和消费者线程有多少个,它们都是顺序执行的。

运行结果

可以看到生产者与消费者交替运行

image-20231219222122218

总资源数大于1

如果生产者和消费者线程使用的信号量对应的总资源数为大于1,这种场景下出现的情况就比较多了:

  • 多个生产者线程同时生产
  • 多个消费者同时消费
  • 生产者线程和消费者线程同时生产和消费

以上不管哪一种情况都可能会出现多个线程访问共享资源的情况,如果想防止共享资源出现数据混乱,那么就需要使用互斥锁进行线程同步,线程线性执行任务 ,处理代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <semaphore.h>
#include <pthread.h>

// 链表的节点
struct Node
{
int number;
struct Node* next;
};

// 生产者线程信号量
sem_t psem;
// 消费者线程信号量
sem_t csem;

// 互斥锁变量
pthread_mutex_t mutex;
// 指向头结点的指针

在编写上述代码的时候还有一个需要注意是事项,不管是消费者线程的处理函数还是生产者线程的处理函数内部有这么两行代码:

1
2
3
4
5
6
7
// 消费者
sem_wait(&csem);
pthread_mutex_lock(&mutex);

// 生产者
sem_wait(&csem);
pthread_mutex_lock(&mutex);

这两行代码的调用顺序是不能颠倒的,如果颠倒过来就有可能会造成死锁,下面来分析一种死锁的场景:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void* producer(void* arg)
{
// 一直生产
while(1)
{
pthread_mutex_lock(&mutex);
// 生产者拿一个信号灯
sem_wait(&psem);
......
......
// 通知消费者消费
sem_post(&csem);
pthread_mutex_unlock(&mutex);

// 生产慢一点
sleep(rand() % 3);
}
return NULL;
}

// 消费者的回调函数
void* consumer(void* arg)

在上面的代码中,初始化状态下消费者线程没有任务信号量资源,假设某一个消费者线程先运行,调用pthread_mutex_lock(&mutex);对互斥锁加锁成功,然后调用sem_wait(&csem);由于没有资源,因此被阻塞了。其余的消费者线程由于没有抢到互斥锁,因此被阻塞在互斥锁上。对应生产者线程第一步操作也是调用pthread_mutex_lock(&mutex);,但是这时候互斥锁已经被消费者线程锁上了,所有生产者都被阻塞,到此为止,多余的线程都被阻塞了,程序产生了死锁。

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>
//信号量头文件
#include <semaphore.h>

/*
使用信号量
编写生产者和消费者模型
总资源数为5的情况--->需要线程同步
*/

//生产者的信号量
sem_t semp;
//消费者信号量
sem_t semc;

//创建一个互斥锁
pthread_mutex_t mutex;

//链表节点类型
struct Node
{
int number;
struct Node* next;
};

//头节点
struct Node *head = NULL;

//生产者回调函数
void *producer(void *arg)
{
while (1)
{
//检测生产者是否有资源---是否有空闲的生产者线程
sem_wait(&semp);//资源>0就不会被阻塞---生产者线程会占用一个资源-1
//上锁
pthread_mutex_lock(&mutex);
//上锁和sem_wait()两句代码顺序不能乱


//创建新节点
struct Node *newnode = malloc(sizeof(struct Node));
//初始化节点
newnode->number = rand() % 1000;
newnode->next = head;
head = newnode;
printf("生产者,id:%ld , number:%d\n", (pthread_self() % 100), newnode->number);
//解锁
pthread_mutex_unlock(&mutex);
//通知消费者消费---消费者资源+1
sem_post(&semc);
sleep(rand() % 3);//休眠控制生产速度
}
pthread_exit(NULL);
}

//消费者回调函数
void *consumer(void *arg)
{
while (1)
{
//消费者尝试消费,若有资源则消费者资源-1(任务队列中是否有多余的任务)
sem_wait(&semc);
//上锁
pthread_mutex_lock(&mutex);
struct Node *node = head;
printf("消费者,id:%ld , number:%d\n", (pthread_self() % 100), node->number);
head = head->next;
free(node);
//解锁
pthread_mutex_unlock(&mutex);
//通知生产者生产
sem_post(&semp);
sleep(rand() % 3);
}
pthread_exit(NULL);
}


int main()
{
//初始化互斥锁
pthread_mutex_init(&mutex, NULL);
//初始化信号变量
//1.生产者
sem_init(&semp, 0, 5);//生产者总共的资源数为5--->为了不造成数据的混乱还是需要线程同步--->需要互斥锁的帮助(锁在临界区)
//2.消费者 -> 资源初始化为0 消费则线程启动就阻塞
sem_init(&semc, 0, 0);


//创建生产者消费者线程ID
pthread_t t1[5], t2[5];
//创建生产者线程
for (int i = 0; i < 5; i++)
{
pthread_create(&t1[i], NULL, producer, NULL);
}
//创建消费者线程
for (int i = 0; i < 5; i++)
{
pthread_create(&t2[i], NULL, consumer, NULL);
}

//线程资源回收
for (int i = 0; i < 5; i++)
{
pthread_join(t1[i], NULL);
pthread_join(t2[i], NULL);
}

//销毁资源释放
pthread_mutex_destroy(&mutex);
//信号量的销毁
sem_destroy(&semp);
sem_destroy(&semc);

return 0;
}