UNIX环境编程-并发(11) 一、信号 1.信号的概念 信号是软件层面的中断,信号的响应依赖于中断
(signal
)是一种用于异步通知进程某个事件已经发生的机制。当进程收到一个信号时,它可以有几种不同的反应,包括忽略该信号、执行默认的信号处理动作,或者调用一个用户定义的处理函数
(1)同步与异步 同步
在同步操作中,一个任务必须等待前一个任务完成后才能继续执行。这意味着您在等待任务完成时不能做任何其他事情
特点
执行顺序是可预测的。
容易编写和理解,因为操作按预期的顺序执行。
可能会浪费CPU
时间,因为它在等待外部资源(如磁盘IO
、网络请求等)时可能会处于闲置状态
异步 在异步操作中,你可以在等待某个任务完成时继续执行其他任务。当那个任务完成后,系统通常会使用回调、事件或其他机制来通知你
特点
执行顺序不是固定的,可能需要额外的工具或方法(如回调、promises、futures等)来处理。
可以更充分地利用CPU
,因为它可以在等待外部资源时执行其他任务。
可能会更难编写和理解,特别是当涉及到大量的异步操作和回调时
2.signal() 在ubuntu
终端中输入kill -l
可以查看各类信号
(1)函数原型 1 2 3 4 5 6 7 #include <signal.h> typedef void (*sighandler_t ) (int ) ;sighandler_t signal (int signum, sighandler_t handler) ;
signal()
函数用于处理信号。这是一个基本的信号处理接口,它允许程序员为特定的信号定义一个处理函数
参数
signum
是要捕获或忽略的信号。
handler
是当指定的信号发生时要调用的函数的指针(函数指针,回调函数),或者是以下特定的值:
SIG_IGN
:忽略该信号。
SIG_DFL
:采取信号的默认动作。
函数返回值是之前关联到指定信号的处理函数的地址,或者是SIG_IGN
、SIG_DFL
。如果出错,则返回 SIG_ERR
(2)程序实例 将文件描述符1号位置 每秒输入一个字符 * 即标准输出中每秒打印一个*号
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 #include <stdio.h> #include <stdlib.h> #include <signal.h> #include <unistd.h> int main () { int i; for (i = 0 ;i<10 ;i++) { write(1 ,"*" ,1 ); sleep(1 ); } exit (0 ); }
若此时在终端中键入ctrl + c
则会打断打印输出,即信号SIGINT
SIGINT
:中断信号(终端终端符)。当用户从键盘按下Ctrl+C
时发送。通常用于中断进程
SIGQUIT
:退出信号。和SIGINT
类似,但会导致进程核心转储,Ctrl+\
忽略SIGINT
信号 在程序中使用signal
函数,忽略SIGINT
信号,使得ctrl + c
无法打断其输出打印
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 #include <stdio.h> #include <stdlib.h> #include <signal.h> #include <unistd.h> int main () { int i; signal(SIGINT,SIG_IGN); for (i = 0 ;i<10 ;i++) { write(1 ,"*" ,1 ); sleep(1 ); } exit (0 ); }
运行结果
捕获SIGINT
信号,执行其他任务 当程序捕获SIGINT
信号时,则在终端打印输出一个!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 #include <stdio.h> #include <stdlib.h> #include <signal.h> #include <unistd.h> static void int_handler (int s) { write(1 ,"!" ,1 ); } int main () { int i; signal(SIGINT,int_handler); for (i = 0 ;i<10 ;i++) { write(1 ,"*" ,1 ); sleep(1 ); } exit (0 ); }
当程序执行过程中,若一直键入ctrl + c
,程序不会执行10s,一瞬间会输出完,出现如下情况
重点:信号会打断阻塞的系统调用 上述的sleep函数就是被打断的系统调用
当一个进程在执行某个阻塞的系统调用(如read()
, write()
, accept()
, sleep()
等)时接收到某些信号,并且为该信号设置了处理函数,则该系统调用可能会被中断并提前返回。这是因为信号处理函数被执行,系统调用返回错误,并且errno
被设置为EINTR
,表示系统调用被一个信号中断
例如,考虑一个进程正在执行一个阻塞的read()
调用,等待从文件或套接字中读取数据。如果在这期间该进程接收到一个信号,并且已经为该信号注册了处理函数,那么read()
可能会返回-1
,并且errno
会被设置为EINTR
。
为了处理这种情况,通常的策略是在检测到系统调用因为EINTR
而失败后重新执行该系统调用。这样可以确保系统调用能够完成其原始的任务。
这是一个简单的处理read()
被信号中断的例子 :
1 2 3 4 5 6 7 8 9 10 11 12 ssize_t ret;do { ret = read(fd, buffer, sizeof (buffer)); if (ret < 0 ) if (errno != EINTR) { perror("read()" ); exit (1 ); } } while (ret < 0 );
为了避免某些系统调用被中断的行为,你可以使用sigaction()
代替signal()
来注册信号处理器,并设置SA_RESTART
标志。这会使得大多数被此信号中断的系统调用自动重新启动
3.信号的不可靠 信号的行为不可靠
信号机制在早期的UNIX系统中被认为是不可靠的,主要是因为以下原因:
信号丢失 :在早期的实现中,如果一个信号在其处理程序执行时再次被发送,那么这个信号可能会被忽略。也就是说,如果两次连续发送同一个信号,而在第一个信号的处理程序还没有执行完毕时,第二个信号就到达了,那么第二个信号可能不会被注意到。
信号不排队 :对于大多数信号来说,不论发送了多少次同样的信号,如果之前的信号尚未处理,则只会保留一次。也就是说,如果你发送了三次SIGUSR1
信号,但在处理第一个信号之前,其他两个信号就到达了,那么这两个信号可能被视为一个。
默认行为和信号处理的改变 :当一个进程接收到一个它没有为其注册处理函数的信号时,它会执行该信号的默认行为(如终止)。但在早期的signal()
实现中,当一个信号处理程序被调用时,该信号的行为会被重置为默认行为。这意味着,如果在处理某个信号时再次接收到同样的信号,那么可能会执行该信号的默认行为,而不是再次调用信号处理程序。
非原子性 :在信号处理函数和主程序之间,存在竞态条件,这可能导致不确定的行为。
由于上述原因,程序员必须在使用信号时特别小心。为了解决这些问题,后来的UNIX
系统和POSIX
标准引入了更为可靠的信号处理机制,如sigaction()
函数和sigprocmask()
函数等,它们提供了更加详细和可控的信号处理能力。
在现代的UNIX-like
系统中,尽管许多早期的问题已经被解决,但仍然建议在设计信号处理时采取谨慎的态度,并充分了解特定平台上的信号语义和行为
4.可重入函数 重入:第一次调用没有结束,第二次调用就开始了,但是不会出错
在计算机编程中,特别是在多线程和信号处理的上下文中,”可重入”是一个重要的概念。一个函数如果是可重入的,那么它可以在它自身还未完成之前被安全地再次调用 。这意味着多个线程可以同时或者在中断上下文中调用这样的函数,而不必担心会引发竞态条件或其他未定义的行为
所有的系统调用均为可重入的 ,一部分标准的库函数也是可重入的,如:memcpy()
以下是使函数可重入的一些关键属性:
不使用全局和静态变量 :全局和静态变量在函数的多次调用之间是持久的,所以如果一个函数修改了这样的变量,那么其它的调用可能会看到不一致的状态。
不修改其参数 :函数不应该修改其传入的参数,除非这些修改是调用者所期望的。
不调用其他非可重入的函数 :这是很直观的,因为如果一个可重入的函数调用了一个非可重入的函数,那么整个调用链就不能被视为是可重入的。
使用局部变量代替其他形式的存储 :局部变量在栈上分配,所以每次函数调用都会有其自己的一套局部变量。
线程安全 :虽然可重入性和线程安全性并不完全相同,但可重入函数在多线程环境中也是线程安全的。
在信号处理中,可重入性尤为重要,因为你无法预测何时会接收到一个信号。如果一个信号处理函数调用了一个非可重入的函数(例如许多标准库函数),那么可能会出现竞态条件,导致不确定的行为
例如,malloc()
和 free()
这样的内存管理函数在多线程或信号处理上下文中通常不是可重入的,因为它们维护了一个全局的内存分配表。因此,如果在信号处理器中使用这些函数可能是危险的
5.信号的响应过程 视频教程
信号从收到至响应有一个不可以避免的延迟
思考:如何忽略掉一个信号?
思考:标准信号为什么丢失?
标准信号的响应没有严格的顺序
6.常用函数 (1)kill() kill
函数是用来发送信号给进程的。这个函数定义在<signal.h>
头文件中
函数原型
1 2 3 #include <signal.h> int kill (pid_t pid, int sig) ;
参数
pid
:这是要接收信号的进程的进程ID。
如果 pid > 0
,则信号发送给该pid
对应的进程。
如果 pid == 0
,则信号发送给当前进程和同一个进程组的所有进程(组内广播)。
如果 pid == -1
,则信号发送给除了init
进程(其PID
通常为1
)之外的所有进程(全局广播)。调用进程必须有相应的权限。
如果 pid
< -1,则信号发送给进程组ID为-pid
的所有进程。
sig
:要发送的信号编号。例如,SIGKILL
,SIGTERM
等。发送0信号可以用来检查一个进程是否存在(它不会导致进程终止或接收任何实际的信号
返回值
成功:返回0
失败:返回-1
,并设置errno
来表示错误原因
注意 :
检测进程存在性 :当 sig
为 0 时,kill()
实际上并不发送任何信号。它只是用于检测指定 pid
的进程或进程组是否存在,以及调用进程是否有权限向其发送信号
程序实例
要终止PID
为12345
的进程,你可以这样做:
1 2 3 4 5 #include <signal.h> kill(12345 , SIGKILL);
杀死一个进程通常应该更为优雅。许多程序首先发送SIGTERM
信号来请求进程终止,给它机会进行清理工作。如果那不起作用,然后再发送SIGKILL
注意:直接使用kill
需要谨慎,因为不当地发送信号可能导致不希望的副作用,比如数据丢失或进程不正常终止
(2)raise() raise
函数用于发送一个信号给当前执行的进程。它也定义在<signal.h>
头文件中
函数原型
1 2 3 #include <signal.h> int raise (int sig) ;
参数
sig
:要发送的信号编号。例如,SIGABRT
,SIGINT
,SIGTERM
等
返回值
程序实例
你可能在一个程序中使用raise
来生成一个中断信号:
1 2 3 4 5 6 7 8 9 #include <signal.h> #include <stdio.h> int main () { printf ("Raising the interrupt signal...\n" ); raise(SIGINT); printf ("After raise.\n" ); return 0 ; }
在这个例子中,程序会发送一个SIGINT
信号给自己,模拟像通过键盘发送的Ctrl+C
命令。这通常会导致进程终止,除非你有一个针对这个信号的处理函数。
使用raise
函数可以在程序中触发信号处理程序,或者模拟外部条件导致的信号
(3)alarm() 这个函数用于为调用进程设置一个实时闹钟,当这个闹钟超时时,系统会向该进程发送一个SIGALRM
信号。它定义在<unistd.h>
头文件中
函数原型
1 2 3 #include <unistd.h> unsigned int alarm (unsigned int seconds) ;
参数
seconds
:设置的超时时间,单位为秒。当设置为0
时,任何之前设置的闹钟都会被取消
返回值
如果之前已经设置了一个闹钟,且它尚未超时,那么alarm
函数会返回之前那个闹钟的剩余时间(秒)。否则,它返回0。
当alarm
设置的时间到达时,操作系统会向进程发送SIGALRM
信号。默认情况下,这会终止进程。但你可以使用signal
或sigaction
函数来定义自己的处理函数,从而在收到SIGALRM
信号时执行特定的操作
SIGALRM
默认行为:当进程接收到SIGALRM
信号且没有为它设置处理程序时,默认的行为是终止进程
SIGALRM
自定义处理 :你可以使用signal
或sigaction
函数为SIGALRM
定义自己的处理程序,这样你就可以在接收到信号时执行特定的操作,而不是让进程终止
实例程序
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 #include <stdio.h> #include <unistd.h> #include <signal.h> void handle_alarm (int sig) { printf ("Alarm went off!\n" ); } int main () { signal(SIGALRM, handle_alarm); printf ("Setting an alarm for 5 seconds...\n" ); alarm(5 ); pause(); printf ("Exiting program.\n" ); return 0 ; }
程序设置了一个5秒的闹钟,并使用pause
函数等待信号的到来。当闹钟超时,会执行handle_alarm
函数,打印出“Alarm went off!”。然后程序继续执行并退出
注意:alarm
函数只能设置一个闹钟。如果在前一个闹钟超时前再次调用它,前一个闹钟会被新的值替代
应用场景
定时操作 :你可以使用alarm
和SIGALRM
来在程序中执行定时任务。例如,如果你的程序在网络上等待数据,但你不希望它永远等待,你可以设置一个闹钟作为超时机制。
资源限制 :例如,当运行某些计算密集型任务时,你可能希望限制它们的执行时间
注意事项
因为alarm
只能设置一个闹钟,所以每次调用alarm
都会重置之前的闹钟。
信号机制不是实时的,所以当你设置一个很短的闹钟时,不一定能够精确地在你期望的时间内触发。
在信号处理程序中,你应该避免调用可能会改变程序状态或不是异步信号安全的函数
定时循环程序程序实例 使用time()函数进行定时
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 #include <stdio.h> #include <unistd.h> #include <signal.h> #include <time.h> #include <stdint.h> int main () { time_t end; int64_t count = 0 ; end = time(NULL ) + 5 ; while (time(NULL ) <= end) { count++; } printf ("%ld\n" ,count); exit (0 ); }
该程序运行5s
后,在终端打印输出,在该目录下新建out
文件,将程序实际运行程序重定向输入至out
中
运行结果,实际运行时间比5s
多一点,存在误差
1 2 3 4 5 (base) zxz@zxz-B660M-GAMING-X-AX-DDR4:~/Proj/CLionProj/UNIX/IO/signal$ time ./5 >> ./out real 0 m5.317 s user 0 m5.309 s sys 0 m0.008 s
使用alarm()
函数进行定时
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 #include <stdio.h> #include <stdlib.h> #include <signal.h> #include <stdint.h> #include <unistd.h> static int loop = 1 ;static void alrm_handler (int s) { loop = 0 ; } int main () { int64_t count = 0 ; signal(SIGALRM,alrm_handler); alarm(5 ); while (loop) count++; printf ("%ld\n" ,count); exit (0 ); }
在终端中使用指令time ./6 >> ./out
运行程序,程序打印结果重定向至out
文件中
程序结果
1 2 3 4 5 (base) zxz@zxz-B660M-GAMING-X-AX-DDR4:~/Proj/CLionProj/UNIX/IO/signal$ time ./6 >> ./out real 0 m5.002 s user 0 m4.998 s sys 0 m0.004 s
使用alarm()信号的方式去定时,相较于time()精度更高
(4)pause() pause
函数被用于使进程暂停执行,直到它接收到一个信号。一旦进程接收到信号,pause
函数返回,然后进程可以继续执行。这个函数定义在<unistd.h>
头文件中
pause
函数会使调用它的进程休眠,直到该进程捕获到一个信号。无论信号是否被捕获或忽略,pause
都会返回
函数原型
1 2 3 #include <unistd.h> int pause (void ) ;
返回值
pause
函数永远不会成功返回。如果被中断,它将返回-1
,并设置errno
为EINTR
pause
函数在某些场景下是有用的,尤其是当你希望进程在等待外部事件(通常是信号)发生时保持休眠
(5)abort() abort
函数是一个标准库函数,位于头文件 <stdlib.h>
中。它的主要功能是用于终止程序的执行。当调用abort
函数时,程序会立即停止运行,并生成一个终止信号,通常会导致程序的终止
abort
函数通常在以下情况下被调用:
(6)system() system
函数是一个标准库函数,位于头文件 <stdlib.h>
中。它用于在程序中执行系统命令或外部程序,并等待命令执行完毕后继续程序的执行
system
函数的原型如下:
1 int system (const char *command) ;
参数 command
是一个字符串,表示要执行的系统命令或外部程序。当调用system
函数时,它会启动一个新的进程来执行指定的命令,然后等待命令执行完毕。函数的返回值表示命令执行的状态:
如果命令执行成功,system
函数返回一个正整数值(通常是0
)。
如果命令执行失败,或者无法执行命令,system
函数返回一个非零值。
程序实例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 #include <stdio.h> #include <stdlib.h> int main () { int commandResult; commandResult = system("ls -l" ); if (commandResult == 0 ) { printf ("Command executed successfully.\n" ); } else { printf ("Command execution failed.\n" ); } return 0 ; }
在上述示例中,system("ls -l")
执行了一个简单的 ls -l
命令,该命令会列出当前目录的文件和文件夹的详细信息。程序根据system
函数的返回值判断命令是否执行成功
(7)sleep() sleep
函数是一个标准库函数,位于头文件 <unistd.h>
中。它的作用是让程序在指定的时间内暂停(休眠)执行,以达到一定的延迟效果。sleep
函数接受一个以秒为单位的整数参数,表示程序需要休眠的时间
sleep
函数的原型如下:
1 unsigned int sleep (unsigned int seconds) ;
参数 seconds
是一个无符号整数,表示程序需要休眠的秒数。函数会使程序休眠指定的秒数,然后继续执行
需要注意的是,sleep
函数的精度是以秒为单位,如果需要更精细的延迟,可以使用其他系统调用,如 usleep
或 nanosleep
,它们允许以微秒和纳秒为单位进行延迟
此外,sleep
函数通常在单线程环境下使用,如果在多线程程序中使用,它可能会导致整个程序休眠 ,而不仅仅是调用线程。在多线程环境中,可以考虑使用线程专用的延迟函数来避免这种情况
(8)nanosleep() naaosleep()
提供比传统的 sleep
函数更高的时间分辨率,允许使用纳秒级别的精度来指定睡眠时间
1 2 3 #include <time.h> int nanosleep (const struct timespec *req, struct timespec *rem) ;
参数:
req
:指向 timespec
结构的指针,该结构指定了要暂停的时间长度。timespec
结构如下定义:
1 2 3 4 struct timespec { time_t tv_sec; long tv_nsec; };
rem
:如果非 NULL
,则在函数返回时,此结构会被设置为未睡完的剩余时间。这通常在 nanosleep
被信号打断时发生,允许程序决定是否重新开始剩余的睡眠时间
返回值:
成功 :返回 0
。
错误 :返回 -1
并设置 errno
来指示错误类型
实例 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 #include <stdio.h> #include <time.h> int main () { struct timespec ts ; ts.tv_sec = 2 ; ts.tv_nsec = 500000000L ; if (nanosleep(&ts, NULL ) < 0 ) { perror("nanosleep" ); } else { printf ("Slept for 2.5 seconds\n" ); } return 0 ; }
(9)usleep() 允许程序以微秒(百万分之一秒)的单位进行睡眠。然而,需要注意的是,usleep
函数是一个较老的函数,并且在最新的 POSIX 标准中已经被废弃,推荐使用更现代的 nanosleep
函数或其他高精度定时函数来替代它
1 2 3 #include <unistd.h> int usleep (useconds_t usec) ;
参数:
usec
:要暂停的时间,以微秒为单位。尽管参数类型是 useconds_t
,这通常是一个无符号整数类型
行为与限制:
usleep
可以暂停程序的执行达到指定的微秒数。但其精确度和实际暂停时间可能受到系统调度策略和系统负载的影响。
在一些系统实现中,usleep
可以接受最大值为 1,000,000 微秒(即1秒)。超过这个值的输入可能导致函数行为不正常。
usleep
的一个主要限制是它不能处理被信号中断的情况。如果 usleep
在睡眠期间被信号打断,它不会重新进入睡眠状态
7.令牌桶算法 (1) 模拟cp指令 模拟cp指令,将src文件,复制到dest文件中
mycopy1.c 通过fopen函数返回文件流的方式,去读写文件数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 #include <stdlib.h> #include <stdio.h> int main (int argc,char *argv[]) { FILE *fps,*fpd; int ch; fps = fopen(argv[1 ],"r" ); if (fps == NULL ) { perror("fopen()" ); exit (1 ); } fpd = fopen(argv[2 ],"w" ); if (fpd == NULL ) { perror("fopen()" ); exit (1 ); } while (1 ) { ch = fgetc(fps); if (ch == EOF) { break ; } fputc(ch,fpd); } fclose(fps); fclose(fpd); exit (0 ); }
创建src文件,并且写入内容,运行指令:
使用指令diff,判断两个文件的内容是否一致,如下,若终端无内容输出,则表示内容一致
mycopy2.c 通过open函数,获取文件描述符,通过文件描述符号,读写文件的数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 #include <stdlib.h> #include <stdio.h> #include <fcntl.h> #include <unistd.h> #define BUFSIZE 1024 int main (int argc,char *argv[]) { int sfd,dfd; char buf[BUFSIZE]; int len,ret; if (argc < 3 ) { fprintf (stderr ,"Usage...\n" ); exit (1 ); } sfd = open(argv[1 ],O_RDONLY); if (sfd==-1 ) { perror("open()" ); exit (1 ); } dfd = open(argv[2 ],O_WRONLY|O_CREAT|O_TRUNC); if (dfd == -1 ) { perror("open()" ); close(sfd); exit (1 ); } while (1 ) { len = read(sfd,buf,BUFSIZE); if (len == -1 ) { perror("read()" ); exit (1 ); } if (len == 0 ) { break ; } ret = write(dfd,buf,len); if (ret == -1 ) { perror("write()" ); break ; } } close(sfd); close(dfd); exit (0 ); }
编译运行,发现存在bug
BUG : mycopy2.c
上面的程序存在bug,因为在向目标文件写入数据的时候,write函数可能会出错,假设我们需要写入10个字节,但是实际只写入了3个字节,那么ret返回值==3,也是非负值,此时并没有写入全部数据数据,下一次还会继续进行写入,但是第二次写入会将第一次写入的数据进行截断覆盖,因此做以下修正
mycopy3.c 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 #include <stdio.h> #include <stdlib.h> #include <fcntl.h> #include <unistd.h> #define BUFSIZE 1024 int main (int argc,char *argv[]) { int sfd,dfd; char buf[BUFSIZE]; int len,ret,pos; if (argc < 3 ) { fprintf (stderr ,"Usage...\n" ); exit (1 ); } sfd = open(argv[1 ],O_RDONLY); if (sfd==-1 ) { perror("open()" ); exit (1 ); } dfd = open(argv[2 ],O_WRONLY|O_CREAT|O_TRUNC); if (dfd == -1 ) { perror("open()" ); close(sfd); exit (1 ); } while (1 ) { len = read(sfd,buf,BUFSIZE); if (len == -1 ) { perror("read()" ); exit (1 ); } if (len == 0 ) { break ; } pos = 0 ; while (len > 0 ) { ret = write(dfd,buf+pos,len); if (ret == -1 ) { perror("write()" ); exit (1 ); } pos += ret; len -= ret; } } close(sfd); close(dfd); exit (0 ); }
mycopy4.c 因为write与read,open函数都是系统调用,存在假错的情况,因此需要对假错的情况进行判断
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 #include <stdio.h> #include <stdlib.h> #include <fcntl.h> #include <unistd.h> #include <errno.h> #define BUFSIZE 1024 int main (int argc,char *argv[]) { int sfd,dfd; char buf[BUFSIZE]; int len,ret,pos; if (argc < 3 ) { fprintf (stderr ,"Usage...\n" ); exit (1 ); } sfd = open(argv[1 ],O_RDONLY); do { if (sfd<0 ) { if (errno != EINTR) { perror("open()" ); exit (1 ); } } }while (sfd < 0 ); dfd = open(argv[2 ],O_WRONLY|O_CREAT|O_TRUNC); do { if (dfd < -1 ) { if (errno != EINTR) { perror("open()" ); close(sfd); exit (1 ); } } }while (dfd < 0 ); while (1 ) { while ((len = read(sfd,buf,BUFSIZE)) < 0 ) { if (errno == EINTR) continue ; perror("read()" ); exit (1 ); } if (len == 0 ) break ; pos = 0 ; while (len > 0 ) { ret = write(dfd,buf+pos,len); if (ret < 0 ) { if (errno == EINTR) continue ; perror("write()" ); exit (1 ); } pos += ret; len -= ret; } } close(sfd); close(dfd); exit (0 ); }
(2) 模拟cat指令 在mycopy4.c的基础上进行修改,只需要将目标文件描述符,改为标准输出即可,这样冲源文件中读取的数据,就会直接输出到终端上,标准输出的文件描述符为1
mycat1.c 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 #include <stdio.h> #include <stdlib.h> #include <fcntl.h> #include <unistd.h> #include <errno.h> #define BUFSIZE 1024 int main (int argc,char *argv[]) { int sfd,dfd = 1 ; char buf[BUFSIZE]; int len,ret,pos; if (argc < 2 ) { fprintf (stderr ,"Usage...\n" ); exit (1 ); } sfd = open(argv[1 ],O_RDONLY); do { if (sfd<0 ) { if (errno != EINTR) { perror("open()" ); exit (1 ); } } }while (sfd < 0 ); while (1 ) { while ((len = read(sfd,buf,BUFSIZE)) < 0 ) { if (errno == EINTR) continue ; perror("read()" ); exit (1 ); } if (len == 0 ) break ; pos = 0 ; while (len > 0 ) { ret = write(dfd,buf+pos,len); if (ret < 0 ) { if (errno == EINTR) continue ; perror("write()" ); exit (1 ); } pos += ret; len -= ret; } } close(sfd); exit (0 ); }
编译运行,如下:
mycat2.c–漏桶实现 在mycat1.c
中是将源文件的内容,一瞬间,全部输出到终端上进行显示,但是如果需要对输出数据速度进行控制呢,将内容每秒10字节的输出,就可以进行流控(流量控制),slowcat.c
漏桶 (Leaky Bucket)是一种常见的流量控制算法,用于平滑和限制进入系统的数据流量。其基本思想是:无论流量的峰值如何,都以固定的速率将数据发送到目的地,当数据流过快时,将溢出到“桶”中,而当数据发送速率过快时,将丢弃溢出的数据
通过进程中信号的机制,使用alarm()
函数进程定时
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 #include <stdio.h> #include <stdlib.h> #include <fcntl.h> #include <unistd.h> #include <errno.h> #include <signal.h> #define CPS 10 #define BUFSIZE CPS static volatile int loop = 0 ; static void alarm_handler (int s) { alarm(1 ); loop = 1 ; } int main (int argc,char *argv[]) { int sfd,dfd = 1 ; char buf[BUFSIZE]; int len = 0 ; int ret = 0 ; int pos = 0 ; if (argc < 2 ) { fprintf (stderr ,"Usage...\n" ); exit (1 ); } signal(SIGALRM,alarm_handler); alarm(1 ); sfd = open(argv[1 ],O_RDONLY); do { if (sfd<0 ) { if (errno != EINTR) { perror("open()" ); exit (1 ); } } }while (sfd < 0 ); while (1 ) { while (!loop) pause(); loop = 0 ; while ((len = read(sfd,buf,BUFSIZE)) < 0 ) { if (errno == EINTR) continue ; perror("read()" ); exit (1 ); } if (len == 0 ) break ; pos = 0 ; while (len > 0 ) { ret = write(dfd,buf+pos,len); if (ret < 0 ) { if (errno == EINTR) continue ; perror("write()" ); exit (1 ); } pos += ret; len -= ret; } } close(sfd); exit (0 ); }
mycat3.c–令牌桶实现 在mycat2.c
中可以理解为,程序有权限在1s中传输10个字节的数据,但是若在这1s内没有数据来(读取的文件中没有数据),程序不能进行传输数据,只能等待数据的到来。这样的效率很底下,换种方式,若在这1s中没有数据到来,程序也不进行干等,而是将1s传输10字节数据的权限进行积攒,等到有大量数据到来的时候,传输积攒的权限数量x10字节的内容。
需要注意一点:积攒权限的数量,存在上限,积攒的权限的数量不能无限大
如下,在将漏桶改为令牌桶的实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 #include <stdio.h> #include <stdlib.h> #include <fcntl.h> #include <unistd.h> #include <errno.h> #include <signal.h> #define CPS 10 #define BUFSIZE CPS #define BURST 100 static volatile sig_atomic_t token = 0 ;static void alarm_handler (int s) { alarm(1 ); token++; if (token > BURST) token = BURST; } int main (int argc,char *argv[]) { int sfd,dfd = 1 ; char buf[BUFSIZE]; int len = 0 ; int ret = 0 ; int pos = 0 ; if (argc < 2 ) { fprintf (stderr ,"Usage...\n" ); exit (1 ); } signal(SIGALRM,alarm_handler); alarm(1 ); sfd = open(argv[1 ],O_RDONLY); do { if (sfd<0 ) { if (errno != EINTR) { perror("open()" ); exit (1 ); } } }while (sfd < 0 ); while (1 ) { while (token <= 0 ) pause(); token--; while ((len = read(sfd,buf,BUFSIZE)) < 0 ) { if (errno == EINTR) continue ; perror("read()" ); exit (1 ); } if (len == 0 ) break ; pos = 0 ; while (len > 0 ) { ret = write(dfd,buf+pos,len); if (ret < 0 ) { if (errno == EINTR) continue ; perror("write()" ); exit (1 ); } pos += ret; len -= ret; } } close(sfd); exit (0 ); }
解析 :在mycat3.c
中的令牌桶重点,即read函数调用处,若此次读取文件中没有数据,那么由于read函数是一个阻塞的系统调用,进程就会被阻塞在这个地方,但是由于之前设定了定时alarm(1),经过1s,后会有信号打断,系统调用函数read(),并且执行alarm_handler
函数,对令牌数量进行加一操作,但是程序在read
函数那个while循环中,判断发现此次打断read函数调用为EINTR
假错,则会重新调用read
函数,若此时读取的文件中还是没有数据,则会继续阻塞在此处,如此往复进行令牌数量的积攒,直到读取的文件中存在内容才会打破阻塞,若此时存在大量的数据,程序可能会在1s内消耗多个令牌权限,若在1s内消耗的令牌数量为3,则是一次性处理了10字节,连续处理了3次
(3) 令牌桶的实现 令牌桶(Token Bucket)是一种流量整形(Traffic Shaping)和流量限制(Rate Limiting)的机制,广泛用于网络带宽管理、服务器请求处理等场景。该算法通过控制数据传输的速率和突发性来确保网络服务的质量和公平性。令牌桶算法的核心思想是使用一个虚拟的“桶”,桶中存放着“令牌”(token),数据包的传输需要消耗令牌
令牌桶算法提供了一种有效的方式来控制和管理数据流,以确保网络资源的合理分配和系统服务的高可用性
令牌桶算法由一个存放令牌(tokens)的桶和一个固定的填充速率组成。数据包发送之前必须先从桶中取得足够的令牌,每个数据包可能需要一个或多个令牌。
桶的大小 :决定了在任何给定时间点,桶中可以累积的最大令牌数。这个参数可以影响突发(burst)流量的大小
令牌填充速率 :决定了令牌进入桶中的速率,通常以令牌/秒计量。这个速率限制了长期的平均发送速率
工作原理 :
令牌的生成 :令牌以固定的速率被添加到桶中。桶有一个容量上限,超过这个上限的令牌将会被丢弃
数据包的发送 :每个要发送的数据包必须先获取一定数量的令牌才能被发送。如果桶中有足够的令牌,数据包就可以立即发送并且相应数量的令牌从桶中移除;如果桶中令牌不足,数据包需要等待直到有足够的令牌
突发流量的控制 :由于桶可以存储令牌,因此在令牌积累时允许一定程度的突发传输。这是通过在一段时间不使用令牌,然后使用积累的令牌来一次性发送多个数据包来实现的
工作流程 :
桶以一定的固定速率(r)获得令牌。
桶中最多可以存储固定数量(b)的令牌,以允许一定程度的突发传输。
当一个数据包到达时,如果桶中有足够的令牌,则从桶中移除相应数量的令牌,并允许数据包发送。
如果桶中的令牌不足,则根据具体实现,数据包可能被丢弃或排队等待。
应用场景 ;
网络带宽管理 :通过限制网络流量的速率,确保网络资源被公平使用,防止某一应用或用户占用过多带宽。
服务器请求处理 :在服务器或API服务中,限制请求的速率来保护后端服务不被过载,提高系统的稳定性和可用性。
多媒体传输 :在视频流和音频流传输中控制数据的发送速率,以适应不同网络条件和缓冲需求
优点 :
灵活性 :令牌桶算法允许一定程度的突发流量,比较灵活地应对突增的数据传输需求。
平滑网络流量 :通过控制数据发送的平均速率,帮助平滑网络流量波动,减少拥塞。
适应性 :能够适应不同的网络环境和流量模式,通过调整令牌生成速率和桶的大小来满足不同的需求。
实例程序alarm实现 通过令牌桶,实现流量控制,main函数中是实现了cat指令的功能,但是使用令牌桶实现流控的能力
mytbf.h
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 #ifndef TOKENBUCKET_MYTBF_H #define TOKENBUCKET_MYTBF_H #define MYTBF_MAX 1024 typedef void mytbf_t ;mytbf_t *mytbf_init (int cps,int burst) ;int mytbf_fetchtoken (mytbf_t *,int ) ;int mytbf_returntoken (mytbf_t *,int ) ;int mytbf_destroy (mytbf_t *) ;#endif
mytbf.c
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <errno.h> #include <signal.h> #include "mytbf.h" typedef void (*sighandler_t ) (int ) ;static __sighandler_t alarm_handler_save;static struct mytbf_st * job [MYTBF_MAX ];static int inited = 0 ;struct mytbf_st { int cps; int burst; int token; int pos; }; static void alarm_handler (int s) { alarm(1 ); for (int i = 0 ; i < MYTBF_MAX; i++) { if (job[i] != NULL ) { job[i]->token += job[i]->cps; if (job[i]->token > job[i]->burst) job[i]->token = job[i]->burst; } } } static void module_unload (void ) { signal(SIGALRM, alarm_handler_save); alarm(0 ); for (int i = 0 ; i < MYTBF_MAX; i++) free (job[i]); } static void module_load (void ) { alarm_handler_save = signal(SIGALRM, alarm_handler); alarm(1 ); atexit(module_unload); } static int min (int a, int b) { if (a < b) return a; return b; } static int get_free_pos (void ) { for (int i = 0 ; i < MYTBF_MAX; i++) { if (job[i] == NULL ) return i; } return -1 ; } mytbf_t *mytbf_init (int cps,int burst) { struct mytbf_st *me ; if (!inited) { module_load(); inited = 1 ; } int pos = get_free_pos(); if (pos < 0 ) return NULL ; me = malloc (sizeof (*me)); if (me == NULL ) return NULL ; me->token = 0 ; me->cps = cps; me->burst = burst; me->pos = pos; job[pos] = me; return me; } int mytbf_fetchtoken (mytbf_t *ptr,int size) { struct mytbf_st *me = ptr; int n; if (size < 0 ) return -EINVAL; while (me->token <= 0 ) pause(); n = min(me->token,size); me->token -= n; return n; } int mytbf_returntoken (mytbf_t *ptr,int size) { struct mytbf_st *me = ptr; int n; if (size < 0 ) return -EINVAL; me->token += size; if (me->token > me->burst) me->token = me->burst; return size; } int mytbf_destroy (mytbf_t *ptr) { struct mytbf_st *me = ptr; job[me->pos] = NULL ; free (ptr); }
main.c
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <errno.h> #include "mytbf.h" #define CPS 10 #define BUFSIZE 1024 #define BURST 100 int main (int argc,char *argv[]) { int sfd,dfd = 1 ; int len = 0 ; int ret = 0 ; int pos = 0 ; int size = 0 ; char buf[BUFSIZE]; mytbf_t *tbf; if (argc < 2 ) { fprintf (stderr ,"Usage:%s <src_file>\n" ,argv[0 ]); exit (1 ); } tbf = mytbf_init(CPS,BURST); if (tbf == NULL ) { fprintf (stderr , "mytbf_init() failed!\n" ); exit (1 ); } do { sfd = open(argv[1 ], O_RDONLY); if (sfd < 0 ) { if (errno != EINTR) { perror("open()" ); exit (1 ); } } }while (sfd < 0 ); while (1 ) { size = mytbf_fetchtoken(tbf, BUFSIZE); if (size < 0 ) { fprintf (stderr , "mytbf_fetchtoken():%s\n" , strerror(-size)); exit (1 ); } while ((len = read(sfd,buf,size)) < 0 ) { if (errno == EINTR) continue ; perror("read()" ); exit (1 ); } if (len == 0 ) break ; if ((size - len) > 0 ) mytbf_returntoken(tbf,size-len); pos = 0 ; while (len > 0 ) { ret = write(dfd, buf + pos, len); if (ret < 0 ) { if (errno == EINTR) continue ; perror("write()" ); exit (1 ); } pos += ret; len -= ret; } } close(sfd); mytbf_destroy(tbf); exit (0 ); }
CMakeLists.txt
1 2 3 4 5 6 cmake_minimum_required(VERSION 3.19 ) project(TokenBucket C) set (CMAKE_C_STANDARD 99 )add_executable(TokenBucket main.c mytbf.c)
实例程序setitimer实现 mytbf.h
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 #ifndef _MYTBF_H_ #define _MYTBF_H_ #define MYTBF_MAX 1024 typedef void mytbf_t ;mytbf_t *mytbf_init (int cps, int burst) ;int mytbf_fetchtoken (mytbf_t *, int ) ;int mytbf_returntoken (mytbf_t *, int ) ;int mytbf_destroy (mytbf_t *) ;#endif
mytbf.c
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <errno.h> #include <signal.h> #include <sys/time.h> #include "mytbf.h" static struct mytbf_st * job [MYTBF_MAX ];static int inited = 0 ;static struct sigaction alarm_sa_save ;struct mytbf_st { int cps; int burst; int token; int pos; }; static void alarm_action (int s, siginfo_t *infop, void *unused) { if (infop->si_code != SI_KERNEL) return ; for (int i = 0 ; i < MYTBF_MAX; i++) { if (job[i] != NULL ) { job[i]->token += job[i]->cps; if (job[i]->token > job[i]->burst) job[i]->token = job[i]->burst; } } } static void module_unload (void ) { struct itimerval itv ; sigaction(SIGALRM, &alarm_sa_save, NULL ); itv.it_interval.tv_sec = 0 ; itv.it_interval.tv_usec = 0 ; itv.it_value.tv_sec = 0 ; itv.it_value.tv_usec = 0 ; setitimer(ITIMER_REAL, &itv, NULL ); for (int i = 0 ; i < MYTBF_MAX; i++) free (job[i]); } static void module_load (void ) { struct sigaction sa ; struct itimerval itv ; sa.sa_sigaction = alarm_action; sigemptyset(&sa.sa_mask); sa.sa_flags = SA_SIGINFO; sigaction(SIGALRM, &sa, &alarm_sa_save); itv.it_interval.tv_sec = 1 ; itv.it_interval.tv_usec = 0 ; itv.it_value.tv_sec = 1 ; itv.it_value.tv_usec = 0 ; setitimer(ITIMER_REAL, &itv, NULL ); atexit(module_unload); } static int min (int a, int b) { if (a < b) return a; return b; } static int get_free_pos (void ) { for (int i = 0 ; i < MYTBF_MAX; i++) { if (job[i] == NULL ) return i; } return -1 ; } mytbf_t *mytbf_init (int cps, int burst) { struct mytbf_st *me ; if (!inited) { module_load(); inited = 1 ; } int pos = get_free_pos(); if (pos < 0 ) return NULL ; me = malloc (sizeof (*me)); if (me == NULL ) return NULL ; me->token = 0 ; me->cps = cps; me->burst = burst; me->pos = pos; job[pos] = me; return me; } int mytbf_fetchtoken (mytbf_t *ptr, int size) { struct mytbf_st *me = ptr; int n; if (size <= 0 ) return -EINVAL; while (me->token <= 0 ) pause(); n = min(me->token, size); me->token -= n; return n; } int mytbf_returntoken (mytbf_t *ptr, int size) { struct mytbf_st *me = ptr; if (size <= 0 ) return -EINVAL; me->token += size; if (me->token > me->burst) me->token = me->burst; return size; } int mytbf_destroy (mytbf_t *ptr) { struct mytbf_st *me = ptr; job[me->pos] = NULL ; free (ptr); }
main.c
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <errno.h> #include "mytbf.h" #define CPS 10 #define BUFSIZE 1024 #define BURST 100 int main (int argc, char **argv) { int sfd, dfd = 1 ; int len = 0 ; int ret = 0 ; int pos = 0 ; int size = 0 ; char buf[BUFSIZE]; mytbf_t *tbf; if (argc < 2 ) { fprintf (stderr , "Usage:%s <src_file>\n" ,argv[0 ]); exit (1 ); } tbf = mytbf_init(CPS, BURST); if (tbf == NULL ) { fprintf (stderr , "mytbf_init() failed!\n" ); exit (1 ); } do { sfd = open(argv[1 ], O_RDONLY); if (sfd < 0 ) { if (errno != EINTR) { perror("open()" ); exit (1 ); } } }while (sfd < 0 ); while (1 ) { size = mytbf_fetchtoken(tbf, BUFSIZE); if (size < 0 ) { fprintf (stderr , "mytbf_fetchtoken():%s\n" , strerror(-size)); exit (1 ); } while ((len = read(sfd, buf, size)) < 0 ) { if (errno == EINTR) continue ; perror("read()" ); break ; } if (len == 0 ) break ; if (size - len > 0 ) mytbf_returntoken(tbf, size - len); pos = 0 ; while (len > 0 ) { ret = write(dfd, buf + pos, len); if (ret < 0 ) { if (errno == EINTR) continue ; perror("write()" ); exit (1 ); } pos += ret; len -= ret; } } close(sfd); mytbf_destroy(tbf); exit (0 ); }
8.setitimer定时函数 setitimer
是用于设置间隔计时器的函数。与之相关的是 getitimer
,用于获取当前设置的计时器的值
该函数时间精度很高,而且误差不会累积 ,因此setitimer
比alarm
函数更好
函数原型
1 2 3 #include <sys/time.h> int setitimer (int which, const struct itimerval *new_value, struct itimerval *old_value) ;
参数:
which
: 指定要设置的计时器的类型,通常可以是:
ITIMER_REAL
: 实时计时器,当计时器超时时,将发送 SIGALRM
信号。
ITIMER_VIRTUAL
: 虚拟计时器,仅在进程执行时减少。
ITIMER_PROF
: 类似于 ITIMER_VIRTUAL
,但还包括了当系统执行 behalf 时的时间。
new_value
: 指定新计时器的间隔和值。
old_value
: 如果不为NULL
,那么在调用之前计时器的当前值将存储在此处
itimerval
结构体的定义大致如下:
1 2 3 4 struct itimerval { struct timeval it_interval ; struct timeval it_value ; };
当it_value
递减为0
时,发信号。并且it_interval
会原子化赋值给it_value
,重新开始计时
其中,timeval
结构如下:
1 2 3 4 struct timeval { long tv_sec; long tv_usec; };
返回值
使用 setitimer
时,还需要处理可能由计时器超时产生的信号,如 SIGALRM
注意事项 :
定时器类型限制
每种类型的定时器(ITIMER_REAL
、ITIMER_VIRTUAL
、ITIMER_PROF
)在任何时刻都只能有一个活动实例。因此,对于每种类型的定时器,后续的 setitimer
调用会重置该类型的定时器设置
如果你调用 setitimer
设置了一个 ITIMER_REAL
定时器,然后再次调用 setitimer
设置同类型的定时器,第二个调用会覆盖第一个调用的设置
实例 :
将mycat2.c
漏桶实现的alarm函数替换为setitimer函数,alarm
只能设置一个一次性的定时器,即一旦定时器到期,就必须重新设置。而setitimer
允许设置周期性的定时器。通过设定 it_interval
结构的值,setitimer
可以在定时器到期后自动重置,无需手动再次调用。这对于需要定期执行任务的应用程序非常方便,如定期检查、更新状态或执行维护任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 #include <stdio.h> #include <stdlib.h> #include <fcntl.h> #include <unistd.h> #include <errno.h> #include <signal.h> #include <sys/time.h> #define CPS 10 #define BUFSIZE CPS static volatile int loop = 0 ; static void alarm_handler (int s) { loop = 1 ; } int main (int argc,char *argv[]) { int sfd,dfd = 1 ; char buf[BUFSIZE]; int len = 0 ; int ret = 0 ; int pos = 0 ; if (argc < 2 ) { fprintf (stderr ,"Usage...\n" ); exit (1 ); } signal(SIGALRM,alarm_handler); struct itimerval itv ; itv.it_interval.tv_sec = 1 ; itv.it_interval.tv_usec = 0 ; itv.it_value.tv_sec = 1 ; itv.it_value.tv_usec = 0 ; if (setitimer(ITIMER_REAL,&itv,NULL ) < 0 ) { perror("setitimer()" ); exit (1 ); } sfd = open(argv[1 ],O_RDONLY); do { if (sfd<0 ) { if (errno != EINTR) { perror("open()" ); exit (1 ); } } }while (sfd < 0 ); while (1 ) { while (!loop) pause(); loop = 0 ; while ((len = read(sfd,buf,BUFSIZE)) < 0 ) { if (errno == EINTR) continue ; perror("read()" ); exit (1 ); } if (len == 0 ) break ; pos = 0 ; while (len > 0 ) { ret = write(dfd,buf+pos,len); if (ret < 0 ) { if (errno == EINTR) continue ; perror("write()" ); exit (1 ); } pos += ret; len -= ret; } } close(sfd); exit (0 ); }
9.多任务计时器anytime实现 使用单一计时器,构造一组函数,实现任意数量的计时器
使用alarm函数,进行定时,会存在一个问题,如果多次调用alarm函数,在别的alarm调用还没有信号到来的期间,继续调用alarm函数,那么之前的定时会进行重置,以最后这次调用的定时间为准,如下,三次调用在第一次调用,10s计时还没有结束,就调用alarm(5),那么定时重置,这时5s定时开始,10s失效,而在5s计时还没有结束,就调用alarm(2),那么定时重置,这时2s定时开始,5s失效
1 2 3 alarm(10 ); alarm(5 ); alarm(2 );
这一节的任务 是:通过alarm
函数或者setitimer
函数,进行如下伪码计时,那么程序,在程序开始之后的2s时,发送信号,进程执行函数f2;在程序开始之后的5s时,发送信号,进程执行函数f1;最后,在程序开始之后的7s时,发送信号,进程执行函数f1;
1 2 3 定时器1 : 5 ,f1,aaa 定时器2 : 2 ,f2,bbb 定时器3 : 7 ,f1,ccc
思路 :构建如下三个任务结构体,结构体字段有sec
表示任务计时,func
表示任务处理函数,arg
表示任务参数,执行过程如下图:
Anytimer1.c Anytimer1.h
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 #ifndef ANYTIMER_ANYTIMER_H #define ANYTIMER_ANYTIMER_H #define JOB_MAX 1024 typedef void at_jobfunc_t (void *) ;int at_addjob (int sec, at_jobfunc_t *jobp, void *arg) ;int at_canceljob (int id) ;int at_wait (int id) ;int at_addjob_repeat (int sec, at_jobfunc_t *jobp, void *arg) ;#endif
Anytimer.c
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 #include <stdlib.h> #include <stdio.h> #include <string.h> #include <sys/types.h> #include <unistd.h> #include <signal.h> #include <sys/time.h> #include <errno.h> #include <wait.h> #include "anytimer.h" enum { STATE_RUNNING = 1 , STATE_CANCELED, STATE_OVER }; struct at_job_st { int job_state; int sec; int time_remain; int repeat; at_jobfunc_t *jobp; void *arg; }; static struct at_job_st *job [JOB_MAX ];static int inited = 0 ;static struct sigaction alrm_sa_save ; static void alrm_action (int s,siginfo_t *infop,void *unused) { if (infop->si_code != SI_KERNEL) return ; for (int i=0 ;i<JOB_MAX;i++) { if (job[i]!=NULL && job[i]->job_state == STATE_RUNNING) { job[i]->time_remain--; if (job[i]->time_remain == 0 ) { job[i]->jobp(job[i]->arg); if (job[i]->repeat == 1 ) job[i]->time_remain = job[i]->sec; else job[i]->job_state = STATE_OVER; } } } } static int get_free_pos (void ) { for (int i = 0 ;i<JOB_MAX;i++) { if (job[i] ==NULL ) return i; } return -1 ; } static void module_unload (void ) { struct itimerval itv ; itv.it_interval.tv_sec = 0 ; itv.it_interval.tv_usec = 0 ; itv.it_value.tv_sec = 0 ; itv.it_value.tv_usec = 0 ; if (setitimer(ITIMER_REAL,&itv,NULL ) < 0 ) { perror("setitimer()" ); exit (1 ); } if (sigaction(SIGALRM,&alrm_sa_save,NULL ) < 0 ) { perror("sigaction()" ); exit (1 ); } } static void module_load (void ) { struct sigaction sa ; struct itimerval itv ; sa.sa_sigaction = alrm_action; sigemptyset(&sa.sa_mask); sa.sa_flags = SA_SIGINFO; if (sigaction(SIGALRM,&sa,&alrm_sa_save) < 0 ) { perror("aigaction()" ); exit (1 ); } itv.it_interval.tv_sec = 1 ; itv.it_interval.tv_usec = 0 ; itv.it_value.tv_sec = 1 ; itv.it_value.tv_usec = 0 ; if (setitimer(ITIMER_REAL,&itv,NULL ) < 0 ) { perror("setitimer()" ); exit (1 ); } atexit(module_unload); } int at_addjob (int sec, at_jobfunc_t *jobp, void *arg) { int pos; struct at_job_st *me ; if (sec < 0 ) return -EINVAL; if (!inited) { module_load(); inited = 1 ; } pos = get_free_pos(); if (pos < 0 ) { return -ENOSPC; } me = malloc (sizeof (*me)); if (me == NULL ) return -ENOMEM; me->job_state = STATE_RUNNING; me->sec = sec; me->time_remain = me->sec; me->jobp = jobp; me->arg = arg; me->repeat = 0 ; job[pos] = me; return pos; } int at_canceljob (int id) { if (id < 0 || id >= JOB_MAX || job[id] == NULL ) return -EINVAL; if (job[id]->job_state == STATE_CANCELED) return -ECANCELED; if (job[id]->job_state == STATE_OVER) return -EBUSY; job[id]->job_state = STATE_CANCELED; return 0 ; } int at_addjob_repeat (int sec, at_jobfunc_t *jobp, void *arg) { int pos; struct at_job_st *me ; if (sec < 0 ) return -EINVAL; if (!inited) { module_load(); inited = 1 ; } pos = get_free_pos(); if (pos < 0 ) return -ENOSPC; me = malloc (sizeof (*me)); if (me == NULL ) return -ENOMEM; me->job_state = STATE_RUNNING; me->sec = sec; me->time_remain = me->sec; me->jobp = jobp; me->arg = arg; me->repeat = 1 ; job[pos] = me; return pos; } int at_waitjob (int id) { if (id < 0 || id >= JOB_MAX || job[id] == NULL ) return -EINVAL; if (job[id]->repeat == 1 ) return -EBUSY; while (job[id]->job_state == STATE_RUNNING) pause(); if (job[id]->job_state == STATE_CANCELED || job[id]->job_state == STATE_OVER) { free (job[id]); job[id] = NULL ; } return 0 ; }
main.c
在jobalarm
函数中,添加了三个定时任务,当运行job1,job2,job3
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 #include <stdio.h> #include <stdlib.h> #include <string.h> #include <wait.h> #include <sys/types.h> #include <unistd.h> #include "anytimer.h" static void f1 (void *s) { printf ("f1():%s\n" , (char *)s); fflush(NULL ); } static void f2 (void *s) { printf ("f2():%s\n" , (char *)s); fflush(NULL ); } static void repeat (void ) { int job1; puts ("Begin!" ); job1 = at_addjob_repeat(2 , f1, "aaa" ); if (job1 < 0 ) { fprintf (stderr , "at_addjob():%s\n" , strerror(-job1)); exit (1 ); } while (1 ) { write(1 , "." , 1 ); sleep(1 ); } } static void jobalarm (void ) { int job1, job2, job3; puts ("Begin!" ); job1 = at_addjob(5 , f1, "aaa" ); if (job1 < 0 ) { fprintf (stderr , "at_addjob():%s\n" , strerror(-job1)); exit (1 ); } job2 = at_addjob(2 , f2, "bbb" ); if (job2 < 0 ) { fprintf (stderr , "at_addjob():%s\n" , strerror(-job2)); exit (1 ); } job3 = at_addjob(7 , f1, "ccc" ); if (job3 < 0 ) { fprintf (stderr , "at_addjob():%s\n" , strerror(-job3)); exit (1 ); } while (1 ) { write(1 , "." , 1 ); sleep(1 ); } } int main (void ) { jobalarm(); return 0 ; }
CMakeLists.txt
1 2 3 4 5 6 cmake_minimum_required(VERSION 3.19 ) project(AnyTimer C) set (CMAKE_C_STANDARD 99 )add_executable(AnyTimer main.c anytimer.h anytimer.c)
编译运行
Anytimer2.c 简版 anytimer.h
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 #ifndef ANYTIMER2_ANYTIMER_H #define ANYTIMER2_ANYTIMER_H #define BUFSIZE 1024 typedef void anytm_func (void *) ;int anytimer_alarm (int ,anytm_func *,const void *) ;void anytimer_close (int n) ;void anytimer_destroy (void ) ;#endif
anytimer.c
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 #include <stdlib.h> #include <stdio.h> #include <string.h> #include <sys/types.h> #include <unistd.h> #include <signal.h> #include <sys/time.h> #include <errno.h> #include <wait.h> #include "anytimer.h" static int init_mod;struct sigaction oldact ; struct alarm_st { int token; anytm_func *anyfunc; void *str; }; static struct alarm_st *alarm_arr [BUFSIZE ];static void __arr_destroy(void ){ sigaction(SIGALRM,&oldact,NULL ); struct itimerval itv ; itv.it_interval.tv_sec = 0 ; itv.it_interval.tv_usec = 0 ; itv.it_value.tv_sec = 0 ; itv.it_value.tv_usec = 0 ; setitimer(ITIMER_REAL,&itv,NULL ); } static void __ifarr_NULL(void ){ int i; for (i = 0 ;i<BUFSIZE && alarm_arr[i] == NULL ;i++); if (i == BUFSIZE) anytimer_destroy(); } static void alarm_handler (int s) { int i; for (i = 0 ;i<BUFSIZE;i++) { if (alarm_arr[i] != NULL ) { alarm_arr[i]->token--; if (alarm_arr[i]->token <= 0 ) { alarm_arr[i]->anyfunc(alarm_arr[i]->str); free (alarm_arr[i]); alarm_arr[i] = NULL ; } } } __ifarr_NULL(); } static void __alarm_init(void ){ struct itimerval newtime ; newtime.it_interval.tv_sec = 1 ; newtime.it_interval.tv_usec = 0 ; newtime.it_value.tv_sec = 1 ; newtime.it_value.tv_usec = 0 ; struct sigaction act ; act.sa_handler = alarm_handler; sigemptyset(&act.sa_mask); act.sa_flags = 0 ; setitimer(ITIMER_REAL,&newtime,NULL ); sigaction(SIGALRM,&act,&oldact); init_mod = 1 ; } int anytimer_alarm (int sec,anytm_func *anytm,const void *p) { if (0 == init_mod) __alarm_init(); int i; for (i=0 ;i<BUFSIZE;i++) if (alarm_arr[i] == NULL ) break ; if (i>=BUFSIZE) return -1 ; struct alarm_st *me = NULL ; me = malloc (sizeof (*me)); if (me == NULL ) { perror("malloc()" ); return -1 ; } me->token = sec; me->anyfunc = anytm; me->str = (void *)p; alarm_arr[i] = me; return i; } void anytimer_close (int n) { if (alarm_arr[n] != NULL ) { free (alarm_arr[n]); alarm_arr[n] = NULL ; } __ifarr_NULL(); } void anytimer_destroy (void ) { __arr_destroy(); for (int i = 0 ; i < 1024 ; i++) { if (alarm_arr[i] != NULL ) { free (alarm_arr[i]); alarm_arr[i] = NULL ; } } }
main.c
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 #include <stdio.h> #include <stdlib.h> #include <wait.h> #include <sys/types.h> #include <unistd.h> #include "anytimer.h" static void any1 (void *s) { printf ("%s" , (char *)s); fflush(NULL ); } int main () { int i = 0 ; int alarm_arr_id; anytimer_alarm(2 , any1, "hello2" ); anytimer_alarm(4 , any1, "hello4" ); alarm_arr_id = anytimer_alarm(6 , any1, "hello6" ); anytimer_alarm(8 , any1, "hello8" ); anytimer_alarm(10 , any1, "hello10" ); anytimer_alarm(12 , any1, "hello12" ); while (1 ) { write(1 , "." , 1 ); sleep(1 ); i++; if (i == 5 ) anytimer_close(alarm_arr_id); if (i == 9 ) anytimer_destroy(); } return 0 ; }
CMakeLists.txt
1 2 3 4 5 6 cmake_minimum_required(VERSION 3.19 ) project(Anytimer2 C) set (CMAKE_C_STANDARD 99 )add_executable(Anytimer2 main.c anytimer.h anytimer.c)
编译运行如下:
10.信号集 信号是一种中断机制,用于通知进程某些特定事件的发生。信号集是一组信号的集合,用于表示多个信号
在C语言中,您可以使用以下库函数和数据结构来处理信号集:
sigset_t
:这是一个数据结构,用于表示信号集。
sigemptyset(sigset_t *set)
:初始化信号集为空集。
sigfillset(sigset_t *set)
:将所有信号添加到信号集。
sigaddset(sigset_t *set, int signum)
:将指定信号添加到信号集。
sigdelset(sigset_t *set, int signum)
:从信号集中删除指定信号。
sigismember(const sigset_t *set, int signum)
:检查指定信号是否在信号集中
(1)sigprocmask函数 该函数无法决定信号什么时候来,但是可以决定信号何时被响应
sigprocmask
函数用于检查或更改进程的信号屏蔽字。这可以用于阻塞或解阻某些信号 ,使得在某些代码段执行时,这些信号不会被传递到进程
函数原型
1 int sigprocmask (int how, const sigset_t *set , sigset_t *oldset) ;
参数:
how
:这决定了如何更改当前的信号屏蔽字。它可以是以下之一:
SIG_BLOCK
:将set
中的信号添加到当前的信号屏蔽字中。
SIG_UNBLOCK
:从当前的信号屏蔽字中删除set
中的信号。
SIG_SETMASK
:将当前信号屏蔽字设置为 set
指定的值。
set
:是一个指向信号集的指针,该信号集指定了要进行上述操作的信号。如果set
为NULL,则how
的值不重要,信号屏蔽字不会被更改。
oldset
:oldset
参数是一个指向 sigset_t
类型的变量的指针,用于存储修改前的信号屏蔽字。如果不需要这个信息,可以将其设置为 NULL
。
返回值:
如果成功,则返回0
。
如果出错,则返回-1
,并设置errno
以指示错误。
程序实例1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 #include <stdio.h> #include <stdlib.h> #include <signal.h> #include <unistd.h> int main () { int i,j; sigset_t set ,oset,saveset; sigaddset(&set ,SIGINT); sigprocmask(SIG_UNBLOCK,&set ,&saveset); for (j = 0 ;j<1000 ;j++) { sigprocmask(SIG_BLOCK,&set ,&oset); for (i = 0 ;i<5 ;i++) { write(1 ,"*" ,1 ); sleep(1 ); } write(1 ,"\n" ,1 ); sigprocmask(SIG_SETMASK,&oset,NULL ); } sigprocmask(SIG_SETMASK,&saveset,NULL ); exit (0 ); }
该程序运行之后,在终端时入ctrl+c
不会立刻进行响应信号,在打印*
的过程中打印!
号,而是会在换行之后作出响应,因为这个时候解除了对于信号的阻塞
注意编写信号有关的程序,需要保证信号在全局的状态是不变的,即运行程序后,信号的状态不变,因此在使用函数对某信号进行阻塞时,应该先对其状态进行保存,在使用运行程序结束后,对信号的状态进行恢复
运行结果
1 2 3 4 5 (base) zxz@zxz-B660M-GAMING-X-AX-DDR4:~/Proj/CLionProj/UNIX/IO/signal$ ./7 **^C*^C** !***^C** !*****
程序实例2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 #include <stdio.h> #include <stdlib.h> #include <signal.h> #include <unistd.h> static void int_handler (int s) { write(1 ,"!" ,1 ); } int main () { int i,j; sigset_t set ,oset,saveset; signal(SIGINT,int_handler); sigaddset(&set ,SIGINT | SIGQUIT); sigprocmask(SIG_UNBLOCK,&set ,&saveset); for (j = 0 ;j<1000 ;j++) { sigprocmask(SIG_BLOCK,&set ,&oset); for (i = 0 ;i<5 ;i++) { write(1 ,"*" ,1 ); sleep(1 ); } write(1 ,"\n" ,1 ); sigprocmask(SIG_SETMASK,&oset,NULL ); } sigprocmask(SIG_SETMASK,&saveset,NULL ); exit (0 ); }
11.信号屏蔽字/Pending集的处理 视频教程
12.扩展 (1)sigsuspend() sigsuspend
是UNIX
和类UNIX
系统中C
语言的一个系统调用函数,用于暂时替换进程的信号屏蔽字并暂停进程执行(sigsuspend()
会使进程进入阻塞等待状态,直到接收到一个未被屏蔽的信号),直到接收到一个信号。一旦进程接收到一个信号,sigsuspend
将返回,并恢复进程的原始信号屏蔽字
这个函数在某些场景中非常有用,特别是当进程希望等待一个信号但不希望被其他信号中断时
函数原型
1 int sigsuspend (const sigset_t *mask) ;
参数:
返回值:
sigsuspend
总是返回-1
,并设置errno
为EINTR
,表示由信号中断
该函数的工作流程如下:
替换信号屏蔽集 :sigsuspend()
用 mask
替换进程的当前信号屏蔽集。信号屏蔽集是一组信号,它们在进程运行期间被屏蔽,不会立即传递给进程
等待信号 :函数使进程进入等待状态,直到接收到一个未屏蔽的信号。该信号将被传递给进程,并触发相应的信号处理程序或默认行为
恢复信号屏蔽集 :在信号处理程序返回或默认行为执行完毕后,sigsuspend()
恢复进程的原始信号屏蔽集
返回 :sigsuspend()
函数返回 -1,并设置 errno
表示错误类型。通常,它在接收到一个未屏蔽的信号后返回
(2)sigaction() sigaction()
是一个在 Unix-like
系统中用于检查或修改信号行为的函数。它被设计用来取代早期的 signal()
函数,因为它提供了更多的功能和更明确的语义。
信号是 UNIX
系统中进程间通讯的方式之一。当进程接收到一个信号时,它可以选择忽略该信号、捕获该信号并执行相应的处理程序,或者按照默认方式处理该信号(例如,SIGTERM
信号的默认处理方式是终止进程)
函数原型
1 2 3 #include <signal.h> int sigaction (int signum, const struct sigaction *act, struct sigaction *oldact) ;
参数:
signum
: 需要检查或修改行为的信号。
act
: 如果不为 NULL
,则它指向一个定义新行为的 sigaction
结构体。
oldact
: 如果不为 NULL
,则在调用之后,这个结构体会被设置为信号的旧行为
sigaction
结构体的定义可能如下:
1 2 3 4 5 6 7 struct sigaction { void (*sa_handler)(int); void (*sa_sigaction)(int, siginfo_t *, void *); sigset_t sa_mask; int sa_flags; void (*sa_restorer)(void); };
其中:
sa_handler
: 对于信号的反应(例如:SIG_IGN
, SIG_DFL
, 或者一个函数指针)。
sa_sigaction
: 一个替代 sa_handler
的处理函数,但它提供更多信息。
sa_mask
: 在处理该信号时,额外需要被阻塞的信号集。
sa_flags
: 修改行为的标志。
sa_restorer
: 以前用于内部恢复操作,现在已经不再使用
一些常见的 sa_flags
值:
SA_SIGINFO
: 使用 sa_sigaction
而不是 sa_handler
。
SA_RESTART
: 使某些被信号中断的系统调用可重启。
SA_NOCLDSTOP
: 如果信号是 SIGCHLD
,则只有当子进程退出时才会收到,而不是当它停止时
程序实例:使用 sigaction
来设置一个处理 SIGINT
信号(通常是 Ctrl+C
产生的)的处理函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 #include <stdio.h> #include <signal.h> #include <unistd.h> void handle_sigint (int sig) { printf ("\nCaught signal %d\n" , sig); } int main () { struct sigaction sa ; sa.sa_handler = handle_sigint; sa.sa_flags = SA_RESTART; sigaction(SIGINT, &sa, NULL ); while (1 ) { printf ("Running...\n" ); sleep(1 ); } return 0 ; }
这段代码定义了一个处理函数 handle_sigint
来捕获 SIGINT
信号,并使用 sigaction
函数将其设为 SIGINT
信号的处理程序
二、线程 1.线程的概念 线程通俗理解为一个正在运行的函数
在之前的进程就是容器用于承载多个线程
线程,在计算机科学中通常指的是线程的执行单元。在操作系统中,一个程序可以有多个线程,每个线程可以并行(或伪并行在单核CPU
上)执行。线程比进程更轻量级,多个线程共享同一个地址空间和资源(线程通信相对于进程简单),但每个线程有其自己的指令指针、堆栈 等
一个进程中至少有一个线程
线程的优点 :
效率 :线程的创建和销毁比进程要快得多,切换线程的成本也比切换进程要小。
资源共享 :因为线程之间共享相同的地址空间,所以一个线程可以访问另一个线程的数据。这使得数据交换变得容易,但也意味着需要同步机制来避免资源竞争和数据不一致。
响应速度 :在多线程程序中,即使一个线程被阻塞(例如等待I/O
操作),其他线程仍然可以继续执行。
利用多核 :现代的多核处理器可以同时执行多个线程,这使得多线程程序可以真正地并行执行,从而提高了性能
线程缺点 :
同步和死锁 :当多个线程试图访问共享资源时,可能会发生资源竞争,这需要使用同步机制(如互斥锁、信号量等)来解决。但不恰当的使用同步机制可能导致死锁。
数据不一致 :多个线程并发修改共享数据可能导致数据不一致。
调试困难 :多线程程序的行为可能是不确定的,因此调试可能比单线程程序更加困难。
设计复杂性 :编写和维护多线程代码需要更高的专业知识和经验
在ubuntu可以使用命令:ps axm
查看线程
(1)线程标识符
(2)pthread_equal() pthread_equal
是一个用于比较两个线程ID
是否相同的函数
函数原型
1 int pthread_equal (pthread_t t1, pthread_t t2) ;
参数:
返回值:
如果两个线程ID
相同,则返回非零值。
如果两个线程ID
不同,则返回0
(3)pthread_self() pthread_self
是 POSIX
线程库中的一个函数,用于获取调用线程的线程ID
。这个函数在你需要在某个线程内部知道它自己的线程ID时特别有用
函数原型
1 pthread_t pthread_self (void ) ;
返回值:
(4)sched_yield() sched_yield()
函数是一个用于控制线程调度的重要函数。该函数的作用是让出调用线程的当前CPU时间片 ,给其他线程或进程运行的机会。这可以帮助实现更加公平的线程调度,特别是在多线程环境中,当线程需要等待某些资源释放或条件满足时,主动让出CPU,从而提高整体系统的效率和响应性。
1 2 3 #include <sched.h> int sched_yield (void ) ;
返回值:
行为用途:
当线程调用 sched_yield()
后,它会告诉操作系统的调度器,当前线程愿意放弃其剩余的时间片,使调度器可以选择另一个线程或进程来执行。这通常在以下情况下使用:
改善响应性 :在计算密集型任务中,线程可以定期调用 sched_yield()
,以避免独占CPU资源过长时间,从而允许其他同等优先级的线程有机会运行,提高系统的响应性
等待资源 :当线程需要等待一个条件变量、锁或其他资源时,而这些资源当前不可用,线程可以调用 sched_yield()
来减少忙等 (busy-waiting)带来的CPU资源浪费
协同运行 :在某些协作型多线程应用程序中,线程间需要频繁交换数据或状态信息。在这些情况下,通过 sched_yield()
可以帮助实现更平滑的协同执行
(5)线程状态与CPU占用 在多线程编程中,线程可以处于不同的状态:
运行(Running) :线程正在 CPU 上执行。
就绪(Runnable) :线程已准备好运行,正在等待 CPU 时间。
阻塞(Blocked) :线程因等待某种资源(如 I/O 操作完成、获取锁、等待其他事件如条件变量)而暂停执行。
休眠(Sleeping) :线程主动放弃了 CPU,通常是调用了 sleep()
或类似函数。
终止(Terminated) :线程已完成执行或被终止。
当线程处于阻塞或休眠状态时,它不会占用 CPU 时间。操作系统调度器会将 CPU 分配给其他处于就绪状态的线程。这样的调度策略使得 CPU 资源可以有效地在多个线程之间共享,提高了系统的整体效率和响应性。
线程挂起的影响:
资源使用 :挂起的线程不占用 CPU 周期,但仍然会占用一定的内存资源,因为线程的上下文(如栈、寄存器状态、线程局部存储)需要保留,以便线程可以在后续被唤醒并继续执行
系统性能 :合理的线程挂起可以帮助系统更好地管理资源,避免无谓的 CPU 占用,尤其是在线程需要等待外部事件或数据时。通过让线程挂起,系统可以将有限的 CPU 资源分配给其他可能正在进行计算的线程,从而优化整体性能
2.线程创建 (1)pthread_create() pthread_create
是POSIX
线程库中的一个函数,用于创建一个新的线程。这个函数允许开发者定义一个函数,这个函数将会在新创建的线程中运行
函数原型
1 int pthread_create (pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg) ;
参数:
thread
:指向 pthread_t
类型的指针,用于存储新创建线程的线程ID。
attr
:指向 pthread_attr_t
类型的指针,用于设置线程属性(如堆栈大小)。如果传递 NULL
,则会使用默认属性(一般默认就可以解决80%的问题 )。
start_routine
:指针,指向新线程启动时需要调用的函数。这个函数的返回类型必须为 void *
,并且接受一个 void *
参数。
arg
:传递给 start_routine
的参数
返回值:
如果线程成功创建,返回0
。
如果发生错误,返回一个非0
的错误代码(error number
)
程序实例
create1.c
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 #include <stdio.h> #include <stdlib.h> #include <errno.h> #include <pthread.h> #include <string.h> static void *func (void *p) { puts ("Thread is working!" ); return NULL ; } int main () { int err; pthread_t tid; puts ("Begin!" ); err = pthread_create(&tid,NULL ,func,NULL ); if (err) { fprintf (stderr ,"pthread_create():%s\n" , strerror(err)); exit (1 ); } puts ("End!" ); exit (0 ); }
CMakeLists.txt
1 2 3 4 5 6 7 8 cmake_minimum_required (VERSION 3.16 )project (Proj_1) add_executable (create1 create1.c)find_package (Threads REQUIRED)target_link_libraries (create1 Threads::Threads)
运行之后
1 2 3 (base) zxz@zxz-B660M-GAMING-X-AX-DDR4:~/Proj/CLionProj/UNIX/IO/thread/create1/build$ ./create1 Begin! End!
最后运行程序之后并没有显示调用线程函数中的输出内容,(线程的调用取决于调度器策略 )这是因为线程还没有来得及调用,进程已经通过exit(0)
结束
(2)pthread_once() 该函数确保某个指定的初始化函数在一个程序中只被执行一次,即使多个线程尝试执行它。这个函数特别适用于多线程环境中的一次性初始化任务,例如初始化全局变量、设置环境或配置单例模式
1 2 3 #include <pthread.h> int pthread_once (pthread_once_t *once_control, void (*init_routine)(void )) ;
参数:
**once_control
**:指向 pthread_once_t
类型的变量的指针,该变量必须使用 PTHREAD_ONCE_INIT
初始化。这个变量跟踪初始化函数是否已被调用
**init_routine
**:指向要执行的初始化函数的指针。这个函数没有参数和返回值。pthread_once
会确保这个函数只被执行一次
返回:
实例 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 #include <pthread.h> #include <stdio.h> pthread_once_t once = PTHREAD_ONCE_INIT;void initialize () { printf ("Initialization done.\n" ); } void * thread_function (void * arg) { pthread_once(&once, initialize); printf ("Thread %ld running.\n" , (long ) arg); return NULL ; } int main () { pthread_t threads[4 ]; for (long i = 0 ; i < 4 ; i++) { pthread_create(&threads[i], NULL , thread_function, (void *)i); } for (int i = 0 ; i < 4 ; i++) { pthread_join(threads[i], NULL ); } return 0 ; }
3.线程终止 (1)终止方式
线程从启动历程返回.返回值就是线程的退出码
线程可以被统同一进程中的其他线程取消
线程调用pthread_exit()
函数
(2)pthread_exit() pthread_exit
函数用于从调用线程中退出,并提供一个退出值,该值可由另一个线程通过pthread_join
获取。当一个线程调用pthread_exit
时,线程的清理处理程序将被执行,其存储空间(例如栈)会被回收
函数原型
1 void pthread_exit (void *retval) ;
参数:
retval
: 是一个指向任意数据类型的指针,表示线程的退出值。如果你不需要返回特定的值,可以简单地传递NULL
说明:
调用pthread_exit
将不会关闭整个进程,只会关闭调用它的线程。
如果主线程调用了pthread_exit
而没有结束程序,那么其他线程仍然可以继续执行。
如果从主函数返回,其行为类似于调用exit
(整个进程都会结束,所有线程都会被关闭)。但是,如果主线程调用pthread_exit
,进程不会结束,其他线程可以继续执行
(3)pthread_join() 该函数类似于进程中的wait()
函数,用于给线程进行收尸
pthread_join
是 POSIX
线程库中的一个函数,用于阻塞调用线程,直到指定的线程终止。这个函数还允许调用线程检索目标线程的退出状态
函数原型
1 int pthread_join (pthread_t thread, void **retval) ;
参数
thread
:是要等待的线程的标识符。
retval
:是一个指向指针的指针,用于获取线程的退出值。如果不关心退出值,可以设置为 NULL
返回值
如果成功,返回 0
。
如果出错,返回一个非零错误代码。
注意事项 :
如果目标线程已经终止,但还没有被join
,那么它被称为“僵尸线程”。pthread_join
的一个主要用途是从系统中清除僵尸线程。
一个线程只能被 join
一次。一旦一个线程被成功地 join
,它的 ID
就可以被再次使用。
线程不是必须被 join
。例如,如果你不关心线程的退出状态,你可以选择不 join
它
程序实例
create1.c
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 #include <stdio.h> #include <stdlib.h> #include <errno.h> #include <pthread.h> #include <string.h> #include <unistd.h> static void *func (void *p) { puts ("Thread is working!" ); pthread_exit(NULL ); } int main () { int err; void *thread_exit_status; pthread_t tid; puts ("Begin!" ); err = pthread_create(&tid,NULL ,func,NULL ); if (err) { fprintf (stderr ,"pthread_create():%s\n" , strerror(err)); exit (1 ); } pthread_join(tid,thread_exit_status); printf ("Thread exit status: %ld\n" , (long )thread_exit_status); puts ("End!" ); exit (0 ); }
(4)栈的清理(线程中的钩子函数) 在多线程编程中,线程可能在执行期间的任何时间点被取消。当线程被取消时,可能会需要执行一些清理任务,如释放资源、解锁互斥锁等。POSIX
线程库提供了pthread_cleanup_push
和pthread_cleanup_pop
函数,允许你指定在线程退出或被取消时执行的清理任务
pthread_cleanup_push
函数 :
该函数将清理处理程序(一个函数)推入当前线程的清理处理程序堆栈。
1 void pthread_cleanup_push (void (*routine) (void *), void *arg) ;
routine
: 这是一个指向清理函数的指针。当线程退出或被取消时,这个函数将被调用。
arg
: 这是传递给清理函数的参数
pthread_cleanup_pop
函数 :
该函数从当前线程的清理处理程序堆栈中弹出最近推入的清理处理程序
1 void pthread_cleanup_pop (int execute) ;
execute
: 如果非零,清理函数将被执行;否则,清理函数不会被执行
程序实例 create2.c
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 #include <stdio.h> #include <stdlib.h> #include <errno.h> #include <pthread.h> #include <string.h> #include <unistd.h> static void cleanup_func (void *p) { puts ((char *) p); } static void *func (void *p) { puts ("Thread is working!" ); pthread_cleanup_push(cleanup_func, "cleanup:1" ); pthread_cleanup_push(cleanup_func, "cleanup:2" ); pthread_cleanup_push(cleanup_func, "cleanup:3" ); puts ("push over!" ); pthread_cleanup_pop(1 ); pthread_cleanup_pop(1 ); pthread_cleanup_pop(1 ); pthread_exit(NULL ); } int main () { int err; pthread_t tid; puts ("Begin!" ); err = pthread_create(&tid,NULL ,func,NULL ); if (err) { fprintf (stderr ,"pthread_create():%s\n" , strerror(err)); exit (1 ); } pthread_join(tid,NULL ); puts ("End!" ); exit (0 ); }
CMakeLists.txt
1 2 3 4 5 6 cmake_minimum_required(VERSION 3.16 ) project(Proj_1) # 工程名字可以与可执行程序名不一样 add_executable(create2 create2.c) find_package(Threads REQUIRED) target_link_libraries(create2 Threads::Threads)
运行结果
1 2 3 4 5 6 7 8 9 (base) zxz@zxz-B660M-GAMING-X-AX-DDR4:~/Proj/CLionProj/UNIX/IO/thread/create2/build$ ./create2 Begin! Thread is working! push over! cleanup:3 cleanup:2 cleanup:1 End!
需要注意的是,pthread_cleanup_push
和pthread_cleanup_pop
必须在同一个代码块中,并且对于每个push
都必须有一个匹配的pop
,pthread_cleanup_pop
函数即使放到static void *func(void *p)
函数中的pthread_exit(NULL)
之后也可以,但是必须要有,这两个所谓的函数实质上是宏
4.线程取消 (1)pthread_cancel() pthread_cancel
函数用于请求取消一个线程。需要注意的是,“请求”取消并不意味着线程立即终止。相反,目标线程决定在哪些点上检查取消请求,这些点称为“取消点”
函数原型
1 int pthread_cancel (pthread_t thread) ;
参数
返回值
如果成功,返回 0
。
如果出错,返回一个非零错误代码
注意事项:
取消一个线程并不立即终止它,但是会设置一个请求来取消线程。线程在取消点检查这个请求。
常见的取消点包括一些系统调用,如open()
,read()
, write()
, sleep()
和pthread_testcancel()
等。
可以使用pthread_setcancelstate()
和pthread_setcanceltype()
函数来控制线程的取消行为
取消状态
允许取消
pthread_setcancelstate()
pthread_setcanceltype()
pthread_testcancel()
(2)线程分离 在多线程编程中,线程分离(Detached threads
)是一个重要的概念。当一个线程结束运行并退出时,其退出状态需要被另一个线程收集,这通常是通过 pthread_join
函数完成的。如果一个线程没有被其他线程join
,那么它的结束状态就会被留在系统中,这种线程被称为“僵尸线程”。
为了避免这种情况,我们可以创建一个分离的线程。一个分离的线程在结束时会自动清理其资源,不需要其他线程来join
它
如何创建一个分离线程?
使用 pthread_attr_t
和 pthread_attr_init
可以在创建线程之前设置线程属性使其为分离的
1 2 3 4 5 6 7 8 9 pthread_t tid;pthread_attr_t attr;pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); pthread_create(&tid, &attr, thread_function, NULL ); pthread_attr_destroy(&attr);
如果线程已经创建了,你可以使用 pthread_detach
函数来将它设置为分离的
1 2 3 4 pthread_t tid;pthread_create(&tid, NULL , thread_function, NULL ); pthread_detach(tid);
注意
分离的线程不能被其他线程join
。
你不应该分离一个线程并且也试图join
它。这种行为是未定义的。
如果你知道一个线程不会被join
,最好将其设置为分离的,以确保资源得到适当的清理。
程序实例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 #include <stdio.h> #include <pthread.h> void * thread_function (void * arg) { printf ("Inside the detached thread\n" ); pthread_exit(NULL ); } int main () { pthread_t tid; pthread_create(&tid, NULL , thread_function, NULL ); pthread_detach(tid); printf ("Main thread continuing...\n" ); sleep(2 ); return 0 ; }
在这个例子中,创建了一个分离的线程,它会自动清理自己,主线程只是继续运行并在最后休眠了一段时间以等待分离的线程完成
5.线程竞争 线程竞争,通常称为竞争条件(Race Condition
),发生在两个或更多线程并发访问共享资源,并尝试读、写或修改这些资源,导致执行结果变得不可预测。这是因为线程的调度方式是不确定的,所以你不能预知哪个线程首先访问资源或在什么时候进行访问
竞争条件可能导致各种问题,包括:
数据不一致:共享数据可能处于不一致的状态。
程序崩溃:如果一个线程期望某些数据保持不变,但另一个线程修改了它,可能导致不可预测的行为。
性能问题:例如,如果两个线程都尝试更新相同的数据,可能需要重试,从而降低效率
解决方法
互斥量 (Mutexes):这是一种同步原语,可以确保一次只有一个线程可以访问某些资源或代码段。例如,pthread_mutex_t
是POSIX
线程库中的一个常见互斥量。
读-写锁 (Read-Write Locks):允许多个线程同时读共享资源,但一次只有一个线程可以写。
信号量 (Semaphores):是一个更通用的同步工具,它可以控制同时访问某个资源的线程数。
原子操作 (Atomic Operations):这些操作在单个操作中完成,不会被其他线程中断。这些操作通常由硬件支持,并可确保数据的完整性。
防止竞争条件的关键是识别并保护对共享资源的所有访问,并确保一次只有一个线程可以执行修改。
(1)程序实例 该程序目的,创建多个线程同时对一个文件进行读写,最后每次运行程序文件中的数字应该要比运行前大20
在程序的同级别目录下创建out
文件,终端指令echo 1 > ./out
向该文件中输入数字1
primer0.c
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 #include <stdio.h> #include <stdlib.h> #include <pthread.h> #include <string.h> #define THRNUM 20 #define FNAME "../out" #define LINESIZE 1024 static void *thr_add (void *p) { FILE *fp; char linebuf[LINESIZE]; fp = fopen(FNAME,"r+" ); if (fp ==NULL ) { perror("fopen()" ); exit (1 ); } fgets(linebuf,LINESIZE,fp); fseek(fp,0 ,SEEK_SET); sleep(1 ); fprintf (fp,"%d\n" ,atoi(linebuf)+1 ); fclose(fp); pthread_exit(NULL ); } int main () { int i,err; pthread_t tid[THRNUM]; for (i = 0 ;i<THRNUM;i++) { err = pthread_create(tid+i,NULL ,thr_add,NULL ); if (err) { fprintf (stderr ,"pthread_create():%s\n" ,strerror(err)); exit (1 ); } } for (i = 0 ;i<THRNUM;i++) { pthread_join(tid[i],NULL ); } exit (0 ); }
CMakeLists.txt
1 2 3 4 5 6 cmake_minimum_required(VERSION 3.16 ) project(Proj_1) # 工程名字可以与可执行程序名不一样 add_executable(primer0 primer0.c) find_package(Threads REQUIRED) target_link_libraries(primer0 Threads::Threads)
运行结果:
最后文件中的值为2,是因为出现了线程竞争,相当于20个线程对文件中的内容取20次,都为1在对文件重复写了20次都为2
1 2 3 (base) zxz@zxz-B660M-GAMING-X-AX-DDR4:~/Proj/CLionProj/UNIX/IO/thread/create3/build$ ./primer0 (base) zxz@zxz-B660M-GAMING-X-AX-DDR4:~/Proj/CLionProj/UNIX/IO/thread/create3/build$ cat ../out 2
(2)线程同步 互斥量 互斥量(Mutex
, 是 Mutual Exclusion 的缩写)是多线程编程中的一个同步原语,它用于保护对共享资源的并发访问,从而避免竞争条件。一个互斥量在任意时刻都只能被一个线程锁定。如果其他线程试图再次锁定它,它们会被阻塞,直到第一个线程释放互斥量
在 POSIX
线程(pthreads)
库中,互斥量的类型是 pthread_mutex_t
使用互斥量基本步骤
1 pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
锁定互斥量 : 当你需要保护一段代码或资源时,可以尝试锁定互斥量。
1 pthread_mutex_lock(&mutex);
访问受保护的资源或代码
解锁互斥量 : 访问完成后,你应该立即释放互斥量。
1 pthread_mutex_unlock(&mutex);
1 pthread_mutex_destroy(&mutex);
pthread_mutex_init() 这个函数用于初始化互斥量
1 int pthread_mutex_init (pthread_mutex_t *mutex, const pthread_mutexattr_t *attr) ;
参数 :
mutex
:指向要初始化的互斥量。
attr
:指定互斥量属性,如果设为NULL,则使用默认属性。
返回值 :
pthread_mutex_destroy() 这个函数用于销毁一个已经不再需要的互斥量
1 2 int pthread_mutex_destroy (pthread_mutex_t *mutex) ;
参数 :
返回值 :
pthread_mutex_lock() 尝试锁定一个互斥量。如果互斥量已经被其他线程锁定,该调用会阻塞 ,直到互斥量可用为止
1 2 int pthread_mutex_lock (pthread_mutex_t *mutex) ;
参数 :
返回值 :
pthread_mutex_trylock() 尝试锁定一个互斥量。如果互斥量已经被其他线程锁定,该调用会立即返回一个EBUSY
错误,而不是阻塞
1 2 int pthread_mutex_trylock (pthread_mutex_t *mutex) ;
参数 :
返回值 :
pthread_mutex_unlock() 解锁一个互斥量
1 2 int pthread_mutex_unlock (pthread_mutex_t *mutex) ;
参数 :
返回值 :
注意 这些函数使你能够在多线程环境中安全地对共享资源进行操作,从而避免产生竞争条件。但要记住,当你使用互斥量时,应确保在所有的代码路径上,每次锁定都有相应的解锁 ,以避免死锁和其他同步问题
(3)程序实例(互斥量) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 #include <stdio.h> #include <stdlib.h> #include <pthread.h> #include <string.h> #define THRNUM 20 #define FNAME "../out" #define LINESIZE 1024 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;static void *thr_add (void *p) { FILE *fp; char linebuf[LINESIZE]; fp = fopen(FNAME,"r+" ); if (fp ==NULL ) { perror("fopen()" ); exit (1 ); } pthread_mutex_lock(&mutex); fgets(linebuf,LINESIZE,fp); fseek(fp,0 ,SEEK_SET); sleep(1 ); fprintf (fp,"%d\n" ,atoi(linebuf)+1 ); fclose(fp); pthread_mutex_unlock(&mutex); pthread_exit(NULL ); } int main () { int i,err; pthread_t tid[THRNUM]; for (i = 0 ;i<THRNUM;i++) { err = pthread_create(tid+i,NULL ,thr_add,NULL ); if (err) { fprintf (stderr ,"pthread_create():%s\n" ,strerror(err)); exit (1 ); } } for (i = 0 ;i<THRNUM;i++) { pthread_join(tid[i],NULL ); } pthread_mutex_destroy(&mutex); exit (0 ); }
运行结果:
休眠一段事件后,正确的结果
1 2 3 base) zxz@zxz-B660M-GAMING-X-AX-DDR4:~/Proj/CLionProj/UNIX/IO/thread/create3/build$ ./primer0 (base) zxz@zxz-B660M-GAMING-X-AX-DDR4:~/Proj/CLionProj/UNIX/IO/thread/create3/build$ cat ../out 21
6.线程池 视频教程
线程池是多线程编程中的一种设计模式,它用于为待执行的任务提供已经启动的线程,而不是为每个任务启动一个新线程。这种方法的好处是可以减少创建和销毁线程的开销,并限制系统中线程的数量,这样可以预防资源的过度使用和系统的过载
线程池通常由以下几个核心组件组成:
任务队列 :存储待执行的任务。
工作者线程 :线程池中的线程,它们不断从任务队列中取出任务并执行。
线程池管理器 :负责添加、暂停或终止工作者线程
线程池的工作流程通常如下:
初始化线程池并创建一定数量的工作者线程。
当一个新任务到达时,将其添加到任务队列。
工作者线程从任务队列中取出任务并执行。完成后,它会再次检查队列以获取下一个任务。
当任务队列为空,线程可以选择休眠等待,或根据具体的线程池策略进行其他操作
(1)C语言中实现线程池 C语言本身并不直接支持线程池,但你可以使用pthread
库手动实现一个,或使用现有的第三方库,如libthreadpool
如果你决定自己实现线程池,这里有一些建议的步骤:
定义数据结构 :为线程池、工作者线程和任务定义数据结构。
初始化线程池 :根据指定的大小创建工作者线程,并初始化任务队列。
提交任务 :提供一个函数,使用户能够将任务添加到队列。
工作者线程的主函数 :此函数应从任务队列中提取任务并执行它们。
清理和关闭 :提供一个函数来停止所有工作者线程并清理线程池。
虽然从头实现一个线程池是一个很好的学习经验,但如果你要在生产环境中使用线程池,建议使用已经存在且经过良好测试的第三方库,因为线程同步和管理都是比较复杂的主题,容易出错
7.线程令牌桶 在线程令牌桶系统中,和进程不同,不通过alarm
或者setitimer
函数定时并且发送信号,让信号行为函数执行,而是通过让积攒令牌桶系统中各个令牌桶的令牌数的子线程进行休眠固定时间。
创建子线程,对令牌桶中的令牌进行操作(每1s增加CPS)
查询法–存在忙等 该令牌桶系统只有一个子线程负责,为令牌桶数组中的所有令牌桶,每间隔1s,为存在的每个令牌桶积攒令牌数
mytbf.h
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 #ifndef THREADTOKENBUCKET_MYTBF_H #define THREADTOKENBUCKET_MYTBF_H #define MYTBF_MAX 1024 typedef void mytbf_t ;mytbf_t *mytbf_init (int cps,int burst) ;int mytbf_fetchtoken (mytbf_t *,int ) ;int mytbf_returntoken (mytbf_t *,int ) ;int mytbf_destory (mytbf_t *) ;#endif
mytbf.c
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <errno.h> #include <pthread.h> #include <string.h> #include "mytbf.h" static struct mytbf_st * job [MYTBF_MAX ]; static pthread_mutex_t mut_job = PTHREAD_MUTEX_INITIALIZER;static pthread_t tid_alarm;static pthread_once_t init_once = PTHREAD_ONCE_INIT;struct mytbf_st { int cps; int burst; int token; int pos; pthread_mutex_t mut; }; static void *thr_alarm (void *p) { while (1 ) { pthread_mutex_lock(&mut_job); for (int i = 0 ; i < MYTBF_MAX; i++) { if (job[i] != NULL ) { pthread_mutex_lock(&job[i]->mut); job[i]->token += job[i]->cps; if (job[i]->token > job[i]->burst) job[i]->token = job[i]->burst; pthread_mutex_unlock(&job[i]->mut); } } pthread_mutex_unlock(&mut_job); sleep(1 ); } } static void module_unload (void ) { pthread_cancel(tid_alarm); pthread_join(tid_alarm,NULL ); pthread_mutex_lock(&mut_job); for (int i = 0 ; i < MYTBF_MAX; i++) { if (job[i] != NULL ) { mytbf_destory(job[i]); } } pthread_mutex_unlock(&mut_job); pthread_mutex_destroy(&mut_job); } static void module_load (void ) { int err; err = pthread_create(&tid_alarm,NULL ,thr_alarm,NULL ); if (err) { fprintf (stderr ,"pthread_create():%s\n" ,strerror(err)); exit (1 ); } atexit(module_unload); } static int min (int a, int b) { if (a < b) return a; return b; } static int get_free_pos_unlocked (void ) { for (int i = 0 ; i < MYTBF_MAX; i++) { if (job[i] == NULL ) return i; } return -1 ; } mytbf_t *mytbf_init (int cps,int burst) { struct mytbf_st *me ; pthread_once(&init_once,module_load); me = malloc (sizeof (*me)); if (me == NULL ) return NULL ; me->token = 0 ; me->cps = cps; me->burst = burst; pthread_mutex_init(&me->mut,NULL ); pthread_mutex_lock(&mut_job); int pos = get_free_pos_unlocked(); if (pos < 0 ) { pthread_mutex_unlock(&mut_job); free (me); return NULL ; } me->pos = pos; job[pos] = me; pthread_mutex_unlock(&mut_job); return me; } int mytbf_fetchtoken (mytbf_t *ptr,int size) { struct mytbf_st *me = ptr; int n; if (size < 0 ) return -EINVAL; pthread_mutex_lock(&me->mut); while (me->token <= 0 ) { pthread_mutex_unlock(&me->mut); sched_yield(); pthread_mutex_lock(&me->mut); } n = min(me->token,size); me->token -= n; pthread_mutex_unlock(&me->mut); return n; } int mytbf_returntoken (mytbf_t *ptr,int size) { struct mytbf_st *me = ptr; if (size < 0 ) return -EINVAL; pthread_mutex_lock(&me->mut); me->token += size; if (me->token > me->burst) me->token = me->burst; pthread_mutex_unlock(&me->mut); return size; } int mytbf_destory (mytbf_t *ptr) { struct mytbf_st *me = ptr; pthread_mutex_lock(&mut_job); job[me->pos] = NULL ; pthread_mutex_unlock(&mut_job); pthread_mutex_destroy(&me->mut); free (ptr); return 0 ; }
main.c
main函数中通过令牌桶实现模拟cat
指令中的流量控制
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <errno.h> #include "mytbf.h" #define CPS 10 #define BUFSIZE 1024 #define BURST 100 int main (int argc,char *argv[]) { int sfd,dfd = 1 ; int len = 0 ; int ret = 0 ; int pos = 0 ; int size = 0 ; char buf[BUFSIZE]; mytbf_t *tbf; if (argc < 2 ) { fprintf (stderr ,"Usage:%s <src_file>\n" ,argv[0 ]); exit (1 ); } tbf = mytbf_init(CPS,BURST); if (tbf == NULL ) { fprintf (stderr , "mytbf_init() failed!\n" ); exit (1 ); } do { sfd = open(argv[1 ], O_RDONLY); if (sfd < 0 ) { if (errno != EINTR) { perror("open()" ); exit (1 ); } } }while (sfd < 0 ); while (1 ) { size = mytbf_fetchtoken(tbf, BUFSIZE); if (size < 0 ) { fprintf (stderr , "mytbf_fetchtoken():%s\n" , strerror(-size)); exit (1 ); } while ((len = read(sfd,buf,size)) < 0 ) { if (errno == EINTR) continue ; perror("read()" ); exit (1 ); } if (len == 0 ) break ; if ((size - len) > 0 ) mytbf_returntoken(tbf,size-len); pos = 0 ; while (len > 0 ) { ret = write(dfd, buf + pos, len); if (ret < 0 ) { if (errno == EINTR) continue ; perror("write()" ); exit (1 ); } pos += ret; len -= ret; } } close(sfd); mytbf_destory(tbf); exit (0 ); }
CMakeLists.txt
1 2 3 4 5 6 7 8 9 10 11 12 cmake_minimum_required(VERSION 3.19 ) project(ThreadTokenBucket C) set (CMAKE_C_STANDARD 99 )# 找线程库 find_package(Threads REQUIRED) add_executable(ThreadTokenBucket main.c mytbf.c mytbf.h) # 链接线程库 target_link_libraries(ThreadTokenBucket ${CMAKE_THREAD_LIBS_INIT})
编译运行,发现程序在以每间隔1s
,以CPS字节数量的速度输出想查看文件内容
我是在虚拟机上面运行的程序,处理器数量为2,每个处理器的核心数为2,虚拟的CPU为2x2=4。使用编译的程序,显示一个内容比较多的文件时。使用htop
在另外一个终端,查看ubuntu的cpu占用率,如下图:
系统的第2个虚拟CPU占用率为100%,这就是因为忙等的存在使得CPU占用率过高,接下来将使用条件变量通知法解决这个问题
信号量通知法–处理忙等 mytbf.h
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 #ifndef THREADCONDTOKENBUCKET_MYTBF_H #define THREADCONDTOKENBUCKET_MYTBF_H #define MYTBF_MAX 1024 typedef void mytbf_t ; mytbf_t *mytbf_init (int cps,int burst) ;int mytbf_fetchtoken (mytbf_t *,int ) ;int mytbf_returntoken (mytbf_t *,int ) ;int mytbf_destory (mytbf_t *) ;#endif
mytbf.c
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <errno.h> #include <pthread.h> #include <string.h> #include "mytbf.h" static struct mytbf_st * job [MYTBF_MAX ]; static pthread_mutex_t mut_job = PTHREAD_MUTEX_INITIALIZER;static pthread_t tid_alarm;static pthread_once_t init_once = PTHREAD_ONCE_INIT;struct mytbf_st { int cps; int burst; int token; int pos; pthread_mutex_t mut; pthread_cond_t cond; }; static void *thr_alarm (void *p) { while (1 ) { pthread_mutex_lock(&mut_job); for (int i = 0 ; i < MYTBF_MAX; i++) { if (job[i] != NULL ) { pthread_mutex_lock(&job[i]->mut); job[i]->token += job[i]->cps; if (job[i]->token > job[i]->burst) job[i]->token = job[i]->burst; pthread_cond_broadcast(&job[i]->cond); pthread_mutex_unlock(&job[i]->mut); } } pthread_mutex_unlock(&mut_job); sleep(1 ); } } static void module_unload (void ) { pthread_cancel(tid_alarm); pthread_join(tid_alarm,NULL ); pthread_mutex_lock(&mut_job); for (int i = 0 ; i < MYTBF_MAX; i++) { if (job[i] != NULL ) { mytbf_destory(job[i]); } } pthread_mutex_unlock(&mut_job); pthread_mutex_destroy(&mut_job); } static void module_load (void ) { int err; err = pthread_create(&tid_alarm,NULL ,thr_alarm,NULL ); if (err) { fprintf (stderr ,"pthread_create():%s\n" ,strerror(err)); exit (1 ); } atexit(module_unload); } static int min (int a, int b) { if (a < b) return a; return b; } static int get_free_pos_unlocked (void ) { for (int i = 0 ; i < MYTBF_MAX; i++) { if (job[i] == NULL ) return i; } return -1 ; } mytbf_t *mytbf_init (int cps,int burst) { struct mytbf_st *me ; pthread_once(&init_once,module_load); me = malloc (sizeof (*me)); if (me == NULL ) return NULL ; me->token = 0 ; me->cps = cps; me->burst = burst; pthread_mutex_init(&me->mut,NULL ); pthread_cond_init(&me->cond,NULL ); pthread_mutex_lock(&mut_job); int pos = get_free_pos_unlocked(); if (pos < 0 ) { pthread_mutex_unlock(&mut_job); free (me); return NULL ; } me->pos = pos; job[pos] = me; pthread_mutex_unlock(&mut_job); return me; } int mytbf_fetchtoken (mytbf_t *ptr,int size) { struct mytbf_st *me = ptr; int n; if (size < 0 ) return -EINVAL; pthread_mutex_lock(&me->mut); while (me->token <= 0 ) { pthread_cond_wait(&me->cond,&me->mut); } n = min(me->token,size); me->token -= n; pthread_mutex_unlock(&me->mut); return n; } int mytbf_returntoken (mytbf_t *ptr,int size) { struct mytbf_st *me = ptr; if (size < 0 ) return -EINVAL; pthread_mutex_lock(&me->mut); me->token += size; if (me->token > me->burst) me->token = me->burst; pthread_cond_broadcast(&me->cond); pthread_mutex_unlock(&me->mut); return size; } int mytbf_destory (mytbf_t *ptr) { struct mytbf_st *me = ptr; pthread_mutex_lock(&mut_job); job[me->pos] = NULL ; pthread_mutex_unlock(&mut_job); pthread_mutex_destroy(&me->mut); pthread_cond_destroy(&me->cond); free (ptr); return 0 ; }
main.c
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <errno.h> #include "mytbf.h" #define CPS 10 #define BUFSIZE 1024 #define BURST 100 int main (int argc, char **argv) { int sfd, dfd = 1 ; int len = 0 ; int ret = 0 ; int pos = 0 ; int size = 0 ; char buf[BUFSIZE]; mytbf_t *tbf; if (argc < 2 ) { fprintf (stderr , "Usage:%s <src_file>\n" ,argv[0 ]); exit (1 ); } tbf = mytbf_init(CPS, BURST); if (tbf == NULL ) { fprintf (stderr , "mytbf_init() failed!\n" ); exit (1 ); } do { sfd = open(argv[1 ], O_RDONLY); if (sfd < 0 ) { if (errno != EINTR) { perror("open()" ); exit (1 ); } } }while (sfd < 0 ); while (1 ) { size = mytbf_fetchtoken(tbf, BUFSIZE); if (size < 0 ) { fprintf (stderr , "mytbf_fetchtoken():%s\n" , strerror(-size)); exit (1 ); } while ((len = read(sfd, buf, size)) < 0 ) { if (errno == EINTR) continue ; perror("read()" ); break ; } if (len == 0 ) break ; if (size - len > 0 ) mytbf_returntoken(tbf, size - len); pos = 0 ; while (len > 0 ) { ret = write(dfd, buf + pos, len); if (ret < 0 ) { if (errno == EINTR) continue ; perror("write()" ); exit (1 ); } pos += ret; len -= ret; } } close(sfd); mytbf_destory(tbf); exit (0 ); }
CMakeLists.txt
1 2 3 4 5 6 7 8 9 10 11 12 cmake_minimum_required(VERSION 3.19 ) project(ThreadTokenBucket C) set (CMAKE_C_STANDARD 99 )# 找线程库 find_package(Threads REQUIRED) add_executable(ThreadCondTokenBucket main.c mytbf.c mytbf.h) # 链接线程库 target_link_libraries(ThreadCondTokenBucket ${CMAKE_THREAD_LIBS_INIT})
编译运行,发现程序在以每间隔1s
,以CPS字节数量的速度输出想查看文件内容
我是在虚拟机上面运行的程序,处理器数量为2,每个处理器的核心数为2,虚拟的CPU为2x2=4。使用编译的程序,显示一个内容比较多的文件时。使用htop
在另外一个终端,查看ubuntu的cpu占用率,如下图:
发现此时相比于之前的查询方法,使用信号变量的通知法,解决了忙等的问题,第二个CPU的占用率基本没有
8.条件变量 条件变量是多线程编程中用于线程同步的另一种机制。它允许线程阻塞等待,直到某个特定条件成为真。这是一种有用的机制,特别是在涉及生产者-消费者问题或其他需要多个线程协作的场景中
在 POSIX
线程(pthreads
)中,条件变量的类型是 pthread_cond_t
(1)相关函数 pthread_cond_init()
pthread_cond_init
函数用于初始化一个条件变量
1 int pthread_cond_init (pthread_cond_t *cond, const pthread_condattr_t *attr) ;
参数:
cond : 指向要初始化的条件变量的指针。
attr : 指向条件变量属性对象的指针。如果你想使用默认的属性,可以设置为NULL。
返回值:
pthread_cond_destroy()
pthread_cond_destroy
函数用于销毁一个条件变量,并释放与之相关的任何资源。销毁一个已被其他线程阻塞(等待)的条件变量将导致未定义的行为,因此在销毁条件变量之前,确保没有线程正在等待该条件变量是很重要的
1 int pthread_cond_destroy (pthread_cond_t *cond) ;
参数:
返回值:
pthread_cond_wait()
pthread_cond_wait
函数在多线程编程中用于阻塞当前线程(将这个线程挂起),直到另一个线程发出信号通知这个条件变量(直到某个条件变量被特定地通知或广播),一旦条件变量被通知,线程被唤醒。通常与一个互斥量(mutex)一起使用,以确保数据的同步访问
线程被挂起,就会处于非运行状态,不会占用CPU资源
1 int pthread_cond_wait (pthread_cond_t *cond, pthread_mutex_t *mutex) ;
参数:
cond : 指向条件变量的指针。
mutex : 在调用pthread_cond_wait
之前必须被锁定的互斥量。pthread_cond_wait
会自动释放此互斥量,允许其他线程操作,然后使当前线程进入等待状态。直到条件变量被其他线程发出信号或广播。一旦收到信号或广播,pthread_cond_wait
再次锁定互斥量(再次将互斥锁上锁)并返回。
返回值:
pthread_cond_signal()
pthread_cond_signal
函数用于从一个或多个等待给定条件变量的线程中唤醒一个线程。它是条件变量操作的核心,使得多线程间可以进行精确的同步操作
1 int pthread_cond_signal (pthread_cond_t *cond) ;
参数:
返回值:
pthread_cond_broadcast()
pthread_cond_broadcast
函数用于唤醒所有正在给定条件变量上等待的线程。与pthread_cond_signal
不同,pthread_cond_signal
只唤醒一个等待的线程,而pthread_cond_broadcast
则唤醒所有等待的线程
1 int pthread_cond_broadcast (pthread_cond_t *cond) ;
参数:
返回值:
(2)程序实例 producer
线程添加元素到缓冲区,而consumer
线程从缓冲区中取出元素。条件变量确保当缓冲区满时,生产者会等待,而当缓冲区为空时,消费者会等待
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 #include <stdio.h> #include <pthread.h> #define BUFFER_SIZE 10 static int buffer[BUFFER_SIZE];static int count = 0 ;static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;static void * producer (void * arg) { for (int i = 0 ; i < 100 ; i++) { pthread_mutex_lock(&mutex); while (count == BUFFER_SIZE) { pthread_cond_wait(&cond, &mutex); } buffer[count++] = i; pthread_cond_signal(&cond); pthread_mutex_unlock(&mutex); } pthread_exit(NULL ); } static void * consumer (void * arg) { for (int i = 0 ; i < 100 ; i++) { pthread_mutex_lock(&mutex); while (count == 0 ) { pthread_cond_wait(&cond, &mutex); } printf ("Consumer: %d\n" , buffer[--count]); pthread_cond_signal(&cond); pthread_mutex_unlock(&mutex); } pthread_exit(NULL ); } int main () { pthread_t prod, cons; pthread_create(&prod, NULL , producer, NULL ); pthread_create(&cons, NULL , consumer, NULL ); pthread_join(prod, NULL ); pthread_join(cons, NULL ); return 0 ; }
9.线程-信号量 信号量(semaphores)是一个同步原语,经常用于限制对资源的并发访问(可以解决资源有上限的资源共享问题 ),或者作为多个线程或进程间的同步手段。在线程编程中,信号量可以用来解决读者-写者问题、生产者-消费者问题等
在POSIX
线程(pthreads
)中,信号量由sem_t
类型表示,并通过一系列的函数进行操作 :
sem_init() - 初始化一个未命名的信号量
1 int sem_init (sem_t *sem, int pshared, unsigned int value) ;
sem_wait() - 减少信号量的值。如果信号量的值大于0
,则减少其值并立即返回。如果信号量的值为0
,则线程会被阻塞,直到信号量值变为大于0
1 int sem_wait (sem_t *sem) ;
sem_trywait() - 尝试减少信号量的值。如果信号量的值为0,则立即返回错误,否则减少其值
1 int sem_trywait (sem_t *sem) ;
sem_post() - 增加信号量的值。增加后,如果有其他线程因为sem_wait
被阻塞在这个信号量上,那么其中一个线程会被唤醒
1 int sem_post (sem_t *sem) ;
sem_destroy() - 销毁一个未命名的信号量,释放其相关资源
1 int sem_destroy (sem_t *sem) ;
还有其他函数,如sem_getvalue()
(获取当前信号量值)和命名信号量相关的函数(如sem_open()
, sem_close()
, sem_unlink()
等)
(1)相关函数 sem_init() 初始化一个未命名的信号量
1 int sem_init (sem_t *sem, int pshared, unsigned int value) ;
参数:
sem :这是指向要初始化的信号量对象的指针
pshared :这个参数决定了信号量是在同一进程的线程间共享(设置为0),还是在多个进程间共享(设置为非0值)。大多数情况下,我们只在一个进程的线程之间使用信号量,因此将其设置为0
value :这是信号量的初始值。例如,如果你想使用信号量作为一个互斥锁,你可以将初始值设置为1
返回值:
0 :表示成功。
-1 :表示出错,此时错误码存放在errno
中
sem_wait() 用于降低信号量的值。如果信号量的当前值大于0,它会被减少(通常是减少1)并且函数立即返回。如果信号量的当前值为0,调用此函数的线程将被阻塞,直到信号量值变为大于0
1 int sem_wait (sem_t *sem) ;
参数:
返回值:
0 :表示成功。
-1 :表示出错,此时错误码存放在errno
中
sem_trywait() 这个函数尝试降低信号量的值,但与 sem_wait()
不同的是,如果信号量当前的值为0,它不会阻塞调用线程,而是立即返回一个错误
1 int sem_trywait (sem_t *sem) ;
参数:
返回值:
0 :表示成功,信号量的值被成功减少。
-1 :表示无法降低信号量的值(例如,当它已经为0时)或其他错误发生。此时,错误码存放在errno
中。常见的错误码是EAGAIN
,表示信号量的值当前为0
sem_post() 它的作用是增加信号量的值。当信号量的值增加时,如果有线程因调用 sem_wait()
或 sem_trywait()
而阻塞在这个信号量上,那么其中一个线程将被唤醒并继续执行
1 int sem_post (sem_t *sem) ;
参数:
返回值:
0 :表示成功。
-1 :表示出错,此时错误码存放在 errno
中
(2)实例程序 总资源数为1 如果生产者和消费者线程使用的信号量对应的总资源数为1 ,那么不管线程有多少个,可以工作的线程只有一个,其余线程由于拿不到资源,都被迫阻塞了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <semaphore.h> #include <pthread.h> struct Node { int number; struct Node * next ; }; sem_t psem;sem_t csem;pthread_mutex_t mutex;
main.c
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <pthread.h> #include <semaphore.h> sem_t semp;sem_t semc;pthread_mutex_t mutex;struct Node { int number; struct Node * next ; }; struct Node *head = NULL ;void *producer (void *arg) { while (1 ) { sem_wait(&semp); struct Node *newnode = malloc (sizeof (struct Node)); newnode->number = rand() % 1000 ; newnode->next = head; head = newnode; printf ("生产者,id:%ld , number:%d\n" , (pthread_self() % 100 ), newnode->number); sem_post(&semc); sleep(rand() % 3 ); } return NULL ; } void *consumer (void *arg) { while (1 ) { sem_wait(&semc); struct Node *node = head; printf ("消费者,id:%ld , number:%d\n" , (pthread_self() % 100 ), node->number); head = head->next; free (node); sem_post(&semp); sleep(rand() % 3 ); } return NULL ; } int main () { pthread_mutex_init(&mutex, NULL ); sem_init(&semp, 0 , 1 ); sem_init(&semc, 0 , 0 ); pthread_t t1[5 ], t2[5 ]; for (int i = 0 ; i < 5 ; i++) { pthread_create(&t1[i], NULL , producer, NULL ); } for (int i = 0 ; i < 5 ; i++) { pthread_create(&t2[i], NULL , consumer, NULL ); } for (int i = 0 ; i < 5 ; i++) { pthread_join(t1[i], NULL ); pthread_join(t2[i], NULL ); } pthread_mutex_destroy(&mutex); sem_destroy(&semp); sem_destroy(&semc); return 0 ; }
通过测试代码可以得到如下结论:如果生产者和消费者使用的信号量总资源数为1,那么不会出现生产者线程和消费者线程同时访问共享资源的情况,不管生产者和消费者线程有多少个,它们都是顺序执行的。
运行结果
可以看到生产者与消费者交替运行
总资源数大于1 如果生产者和消费者线程使用的信号量对应的总资源数为大于1,这种场景下出现的情况就比较多了:
多个生产者线程同时生产
多个消费者同时消费
生产者线程和消费者线程同时生产和消费
以上不管哪一种情况都可能会出现多个线程访问共享资源的情况,如果想防止共享资源出现数据混乱,那么就需要使用互斥锁进行线程同步,线程线性执行任务 ,处理代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <semaphore.h> #include <pthread.h> struct Node { int number; struct Node * next ; }; sem_t psem;sem_t csem;pthread_mutex_t mutex;
在编写上述代码的时候还有一个需要注意是事项,不管是消费者线程的处理函数还是生产者线程的处理函数内部有这么两行代码:
1 2 3 4 5 6 7 sem_wait(&csem); pthread_mutex_lock(&mutex); sem_wait(&csem); pthread_mutex_lock(&mutex);
这两行代码的调用顺序是不能颠倒的,如果颠倒过来就有可能会造成死锁 ,下面来分析一种死锁的场景:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 void * producer (void * arg) { while (1 ) { pthread_mutex_lock(&mutex); sem_wait(&psem); ...... ...... sem_post(&csem); pthread_mutex_unlock(&mutex); sleep(rand() % 3 ); } return NULL ; } void * consumer (void * arg)
在上面的代码中,初始化状态下消费者线程没有任务信号量资源,假设某一个消费者线程先运行,调用pthread_mutex_lock(&mutex);
对互斥锁加锁成功,然后调用sem_wait(&csem);
由于没有资源,因此被阻塞了。其余的消费者线程由于没有抢到互斥锁,因此被阻塞在互斥锁上。对应生产者线程第一步操作也是调用pthread_mutex_lock(&mutex);
,但是这时候互斥锁已经被消费者线程锁上了,所有生产者都被阻塞,到此为止,多余的线程都被阻塞了,程序产生了死锁。
完整代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <pthread.h> #include <semaphore.h> sem_t semp;sem_t semc;pthread_mutex_t mutex;struct Node { int number; struct Node * next ; }; struct Node *head = NULL ;void *producer (void *arg) { while (1 ) { sem_wait(&semp); pthread_mutex_lock(&mutex); struct Node *newnode = malloc (sizeof (struct Node)); newnode->number = rand() % 1000 ; newnode->next = head; head = newnode; printf ("生产者,id:%ld , number:%d\n" , (pthread_self() % 100 ), newnode->number); pthread_mutex_unlock(&mutex); sem_post(&semc); sleep(rand() % 3 ); } pthread_exit(NULL ); } void *consumer (void *arg) { while (1 ) { sem_wait(&semc); pthread_mutex_lock(&mutex); struct Node *node = head; printf ("消费者,id:%ld , number:%d\n" , (pthread_self() % 100 ), node->number); head = head->next; free (node); pthread_mutex_unlock(&mutex); sem_post(&semp); sleep(rand() % 3 ); } pthread_exit(NULL ); } int main () { pthread_mutex_init(&mutex, NULL ); sem_init(&semp, 0 , 5 ); sem_init(&semc, 0 , 0 ); pthread_t t1[5 ], t2[5 ]; for (int i = 0 ; i < 5 ; i++) { pthread_create(&t1[i], NULL , producer, NULL ); } for (int i = 0 ; i < 5 ; i++) { pthread_create(&t2[i], NULL , consumer, NULL ); } for (int i = 0 ; i < 5 ; i++) { pthread_join(t1[i], NULL ); pthread_join(t2[i], NULL ); } pthread_mutex_destroy(&mutex); sem_destroy(&semp); sem_destroy(&semc); return 0 ; }