多线程网络通信

本文的内容均转载于“爱编程大丙“博客,有小部分个人理解,本文仅用于记录个人的学习过程

视频链接

文档链接

一、相关概念

1.局域网与广域网

  • 局域网:局域网将一定区域内的各种计算机、外部设备和数据库连接起来形成计算机通信的私有网络。
  • 广域网:又称广域网、外网、公网。是连接不同地区局域网或城域网计算机通信的远程公共网络

IP用于在网络中定位某一台主机,而端口则用于定位主机中的某一个进程

2.IP

IP(Internet Protocol):本质是一个整形数,用于表示计算机在网络中的地址。IP协议版本有两个:IPv4和IPv6

  • IPv4(Internet Protocol version4):

    • 使用一个32位的整形数描述一个IP地址,4个字节,int型

    • 也可以使用一个点分十进制字符串描述这个IP地址(每个字节为8位二进制,则一个字节位置可以使用0-2^8-1表示即0-255): 192.168.247.135

    • 分成了4份,每份1字节,8bit(char),最大值为 255

      0.0.0.0 是最小的IP地址

      255.255.255.255是最大的IP地址

    • 按照IPv4协议计算,可以使用的IP地址共有 2^32 个

  • IPv6(Internet Protocol version6):

    • 使用一个128位的整形数描述一个IP地址,16个字节
    • 也可以使用一个字符串描述这个IP地址(每个点分位,占用两个字节位置,即16位2进制。4位2进制为一个16进制度位,因此两个字节位等价于4位16进制位。因此每个点分位的数字范围表示为0-2^16-1即0-65535,用十六进制表示为0000至ffff,转换为十进制也为0至65535):2001:0db8:3c4d:0015:0000:0000:1a2f:1a2b
    • 分成了8份,每份2字节,每一部分以16进制的方式表示
    • 按照IPv6协议计算,可以使用的IP地址共有 2^128 个
  • 查看IP地址

    1
    2
    3
    4
    5
    # linux
    $ ifconfig

    # 特殊的IP地址: 127.0.0.1 ==> 和本地的IP地址是等价的
    # 假设当前电脑没有联网, 就没有IP地址, 又要做网络测试, 可用使用 127.0.0.1 进行本地测试

    在ubuntu下使用以上命令查看:

    image-20240124105826116

    • enp5s0:表示当前操作系统,的以太网接口(有线网卡),通常用于有线网络连接。
    • wlo1:表示当前操作系统,的无线网络接口(无线网卡),通常用于Wi-Fi连接。
    • lo:该网络接口,表示的是“loopback”接口。这是一个特殊的网络接口,用于系统内部的网络通信。lo 通常用于测试和系统进程之间的通信。它是一个虚拟网络接口,不对应于任何物理硬件。Loopback 接口通常配置为 IP 地址 127.0.0.1,在 IPv6 中为 ::1。这个地址被称为本地主机地址。任何发送到这个地址的数据都不会离开主机,而是直接返回。

3.端口

端口的作用是定位到主机上的某一个进程,通过这个端口进程就可以接受到对应的网络数据了。

一个程序运行起来就是一个进程,若这个进程需要接收对应的数据,那么需要绑定IP与端口

1
2
3
4
5
比如: 在电脑上运行了微信和QQ, 小明通过客户端给我的的微信发消息, 电脑上的微信就收到了消息, 为什么?

运行在电脑上的微信和QQ都绑定了不同的端口
通过IP地址可以定位到某一台主机,通过端口就可以定位到主机上的某一个进程
通过指定的IP和端口,发送数据的时候对端就能接受到数据了

端口也是一个整形数 unsigned short ,一个16位整形数,有效端口的取值范围是:0 ~ 65535(0 ~ 216-1)

提问:计算机中所有的进程都需要关联一个端口吗,一个端口可以被重复使用吗?

  • 不需要,如果这个进程不需要网络通信,那么这个进程就不需要绑定端口的

  • 一个端口只能给某一个进程使用,多个进程不能同时使用同一个端口

4.OSI/ISO网络分层协议

OSI(Open System Interconnect),即开放式系统互联。 一般都叫OSI参考模型,是ISO(国际标准化组织组织)在1985年研究的网络互联模型。

image-20240124112249850

每一层的作用,文档链接视频链接

5.网络协议

网络协议指的是计算机网络中互相通信的对等实体之间交换信息时所必须遵守的规则的集合。一般系统网络协议包括五个部分:通信环境,传输服务,词汇表,信息的编码格式,时序、规则和过程。先来通过下面几幅图了解一下常用的网络协议的格式:

网络协议示意图:文档链接

  • 数据的封装

image-20240124112942862

在网络通信的时候, 程序猿需要负责的应用层数据的处理(最上层)

  • 应用层的数据可以使用某些协议进行封装, 也可以不封装

  • 程序猿需要调用发送数据的接口函数,将数据发送出去

  • 程序猿调用的API做底层数据处理

    • 传输层使用传输层协议打包数据
    • 网络层使用网络层协议打包数据
    • 网络接口层使用网络接口层协议打包数据
    • 数据被发送到internet
  • 接收端接收到发送端的数据

    • 程序猿调用接收数据的函数接收数据

    • 调用的API做相关的底层处理:

      网络接口层拆包 ==> 网络层的包
      网络层拆包 ==> 网络层的包
      传输层拆包 ==> 传输层数据

  • 如果应用层也使用了协议对数据进行了封装,数据的包的解析需要程序猿做

二、Socket网络编程

1.字节序

字节序,顾名思义字节的顺序,就是大于一个字节类型的数据在内存中的存放顺序,也就是说对于单字符来说是没有字节序问题的,字符串是单字符的集合,因此字符串也没有字节序问题

目前在各种体系的计算机中通常采用的字节存储机制主要有两种:Big-Endian 和 Little-Endian,下面先从字节序说起

  • Little-Endian -> 主机字节序 (小端)

    • 数据的低位字节存储到内存的低地址位, 数据的高位字节存储到内存的高地址位
    • 我们使用的PC机,数据的存储默认使用的是小端
  • Big-Endian -> 网络字节序 (大端)

    • 数据的低位字节存储到内存的高地址位, 数据的高位字节存储到内存的低地址位
    • 套接字通信过程中操作的数据都是大端存储的,包括:接收/发送的数据、IP地址、端口
  • 字节序举例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    // 实例1:
    数据:0x12345678 // 0x的字符序列表示一个十六进制数
    如上的数据则是 816进制位,一个16进制位表示4个二进制位,而一个字节由8位二进制组成,因此上面的数据则表示的是4个字节
    0x 12 34 56 78 // 而数据中 字节位高低 从右往左逐渐降低(类似比较:一个整数123,3是个位,2是十位,1是百位)
    内存中地址的高低 从左往右 逐渐增高,因此地址中左边是内存中的低地址位,右边是内存中的高地址位
    因此数据 0x12 34 56 78
    大端存储:12 34 56 78
    小端存储:78 56 34 12


    // 实例2:
    // 有一个16进制的数, 有32位 (int): 0xab5c01ff
    // 字节序, 最小的单位: char 字节, int 有4个字节, 需要将其拆分为4份
    // 一个字节 unsigned char, 最大值是 255(十进制) ==> ff(16进制)
    内存低地址位 内存的高地址位
    --------------------------------------------------------------------------->
    小端: 0xff 0x01 0x5c 0xab
    大端: 0xab 0x5c 0x01 0xff

相关函数:

BSD Socket提供了封装好的转换接口,方便程序员使用。包括从主机字节序到网络字节序的转换函数:htonshtonl;从网络字节序到主机字节序的转换函数:ntohsntohl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#include <arpa/inet.h>
// u:unsigned
// 16: 16位, 32:32位
// h: host, 主机字节序
// n: net, 网络字节序
// s: short
// l: int

// 这套api主要用于 网络通信过程中 IP 和 端口 的 转换
// 将一个短整形从主机字节序 -> 网络字节序
uint16_t htons(uint16_t hostshort);
// 将一个整形从主机字节序 -> 网络字节序
uint32_t htonl(uint32_t hostlong);

// 将一个短整形从网络字节序 -> 主机字节序
uint16_t ntohs(uint16_t netshort)
// 将一个整形从网络字节序 -> 主机字节序
uint32_t ntohl(uint32_t netlong);

2.IP地址转换

IP地址本质是一个整形数,但是在使用的过程中都是通过一个字符串来描述,下面的函数描述了如何将一个字符串类型的IP地址进行大小端转换:

1
2
3
4
// 主机字节序的IP地址转换为网络字节序
// 主机字节序的IP地址是字符串, 网络字节序IP地址是整形
// 函数中的 n 表示大整型的意思,p表示的是字符型点分式的IP地址,因此就是从字符型点分式的IP地址格式向大整数的转换
int inet_pton(int af, const char *src, void *dst);

参数:

  • af: 地址族(IP地址的家族包括ipv4和ipv6)协议
    • AF_INET: ipv4格式的ip地址
    • AF_INET6: ipv6格式的ip地址
  • src: 传入参数, 对应要转换的点分十进制的ip地址: 192.168.1.100
  • dst: 传出参数, 函数调用完成, 转换得到的大端整形IP被写入到这块内存中

返回值:

  • 成功返回1
  • 失败返回0或者-1
1
2
3
#include <arpa/inet.h>
// 将大端的整形数, 转换为小端的点分十进制的IP地址
const char *inet_ntop(int af, const void *src, char *dst, socklen_t size);

参数:

  • af: 地址族协议
    • AF_INET: ipv4格式的ip地址
    • AF_INET6: ipv6格式的ip地址
  • src: 传入参数, 这个指针指向的内存中存储了大端的整形IP地址
  • dst: 传出参数, 存储转换得到的小端的点分十进制的IP地址
  • size: 修饰dst参数的, 标记dst指向的内存中最多可以存储多少个字节

返回值:

  • 成功: 指针指向第三个参数对应的内存地址, 通过返回值也可以直接取出转换得到的IP字符串
  • 失败: NULL

还有一组函数也能进程IP地址大小端的转换,但是只能处理ipv4的ip地址:

1
2
3
4
5
// 点分十进制IP -> 大端整形
in_addr_t inet_addr (const char *cp);

// 大端整形 -> 点分十进制IP
char* inet_ntoa(struct in_addr in);

3.sockaddr数据结构

img

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 在写数据的时候不好用
struct sockaddr {
sa_family_t sa_family; // 地址族协议, ipv4 16位地址类型
char sa_data[14]; // 端口(2字节) + IP地址(4字节) + 填充(8字节)
}

typedef unsigned short uint16_t;
typedef unsigned int uint32_t;
typedef uint16_t in_port_t;
typedef uint32_t in_addr_t;
typedef unsigned short int sa_family_t;
#define __SOCKADDR_COMMON_SIZE (sizeof (unsigned short int))

struct in_addr
{
in_addr_t s_addr; // 32位无符号的整数
};

// sizeof(struct sockaddr) == sizeof(struct sockaddr_in)
struct sockaddr_in
{
sa_family_t sin_family; /* 地址族协议: AF_INET */
in_port_t sin_port; /* 端口, 2字节-> 大端 ,端口需要进行字节序的转换*/
struct in_addr sin_addr; /* IP地址, 4字节 -> 大端 */
/* 填充 8字节 */
unsigned char sin_zero[sizeof (struct sockaddr) - sizeof(sin_family) -
sizeof (in_port_t) - sizeof (struct in_addr)];
};

4.套接字函数

(1)socket函数

使用套接字通信函数需要包含头文件<arpa/inet.h>,包含了这个头文件<sys/socket.h>就不用在包含了

1
2
// 创建一个套接字
int socket(int domain, int type, int protocol);

参数:

  • domain: 使用的地址族协议
    • AF_INET: 使用IPv4格式的ip地址
    • AF_INET6: 使用IPv4格式的ip地址
  • type:
    • SOCK_STREAM: 使用流式的传输协议
    • SOCK_DGRAM: 使用报式(报文)的传输协议
  • protocol: 一般写0即可, 使用默认的协议
    • SOCK_STREAM: 流式传输默认使用的是tcp
    • SOCK_DGRAM: 报式传输默认使用的udp

返回值:

  • 成功: 可用于套接字通信的文件描述符
  • 失败: -1

函数的返回值是一个文件描述符,通过这个文件描述符可以操作内核中的某一块内存,网络通信是基于这个文件描述符来完成的。

(2)bind函数
1
2
// 将文件描述符和本地的IP与端口进行绑定   
int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);

参数:

  • sockfd: 监听的文件描述符, 通过socket()调用得到的返回值
  • addr: 传入参数, 要绑定的IP和端口信息需要初始化到这个结构体中,IP和端口要转换为网络字节序
  • addrlen: 参数addr指向的内存大小, sizeof(struct sockaddr)

返回值:

  • 成功返回0
  • 失败返回-1
(3)listen函数
1
2
// 给监听的套接字设置监听
int listen(int sockfd, int backlog);

参数:

  • sockfd: 文件描述符, 可以通过调用socket()得到,在监听之前必须要绑定 bind()
  • backlog: 同时能处理的最大连接要求,最大值为128

返回值:

  • 函数调用成功返回0
  • 调用失败返回 -1
(4)accept函数

这个函数是一个阻塞函数,当没有新的客户端连接请求的时候,该函数阻塞;当检测到有新的客户端连接请求时,阻塞解除,新连接就建立了,得到的返回值也是一个文件描述符,基于这个文件描述符就可以和客户端通信了。

该函数是一个线程安全的函数

1
2
// 等待并接受客户端的连接请求, 建立新的连接, 会得到一个新的文件描述符(通信的)		
int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);

参数:

  • sockfd: 监听的文件描述符
  • addr: 传出参数, 里边存储了建立连接的客户端的地址信息
  • addrlen: 传入传出参数,用于存储addr指向的内存大小

返回值:

  • 函数调用成功,得到一个文件描述符, 用于和建立连接的这个客户端通信
  • 调用失败返回 -1
(5)read与recv函数

如果连接没有断开,接收端接收不到数据,接收数据的函数会阻塞等待数据到达,数据到达后函数解除阻塞,开始接收数据,当发送端断开连接,接收端无法接收到任何数据,但是这时候就不会阻塞了,函数直接返回0。

1
2
3
// 接收数据
ssize_t read(int sockfd, void *buf, size_t size);
ssize_t recv(int sockfd, void *buf, size_t size, int flags);

参数:

  • sockfd: 用于通信的文件描述符, accept() 函数的返回值
  • buf: 指向一块有效内存, 用于存储接收是数据
  • size: 参数buf指向的内存的容量
  • flags: 特殊的属性, 一般不使用, 指定为 0

返回值:

  • 大于0:实际接收的字节数
  • 等于0:对方断开了连接
  • -1:接收数据失败了
(6)write与send函数
1
2
3
// 发送数据的函数
ssize_t write(int fd, const void *buf, size_t len);
ssize_t send(int fd, const void *buf, size_t len, int flags);

参数:

  • fd: 通信的文件描述符, accept() 函数的返回值
  • buf: 传入参数, 要发送的字符串
  • len: 要发送的字符串的长度
  • flags: 特殊的属性, 一般不使用, 指定为 0

返回值:

  • 大于0:实际发送的字节数,和参数len是相等的
  • -1:发送数据失败了
(7)connect函数
1
2
3
// 成功连接服务器之后, 客户端会自动随机绑定一个端口
// 服务器端调用accept()的函数, 第二个参数存储的就是客户端的IP和端口信息
int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);

参数:

  • sockfd: 通信的文件描述符, 通过调用socket()函数就得到了
  • addr: 存储了要连接的服务器端的地址信息: iP 和 端口,这个IP和端口也需要转换为大端然后再赋值
  • addrlen: addr指针指向的内存的大小 sizeof(struct sockaddr)

返回值:

  • 连接成功返回0
  • 连接失败返回-1

三、TCP通信流程

TCP是一个面向连接的,安全的,流式传输协议,这个协议是一个传输层协议

  • 面向连接:是一个双向连接,通过三次握手完成,断开连接需要通过四次挥手完成
  • 安全:tcp通信过程中,会对发送的每一数据包都会进行校验, 如果发现数据丢失, 会自动重传
  • 流式传输:发送端和接收端处理数据的速度,数据的量都可以不一致

img

1.服务端通信流程

  • 1.创建用于监听的套接字, 这个套接字是一个文件描述符
1
int lfd = socket();
  • 2.将得到的监听的文件描述符和本地的IP 端口进行绑定(服务器程序启动之后就是一个进程,若该进程需要接收对应的数据,则需要绑定IP与端口,通过IP可以在网络中找到对应的主机,通过端口则可以找到主机中对应的服务进程

    服务端作为被动端需要对IP与端口进行绑定,这样主动端可以通过被动端的IP以及端口对其进行定位

1
bind();
  • 3.设置监听(成功之后开始监听, 监听的是客户端的连接)
1
listen();
  • 4.等待并接受客户端的连接请求, 建立新的连接, 会得到一个新的文件描述符(通信的),没有新连接请求就阻塞,accept是一个阻塞的系统调用
1
int cfd = accept();
  • 5.通信,读写操作默认都是阻塞的
1
2
3
4
// 接收数据
read(); / recv();
// 发送数据
write(); / send();
  • 6.断开连接, 关闭套接字
1
close(); // 调用close函数则会进行两次挥手,若服务端调用该函数则会挥手两次,客户端调用该函数则也会挥手两次,总共四次

在tcp的服务器端, 有两类文件描述符

  • 监听的文件描述符
    • 只需要有一个
    • 不负责和客户端通信, 负责检测客户端的连接请求, 检测到之后调用accept就可以建立新的连接
  • 通信的文件描述符
    • 负责和建立连接的客户端通信
    • 如果有N个客户端和服务器建立了新的连接, 通信的文件描述符就有N个,每个客户端和服务器都对应一个通信的文件描述符
(1)文件描述符的重要性

1558084711685

上图

左方为服务端情形,在服务端只有一个用于监听的文件描述符,而通信的文件描述符可以有N(若干个)

左方位客户端情形,在客户端只有一个用于通信的文件描述符

通过上述的文件描述符就可以进行网络IO的操作,即可以进行网络数据的读写操作。网络IO读写操作,操作的主体是内核中的内存。而文件IO操作的主体是磁盘中的内存。

进行套接字通信时,每一个文件描述符对应的是内核中的两块内存:读缓冲区以及写缓冲区

网络通信文件描述符对应的内存结构

  • 一个文件的文件描述符对应两块内存, 一块内存是读缓冲区, 一块内存是写缓冲区
  • 读数据: 通过文件描述符将内存中的数据读出, 这块内存称之为读缓冲区
  • 写数据: 通过文件描述符将数据写入到某块内存中, 这块内存称之为写缓冲区

监听的文件描述符

  • 客户端的连接请求会发送到服务器端监听的文件描述符的读缓冲区中
  • 读缓冲区中有数据, 说明有新的客户端连接
  • 调用accept()函数, 这个函数会检测监听文件描述符的读缓冲区
    • 检测不到数据, 该函数阻塞
    • 如果检测到数据, 解除阻塞, 新的连接建立

通信的文件描述符

  • 客户端和服务器端都有通信的文件描述符
  • 发送数据:调用函数 write() / send(),数据进入到内核中
    • 数据并没有被发送出去, 而是将数据写入到了通信的文件描述符对应的写缓冲区中
    • 内核检测到通信的文件描述符写缓冲区中有数据, 内核会将数据发送到网络中
  • 接收数据: 调用的函数 read() / recv(), 从内核读数据
    • 数据如何进入到内核程序猿不需要处理, 数据进入到通信的文件描述符的读缓冲区中
    • 数据进入到内核, 必须使用通信的文件描述符, 将数据从读缓冲区中读出即可

2.客户端的通信流程

主动请求连接的一方,不需要对IP以及端口进行绑定,因为connect函数在向服务端发送连接时,会将本机的ip以及本机随机一个空闲的端口(系统自动分配)发送给服务端。

在单线程的情况下客户端通信的文件描述符有一个, 没有监听的文件描述符

  • 1.创建一个通信的套接字
1
int cfd = socket();
  • 2.连接服务器, 需要知道服务器绑定的IP和端口
1
connect();
  • 3.通信
1
2
3
4
// 接收数据
read(); / recv();
// 发送数据
write(); / send();
  • 4.断开连接, 关闭文件描述符(套接字)
1
close();

3.Web Server单线程实例

server.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
// server.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>

#define SERVERPORT "1989"
#define IPSTRSIZE 40

int main()
{
// 1. 创建监听的套接字
int lfd = socket(AF_INET, SOCK_STREAM, 0);
if(lfd == -1)
{
perror("socket");
exit(0);
}

int val = 1;
if(setsockopt(lfd,SOL_SOCKET,SO_REUSEADDR,&val,sizeof(val)) < 0)
{
perror("setsockopt()");
exit(1);
}

// 2. 将socket()返回值和本地的IP端口绑定到一起
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(atoi(SERVERPORT)); // 大端端口
// INADDR_ANY代表本机的所有IP, 假设有三个网卡就有三个IP地址
// 这个宏可以代表任意一个IP地址
// 这个宏一般用于本地的绑定操作
addr.sin_addr.s_addr = INADDR_ANY; // 这个宏的值为0 == 0.0.0.0
// inet_pton(AF_INET, "192.168.237.131", &addr.sin_addr.s_addr);
int ret = bind(lfd, (struct sockaddr*)&addr, sizeof(addr));
if(ret == -1)
{
perror("bind");
exit(0);
}

// 3. 设置监听
ret = listen(lfd, 128);
if(ret == -1)
{
perror("listen");
exit(0);
}

// 4. 阻塞等待并接受客户端连接
struct sockaddr_in cliaddr;
int clilen = sizeof(cliaddr);
int cfd = accept(lfd, (struct sockaddr*)&cliaddr, &clilen);
if(cfd == -1)
{
perror("accept");
exit(0);
}
// 打印客户端的地址信息
char ip[IPSTRSIZE];
inet_ntop(AF_INET, &cliaddr.sin_addr.s_addr, ip, sizeof(ip));
printf("客户端的IP地址: %s, 端口: %d\n",ip,ntohs(cliaddr.sin_port));

// 5. 和客户端通信
int number = 0;
while(1)
{
// 接收数据
char buf[1024];
memset(buf, 0, sizeof(buf));
int len = recv(cfd, buf, sizeof(buf),0);
if(len > 0)
{
printf("客户端say: %s\n", buf);
// 发送数据
memset(buf, 0, sizeof(buf));
sprintf(buf, "客户端信息成功接收...%d\n", number++);
send(cfd, buf, strlen(buf)+1,0);
}
else if(len == 0)
{
printf("客户端断开了连接...\n");
break;
}
else
{
perror("read");
break;
}
}

close(cfd);
close(lfd);

return 0;
}

client.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
// client.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>

#define SERVERPORT "1989"

int main(int argc,char *argv[])
{

if(argc < 2)
{
printf("Usage...\n");
exit(1);
}

// 1. 创建通信的套接字
int fd = socket(AF_INET, SOCK_STREAM, 0);
if(fd == -1)
{
perror("socket");
exit(0);
}

// 2. 连接服务器
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(atoi(SERVERPORT)); // 大端端口
inet_pton(AF_INET,argv[1], &addr.sin_addr.s_addr);

int ret = connect(fd, (struct sockaddr*)&addr, sizeof(addr));
if(ret == -1)
{
perror("connect");
exit(0);
}

// 3. 和服务器端通信
int number = 0;
while(1)
{
// 发送数据
char buf[1024];
sprintf(buf, "你好, 服务器...%d\n", number++);
send(fd, buf, strlen(buf)+1,0);

// 接收数据
memset(buf, 0, sizeof(buf));
int len = recv(fd, buf, sizeof(buf),0);
if(len > 0)
{
printf("服务器say: %s\n", buf);
}
else if(len == 0)
{
printf("服务器断开了连接...\n");
break;
}
else
{
perror("recv()");
break;
}
sleep(1); // 每隔1s发送一条数据
}

close(fd);

return 0;
}

编译运行结果:

image-20240129223015845

image-20240129223031909

四、扩展阅读

在Window中也提供了套接字通信的API,这些API函数与Linux平台的API函数几乎相同,以至于很多人认为套接字通信的API函数库只有一套,下面来看一下这些Windows平台的套接字函数:

1.初始化套接字环境

使用Windows中的套接字函数需要额外包含对应的头文件以及加载响应的动态库:

1
2
3
4
// 使用包含的头文件 
include <winsock2.h>
// 使用的套接字库
ws2_32.dll

在Windows中使用套接字需要先加载套接字库(套接字环境),最后需要释放套接字资源

1
2
3
// 初始化Winsock库
// 返回值: 成功返回0,失败返回SOCKET_ERROR。
WSAStartup(WORD wVersionRequested, LPWSADATA lpWSAData);

参数:

  • wVersionRequested: 使用的Windows Socket的版本, 一般使用的版本是 2.2
    • 初始化这个 MAKEWORD(2, 2);参数
  • lpWSAData:一个WSADATA结构指针, 这是一个传入参数
    • 创建一个 WSADATA 类型的变量, 将地址传递给该函数的第二个参数

注销Winsock相关库,函数调用成功返回0,失败返回 SOCKET_ERROR

1
int WSACleanup (void);

使用举例:

1
2
3
4
5
6
7
8
9
WSAData wsa;
// 初始化套接字库
WSAStartup(MAKEWORD(2, 2), &wsa);

// .......


// 注销Winsock相关库
WSACleanup();

2.套接字通信函数

基于Linux的套接字通信流程是最全面的一套通信流程,如果是在某个框架中进行套接字通信,通信流程只会更简单,直接使用window的套接字api进行套接字通信,和Linux平台上的通信流程完全相同。

(1)结构体
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
///////////////////////////////////////////////////////////////////////
/////////////////////////////// Windows ///////////////////////////////
///////////////////////////////////////////////////////////////////////
typedef struct in_addr {
  union {
   struct{ unsigned char s_b1,s_b2, s_b3,s_b4;} S_un_b;
   struct{ unsigned short s_w1, s_w2;} S_un_w;
   unsigned long S_addr; // 存储IP地址
  } S_un;
}IN_ADDR;

struct sockaddr_in {
  short int sin_family; /* Address family */
  unsigned short int sin_port; /* Port number */
  struct in_addr sin_addr; /* Internet address */
  unsigned char sin_zero[8]; /* Same size as struct sockaddr */
};

///////////////////////////////////////////////////////////////////////
//////////////////////////////// Linux ////////////////////////////////
///////////////////////////////////////////////////////////////////////
typedef unsigned short uint16_t;
typedef unsigned int uint32_t;
typedef uint16_t in_port_t;
typedef uint32_t in_addr_t;
typedef unsigned short int sa_family_t;

struct in_addr
{
in_addr_t s_addr;
};

// sizeof(struct sockaddr) == sizeof(struct sockaddr_in)
struct sockaddr_in
{
sa_family_t sin_family; /* 地址族协议: AF_INET */
in_port_t sin_port; /* 端口, 2字节-> 大端 */
struct in_addr sin_addr; /* IP地址, 4字节 -> 大端 */
/* 填充 8字节 */
unsigned char sin_zero[sizeof (struct sockaddr) - sizeof(sin_family) -
sizeof (in_port_t) - sizeof (struct in_addr)];
};
(2)大小端转换函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 主机字节序 -> 网络字节序
u_short htons (u_short hostshort );
u_long htonl ( u_long hostlong);

// 网络字节序 -> 主机字节序
u_short ntohs (u_short netshort );
u_long ntohl ( u_long netlong);

// linux函数, window上没有这两个函数
inet_ntop();
inet_pton();

// windows 和 linux 都使用, 只能处理ipv4的ip地址
// 点分十进制IP -> 大端整形
unsigned long inet_addr (const char FAR * cp); // windows
in_addr_t inet_addr (const char *cp); // linux

// 大端整形 -> 点分十进制IP
// window, linux相同
char* inet_ntoa(struct in_addr in);
(3)套接字函数

window的api中套接字对应的类型是 SOCKET 类型, linux中是 int 类型, 本质是一样的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// 创建一个套接字
// 返回值: 成功返回套接字, 失败返回INVALID_SOCKET
SOCKET socket(int af,int type,int protocal);
参数:
- af: 地址族协议
- ipv4: AF_INET (windows/linux)
- PF_INET (windows)
- AF_INET == PF_INET
- type: 和linux一样
- SOCK_STREAM
- SOCK_DGRAM
- protocal: 一般写0 即可
- 在windows上的另一种写法
- IPPROTO_TCP, 使用指定的流式协议中的tcp协议
- IPPROTO_UDP, 使用指定的报式协议中的udp协议

// 关键字: FAR NEAR, 这两个关键字在32/64位机上是没有意义的, 指定的内存的寻址方式
// 套接字绑定本地IP和端口
// 返回值: 成功返回0,失败返回SOCKET_ERROR
int bind(SOCKET s,const struct sockaddr FAR* name, int namelen);

// 设置监听
// 返回值: 成功返回0,失败返回SOCKET_ERROR
int listen(SOCKET s,int backlog);

// 等待并接受客户端连接
// 返回值: 成功返回用于的套接字,失败返回INVALID_SOCKET。
SOCKET accept ( SOCKET s, struct sockaddr FAR* addr, int FAR* addrlen );

// 连接服务器
// 返回值: 成功返回0,失败返回SOCKET_ERROR
int connect (SOCKET s,const struct sockaddr FAR* name,int namelen );

// 在Qt中connect用户信号槽的连接, 如果要使用windows api 中的 connect 需要在函数名前加::
::connect(sock, (struct sockaddr*)&addr, sizeof(addr));

// 接收数据
// 返回值: 成功时返回接收的字节数,收到EOF时为0,失败时返回SOCKET_ERROR。
// ==0 代表对方已经断开了连接
int recv (SOCKET s,char FAR* buf,int len,int flags);

// 发送数据
// 返回值: 成功返回传输字节数,失败返回SOCKET_ERROR。
int send (SOCKET s,const char FAR * buf, int len,int flags);

// 关闭套接字
// 返回值: 成功返回0,失败返回SOCKET_ERROR
int closesocket (SOCKET s); // 在linux中使用的函数是: int close(int fd);

//----------------------- udp 通信函数 -------------------------
// 接收数据
int recvfrom(SOCKET s,char FAR *buf,int len,int flags,
struct sockaddr FAR *from,int FAR *fromlen);
// 发送数据
int sendto(SOCKET s,const char FAR *buf,int len,int flags,
const struct sockaddr FAR *to,int tolen);

五、服务器并发

在上文的(三)TCP通信流程的第三小节(3)Web Server单线程实例中,是在单线程\单进程的场景下进行的,accept是一个阻塞函数,若accept阻塞住,则无法建立新的连接,因此服务器无法处理多连接。为了实现服务器的并发,有以下几种解决方法:

  • 使用多线程实现(相较于多进程节省系统资源)
  • 使用多进程实现
  • 使用IO多路转接(复用)实现(在单线程的场景下,依旧可以实现服务器的并发,但是效率比多线程低)
  • 使用IO多路转接+多线程实现(在使用IO多路转接实现的基础上,增加上多线程可以增加效率)

1.单线程Socket通信

通信协议

proto.h

1
2
3
4
5
6
7
8
9
10
11
/*
* 通信协议
* */

#ifndef SOCKET_MULTITHREADING_PROTO_H
#define SOCKET_MULTITHREADING_PROTO_H

#define SERVERPORT "1989"
#define SERVERIP "0.0.0.0"

#endif //SOCKET_MULTITHREADING_PROTO_H

客户端:

Client.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
/*
* 客户端程序
* */

#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <string.h>
#include <arpa/inet.h>
#include <unistd.h>

#include "proto.h"

#define BUFSIZE 1024

int main(int argc,char *argv[])
{
int sd; // 客户端用于通信的套接字
struct sockaddr_in raddr; // 服务端远端ip信息结构体
char BUF[BUFSIZE];
int len;

// 判断终端指令参数个数是否合理
if(argc < 2)
{
fprintf(stderr,"Usage...\n");
exit(1);
}

/*1.创建用于与服务端进行通信的套接字*/
sd = socket(AF_INET,SOCK_STREAM,0);
if(sd < 0)
{
perror("socket()");
exit(1);
}
/*************************************************************/

/*2.设置远端ip结构体信息,客户端向服务端发送连接请求*/
raddr.sin_family = AF_INET;
raddr.sin_port = htons(atoi(SERVERPORT));
inet_pton(AF_INET,argv[1],&raddr.sin_addr);
if(connect(sd,(void *) &raddr,sizeof raddr) < 0)
{
perror("connect()");
exit(1);
}
/*************************************************************/


/*3.建立连接成功,与服务端进行通信*/
int number = 0;
while(1)
{
// 缓冲区进行清0
memset(BUF, 0, sizeof(BUF));
sprintf(BUF, "你好,服务器...%d\n", number++);
// 向服务端发送数据
send(sd, BUF, strlen(BUF)+1,0);

// 接受服务端数据
memset(BUF, 0, sizeof(BUF));
int len = recv(sd,BUF,sizeof BUF,0);
if(len > 0)
{
fprintf(stdout,"服务器say: %s\n", BUF);
}
else if(len == 0)
{
fprintf(stdout,"服务器断开了连接...\n");
break;
}
else
{
perror("recv()");
break;
}
sleep(1); // 每隔1s发送一条数据
}

close(sd); // 关闭套接字
/*************************************************************/

exit(0);
}

服务端:

Server.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <string.h>
#include <arpa/inet.h>
#include <unistd.h>

#include "proto.h"

#define IPSTRSIZE 40
#define BUFSIZE 1024




int main()
{
int sd; // 用于监听客户端与服务端连接请求
int new_sd; // 用于客户端与服务端进行通信
char ipStr[IPSTRSIZE];
struct sockaddr_in laddr; // 本地服务端地址
struct sockaddr_in raddr; // 远端客户端地址
socklen_t raddr_len;
char BUF[BUFSIZE];
int len;

/*1.获得监听套接字*/
sd = socket(AF_INET,SOCK_STREAM,0);
if(sd < 0)
{
perror("socket()");
exit(1);
}
/*************************************************************/

/*2.设置套接字属*/
// 这个选项使得在同一端口上可以快速重启一个监听服务,而不必等待操作系统关闭前一个服务的套接字
int val = 1;
if(setsockopt(sd,SOL_SOCKET,SO_REUSEADDR,&val,sizeof val) < 0)
{
perror("socket()'");
exit(1);
}
/*************************************************************/

/*3.为服务端绑定本地ip与端口*/
laddr.sin_family = AF_INET;
laddr.sin_port = htons(atoi(SERVERPORT));
inet_pton(AF_INET,SERVERIP,&laddr.sin_addr);
if(bind(sd,(void *)&laddr,sizeof laddr) < 0)
{
perror("bind()");
exit(1);
}
/*************************************************************/

/*4.监听客户端通信连接请求*/
if(listen(sd,200) < 0)
{
perror("listen()");
exit(1);
}

/*5.阻塞等待并接受客户端连接*/
raddr_len = sizeof raddr;
new_sd = accept(sd,(void *)&raddr,&raddr_len);
if(new_sd < 0)
{
perror("accept()'");
exit(1);
}
// 打印客户端信息
inet_ntop(AF_INET,&raddr.sin_addr,ipStr,sizeof ipStr);
printf("客户端的IP地址: %s, 端口: %d\n",ipStr,ntohs(raddr.sin_port));
/*************************************************************/


/*6.与客户端通信*/
int number = 0;
while (1)
{
// 接受数据
memset(BUF, 0, sizeof(BUF));
len = recv(new_sd, BUF, sizeof(BUF),0);
if(len > 0)
{
printf("客户端say: %s\n", BUF);
// 发送数据
memset(BUF, 0, sizeof(BUF));
sprintf(BUF, "信息成功接收...%d\n", number++);
send(new_sd, BUF, strlen(BUF)+1,0);
}
else if(len == 0)
{
printf("客户端断开了连接...\n");
break;
}
else
{
perror("recv()");
break;
}
}
close(sd);
close(new_sd);

return 0;
}

编译运行,先运行服务端程序,若无客户端发送连接请求,此时服务端会在accept函数处进行阻塞:

image-20240502115623172

编译运行客户端程序:

服务端终端显示情况:

image-20240502115754644

客户端终端显示情况:

image-20240502115819341

2.多线程并发

多线程中的线程有两大类:主线程(父线程)和子线程,他们分别要在服务器端处理监听和通信流程。根据多进程的处理思路,就可以这样设计服务端程序

主线程:(主要负责,监听连接请求,并且建立连接)

  • 负责监听,处理客户端的连接请求,也就是在父进程中循环调用accept()函数
  • 创建子线程:建立一个新的连接,就创建一个新的子进程,让这个子进程和对应的客户端通信
  • 回收子线程资源:由于回收需要调用阻塞函数,这样就会影响accept(),直接做线程分离即可

子线程:(主要负责,与客户端进行通信,收发数据)

  • 负责通信,基于主线程建立新连接之后得到的文件描述符,和对应的客户端完成数据的接收和发送
  • 发送数据:send() / write()
  • 接收数据:recv() / read()

在多线程版的服务器端程序中,多个线程共用同一个地址空间,有些数据是共享的,有些数据的独占的,下面来分析一些其中的一些细节:

  • 同一地址空间中的多个线程的栈空间是独占的
  • 多个线程共享全局数据区,堆区,以及内核区的文件描述符等资源,因此需要注意数据覆盖问题,并且在多个线程访问共享资源的时候,还需要进行线程同步。

image-20240130132625215

(1)bzero函数

bzero 函数是在C语言中用于将内存块的内容清零的函数

函数原型

1
2
#include <strings.h>
void bzero(void *s, size_t n);
  • s 是指向要清零的内存块的指针。
  • n 是要清零的字节数。

这个函数将指定的内存区域的前 n 个字节设置为零。它通常用于初始化新分配的内存,以确保它不包含任何垃圾值。

(2)程序实例

多线程TCP服务端程序

server.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>

#define SERVERPORT "1989"
#define IPSTRSIZE 40
#define CHILD_THREADSIZE 128
#define BUFSIZE 1024

// 信息结构体
struct SockInfo
{
int fd; // 通信
pthread_t tid; // 线程ID
struct sockaddr_in addr; // 地址信息
};


// 用于与客户端通信的子线程函数
void *working(void *arg)
{
int number = 0;
struct SockInfo* info = (struct SockInfo *)arg;
while(1)
{
// 接受数据
char buf[BUFSIZE];
// recv函数接受数据,得到成功接受数据的字节数
int ret = recv(info->fd,buf,sizeof(buf),0);
if(ret == 0)
{
printf("客户端关闭连接!\n");
info->fd = -1;
break;
}
else if(ret < 0)
{
perror("recv()");
info->fd = -1;
break;
}
// 成功接受数据

else{
memset(buf, 0, sizeof(buf));
sprintf(buf, "客户端信息成功接收...%d\n", number++);
send(info->fd,buf,strlen(buf)+1,0);
}
}

// 关闭用于通信的套接字文件描述符
close(info->fd);
return NULL;

}


// 存储子线程数据数组
struct SockInfo infos[128];
char ipstr[IPSTRSIZE];


int main()
{
int i,max,len;
int connfd;
// 1. 创建用于监听的套接字
int fd = socket(AF_INET, SOCK_STREAM, 0);
if(fd < 0)
{
perror("socket()");
exit(0);
}


// 设置套接字属性,快速释放被占用的端口
int val = 1;
if(setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,&val,sizeof(val)) < 0)
{
perror("setsockopt()");
exit(1);
}



// 2. 为服务端绑定ip与端口
struct sockaddr_in laddr;
laddr.sin_family = AF_INET;
laddr.sin_port = htons(atoi(SERVERPORT));
// 0.0.0.0 为任意地址
// 将ipv4点分式地址转换为 sin_addr需要uint_32 整型
inet_pton(AF_INET,"0.0.0.0",&laddr.sin_addr);
if(bind(fd,(void *)&laddr,sizeof(laddr)) < 0)
{
perror("bind()");
exit(1);
}


// 3.服务端设置监听,监听客户端发送的连接请求
// 监听队列中设置的可容纳的等待接收连接请求最大数量为100
if(listen(fd,100)<0)
{
perror("listen()");
exit(1);
}

// 初始化子线程数组数据
max = sizeof(infos) / sizeof(infos[0]);

for(i=0;i<max;i++)
{
// 将内存块的内容清零
bzero(&infos[i],sizeof(infos[i]));
// 若该子线程数组该位置没有被子线程占用则为-1
infos[i].fd = -1; // 用于通信的套接字文件描述符
infos[i].tid = -1;
}



// 与服务端连接的客户端ip地址长度
len = sizeof(struct sockaddr_in);


// 4.主线程进行监听
// 监听接受连接之后,创建子线程用于与子线程通信
while(1)
{
// 创建子线程
struct SockInfo* pinfo;
// 寻找没有被占用的子线程数组位置
for(i = 0;i<max;i++)
{
if(infos[i].fd == -1)
{
pinfo = &infos[i];
break;
}
if(i == max-1)
{
// 从后往前寻找,若此时没有找到空闲的子线程数组位置,则主线程会被阻塞在这个for循环中
sleep(1);
i--;
}
}


// 5.阻塞并且等待客户端连接
// 得到一个用于与客户端通信的套接字文件描述符
bzero(&ipstr,sizeof(ipstr));
connfd = accept(fd,(void *)&pinfo->addr,&len);
if(connfd < 0)
{
perror("accept()");
exit(1);
}
// 打印输出 与服务端连接的客户端ip信息
inet_ntop(AF_INET,&pinfo->addr.sin_addr,ipstr,IPSTRSIZE);
printf("主线程,创建的套接字文件描述符:%d, 客户端IP:%s, 占用端口: %d\n",connfd,ipstr,ntohs(pinfo->addr.sin_port));

pinfo->fd = connfd;
pthread_create(&pinfo->tid,NULL,working,pinfo);
// 进行线程分离,创建的子线程资源不需要主线程进行回收
pthread_detach(pinfo->tid);

}


// 关闭用于监听的套接字文件描述符
close(fd);

return 0;

}

整理版

上述版本的代码有些乱,重新整理如下:

Server_Multithread.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
/*
* 多线程服务端
* */
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>

#include "proto.h"

#define IPSTRSIZE 40
#define CHILD_THREADSIZE 128
#define BUFSIZE 1024

// 信息结构体
struct SockInfo
{
int fd; // 通信套接字文件描述符
pthread_t tid; // 线程ID
struct sockaddr_in raddr; // 地址信息
};

// 用于与客户端通信的子线程函数
void Server_job(void *arg)
{
int number = 0;
struct SockInfo* info = (struct SockInfo *)arg;
char ipStr[IPSTRSIZE];
inet_ntop(AF_INET,&info->raddr.sin_addr,ipStr,sizeof ipStr);
while(1)
{
// 接受数据
char buf[BUFSIZE];
// recv函数接受数据,得到成功接受数据的字节数
int ret = recv(info->fd,buf,sizeof(buf),0);
if(ret == 0)
{
printf("客户端关闭连接,IP:%s,PORT:%d\n",ipStr, ntohs(info->raddr.sin_port));
info->fd = -1;
break;
}
else if(ret < 0)
{
perror("recv()");
// 置-1
info->fd = -1;
break;
}
else{
memset(buf, 0, sizeof(buf));
sprintf(buf, "客户端信息成功接收...%d\n", number++);
send(info->fd,buf,strlen(buf)+1,0);
}
}

// 关闭用于通信的套接字
close(info->fd);

// exit(0);
// 子线程退出
pthread_exit(NULL);
}

int main()
{
int i,max,len,sd;
int new_sd;
char ipStr[IPSTRSIZE];
struct SockInfo infos[CHILD_THREADSIZE]; // 存储子线程数据数组
struct sockaddr_in laddr; // 本地服务端地址
// struct sockaddr_in raddr; // 远端客户端地址
socklen_t raddr_len;


/*1.创建用于监听服务端与客户端通信套接字*/
sd = socket(AF_INET,SOCK_STREAM,0);
if(sd < 0)
{
perror("socket()'");
exit(1);
}
/**************************************************/


/*2.设置套接字属*/
// 这个选项使得在同一端口上可以快速重启一个监听服务,而不必等待操作系统关闭前一个服务的套接字
int val = 1;
if(setsockopt(sd,SOL_SOCKET,SO_REUSEADDR,&val,sizeof val) < 0)
{
perror("socket()'");
exit(1);
}
/**************************************************/


/*3.为服务端绑定本地ip与端口*/
laddr.sin_family = AF_INET;
laddr.sin_port = htons(atoi(SERVERPORT));
inet_pton(AF_INET,SERVERIP,&laddr.sin_addr);
if(bind(sd,(void *)&laddr,sizeof laddr) < 0)
{
perror("bind()");
exit(1);
}
/**************************************************/


/*4.服务端设置监听,监听客户端发送的连接请求*/
// 监听队列中设置的可容纳的等待接收连接请求最大数量为100
if(listen(sd,100)<0)
{
perror("listen()");
exit(1);
}
/**************************************************/


/*5.初始化服务端子进程数组数据*/
for(i = 0;i<CHILD_THREADSIZE;i++)
{
// 将每一个内存块内容清0
bzero(&infos[i],sizeof(infos[i]));
// 若该子线程数组该位置没有被子线程占用则为-1
infos[i].fd = -1;
infos[i].tid = -1;
}
/**************************************************/


/*6.接受客户端连接,并且创建子线程用于与客户端通信*/
raddr_len = sizeof (struct sockaddr_in);
while(1)
{
// 创建一个子线程的结构体指针
struct SockInfo *pinfo;
// 寻找没有被占用的子线程数组位置
for(i = 0;i<CHILD_THREADSIZE;i++)
{
if(infos[i].fd == -1)
{
// 创建的结构体指针指向空闲的数组位置
pinfo = &infos[i];
break;
}
if(i == max-1)
{
// 从后往前寻找,若此时没有找到空闲的子线程数组位置,则主线程会被阻塞在这个for循环中
sleep(1);
i--;
}
}

// 阻塞等待客户端连接
// 得到一个用于与客户端通信的套接字文件描述符
bzero(ipStr,sizeof ipStr);
new_sd = accept(sd,(void *)&pinfo->raddr,&raddr_len);
if(new_sd < 0)
{
perror("accept()");
exit(1);
}
// 打印输出 与服务端连接的客户端ip信息
inet_ntop(AF_INET,&pinfo->raddr.sin_addr,ipStr,sizeof ipStr);
printf("主线程,创建的套接字文件描述符:%d, 客户端IP:%s, 占用端口: %d\n",new_sd,ipStr,ntohs(pinfo->raddr.sin_port));

// 创建的用于通信的套接字描述符赋值给pinfo->fd,new_sd,之后重新获得赋给其他子线程,对于这个子线程无影响
pinfo->fd = new_sd;

// 创建子线程用于与客户端进行数据收发
pthread_create(&pinfo->tid,NULL,Server_job,pinfo);

// 进行线程分离,创建的子线程资源不需要主线程进行回收
pthread_detach(pinfo->tid);
}

// 关闭用于监听通信连接的套接字
close(sd);

exit(0);
}

编译运行,先启动服务端程序,在启动多个客户端程序,如下:

服务端终端显示:

image-20240502224825496

客户端1终端显示:

image-20240502224901969

客户端2终端显示:

image-20240502224927556

3.多进程并发

编写多进程并发服务器程序,首先需要考虑的就是,创建的多进程都是什么角色。在Tcp服务器端一共有两个角色,分别是:监听和通信,监听是一个持续的动作,如果有新连接就建立连接,如果没有新连接就阻塞。关于通信是需要和多个客户端同时进行的,因此需要多个进程,这样才能达到互不影响的效果。进程也有两大类:父进程和子进程,通过分析我们可以这样分配进程:

父进程:

  • 负责监听,处理客户端的连接请求,也就是在父进程中循环调用accept()函数
  • 创建子进程:建立一个新的连接,就创建一个新的子进程,让这个子进程和对应的客户端通信
  • 回收子进程资源:子进程退出回收其内核PCB资源,防止出现僵尸进程

子进程:负责通信,基于父进程建立新连接之后得到的文件描述符,和对应的客户端完成数据的接收和发送

  • 发送数据:send() / write()
  • 接收数据:recv() / read()

在多进程版的服务器端程序中,多个进程是有血缘关系,对应有血缘关系的进程来说,还需要想明白他们有哪些资源是可以被继承的,哪些资源是独占的,以及一些其他细节:

  • 子进程是父进程的拷贝,在子进程的内核区PCB中,文件描述符也是可以被拷贝的,因此在父进程可以使用的文件描述符在子进程中也有一份,并且可以使用它们做和父进程一样的事情。
  • 父子进程有用各自的独立的虚拟地址空间,因此所有的资源都是独占的
  • 为了节省系统资源,对于只有在父进程才能用到的资源,可以在子进程中将其释放掉,父进程亦如此
  • 由于需要在父进程中做accept()操作,并且要释放子进程资源,如果想要更高效一下可以使用信号的方式处理

image-20240503102148995

(1)程序实例

在父进程中调用accept函数,阻塞等待客户端连接,若连接成功,则创建子进程用于与客户端进行数据通信。若子进程结束,则向父进程发送SIGCHLD信号,告知父进程,子进程状态发生改变,父进程调用callback函数(信号处理函数),对子进程的资源进行回收处理。

ServerMultiProcess.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <signal.h>
#include <sys/wait.h>
#include <errno.h>

#include "proto.h"

#define IPSTRSIZE 40
#define BUFSIZE 1024

/*信号处理函数*/
void callback(int num)
{
while(1)
{
// 等待子进程退出进行资源回收(收尸)
// 退出成功则会得到退出子进程pid,pid需大于0
// 第一个参数 -1 表示 waitpid() 将等待任何子进程。这相当于 wait() 函数的行为,不限定特定的子进程
// WNOHANG:第三个参数是一个选项集,用于修改 waitpid() 的行为。WNOHANG 是一个非阻塞选项,它告诉 waitpid() 如果没有任何已终止的子进程就立即返回,而不是等待。
pid_t pid = waitpid(-1,NULL,WNOHANG);
if(pid <= 0)
{
printf("子进程正在运行,或者子进程回收完毕");
break;
}
printf("子进程回收完毕,PID = %d\n",pid);
break;
}
}


/*子进程工作函数*/
static void Server_job(int PID,int new_sd,void *arg)
{
int number = 0;
char BUF[BUFSIZE];
char ipStr[IPSTRSIZE];
struct sockaddr_in *raddr = (struct sockaddr_in *)arg;
inet_ntop(AF_INET,&raddr->sin_addr,ipStr,sizeof ipStr);
while(1)
{
memset(BUF, 0, sizeof(BUF));
// 接收数据
int len = recv(new_sd,BUF,sizeof(BUF),0);
if(len == 0)
{
printf("\n");
printf("[%d]:客户端关闭连接,IP:%s,PORT:%d\n",PID,ipStr, ntohs(raddr->sin_port));
break;
}
else if(len < 0)
{
perror("recv()");
break;
}
else
{
memset(BUF, 0, sizeof(BUF));
sprintf(BUF, "客户端信息成功接收...%d\n", number++);
send(new_sd,BUF,strlen(BUF)+1,0);
}
}
}


int main()
{
struct sockaddr_in laddr; // 服务端本机ip
struct sockaddr_in raddr; // 远端客户端ip
socklen_t raddr_len;
int sd,new_sd;
char ipStr[IPSTRSIZE];
pid_t pid;

/*1.获得用于监听的套接字,并且设置套接字属性*/
sd = socket(AF_INET,SOCK_STREAM,0);
if(sd < 0)
{
perror("socket()");
exit(1);
}
// 设置套接字属性,使得套接字连接意外终止时,可以使得在同一端口上可以快速重启一个监听服务
int val = 1;
if(setsockopt(sd,SOL_SOCKET,SO_REUSEADDR,&val,sizeof(val)) < 0)
{
perror("setsockopt()");
exit(1);
}
/*****************************************************************************/

/*2.为服务端本机ip进行绑定,并设置监听客户端连接请求*/
laddr.sin_family = AF_INET;
laddr.sin_port = htons(atoi(SERVERPORT));
inet_pton(AF_INET,SERVERIP,&laddr.sin_addr); // 点分式 转unint_32整型
if(bind(sd,(void *)&laddr,sizeof laddr) < 0)
{
perror("bind()");
exit(1);
}
// 监听客户端连接请求
if(listen(sd,100) < 0)
{
perror("listen()");
exit(1);
}
/*****************************************************************************/




/*3.主进程对于捕捉到SIGCHLD信号的行为进行重新定义*/
struct sigaction act,oAct;
act.sa_flags = 0; // 无特殊要求
act.sa_handler = callback; // 当捕获到该信号时,主进程调用callback函数作出响应
sigemptyset(&act.sa_mask); // 处理该信号时额外需要被阻塞的信号集合,但是程序中将当前集合设置为空集
// 用于修改信号SIGCHLD的行为
// 其中SIGCHLD信号,是一个由操作系统发送给父进程的信号,用来通知父进程其一个或多个子进程的状态发生了改变
// SIGCHLD 用于通知父进程其子进程的状态发生变化。可以用来处理子进程的终止、暂停、恢复等状态。
sigaction(SIGCHLD, &act, &oAct);
/*****************************************************************************/


/*3.主进程阻塞等待连接,并且创建子进程用于与客户端进行通信*/
raddr_len = sizeof raddr;
while(1)
{
do {
// 阻塞等待接受连接
new_sd = accept(sd,(void *)&raddr,&raddr_len);
if(new_sd < 0)
{
if(errno != EINTR)
{
perror("accept()");
exit(1);
}
}
}while(new_sd < 0);

// 对ipStr清0
bzero(ipStr,sizeof ipStr);
inet_ntop(AF_INET,&raddr.sin_addr,ipStr,sizeof ipStr);
// 终端打印输出客户端信息
printf("客户端IP: %s,PORT: %d\n",ipStr, ntohs(raddr.sin_port));


// 创建子进程用于与客户端进行数据通信
fflush(NULL);
pid = fork();
if(pid < 0)
{
perror("fork()");
exit(1);
}

// 子进程中返回
if(pid == 0)
{
// 子进程会将父进程资源进行拷贝,用于监听的套接字也会拷贝
close(sd);
// 子进程服务函数
Server_job(getpid(),new_sd,&raddr);
// 关闭通信的套接字
close(new_sd);
// 退出子进程--会向父进程发送SIGCHLD信号,使得父进程调用callback函数对子进程进行资源回收
exit(0);
}
else
{
close(new_sd);
}
}

/*****************************************************************************/

// 退出
exit(0);
}

编译运行,服务端程序,在逐个运行客户端程序,如下:

服务端终端显示:

image-20240503170435846

客户端1终端显示:

image-20240503170453742

客户端2终端显示:

image-20240503170517863

4.线程池并发

  • 线程池中,与客户端建立连接(创建一个与客户端建立连接的子线程,当有客户端与服务端建立连接之后,再创建其他子线程与客户端进行通信)以及和客户端通信都是由工作(消费者)线程完成的
  • 工作的线程与管理者线程均是子线程
  • 主线程,创建用于监听的套接字并且进行绑定,之后进行连接监听,再进行线程池对象的创建,最后将接受客户端连接的任务放到线程池中,主线程就可以进行退出了

注意:在主线程中,创建线程池需要保持,工作线程(管理者线程与工作线程是分开的)的最少数量为1,因为需要保证,至少用于监听客户端连接的工作子线程一直处于工作状态

1.线程池程序

threadpool.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
//
// Created by zxz on 5/3/24.
//

#ifndef THREADPOOL_THREADPOOL_H
#define THREADPOOL_THREADPOOL_H

#include <pthread.h>

// 在threadpool.c源文件进行定义与初始化,在头文件进行extern
// 若在头文件中进行初始化,当头文件被多个文件进行引用的时候,就会导致比变量被重复定义
extern const int NUMBER;

// 任务结构体
typedef struct Task
{
void (*function)(void *arg); // 函数指针
void *arg;
}Task;

// 线程池结构体
struct Threadpool
{
//任务队列
Task *taskQ;
int queueCapacity; // 容量
int queueSize; // 当前任务个数
int queueFront; // 队头 -> 取数据
int queueRear; // 队尾 -> 放数据

pthread_t managerID; // 管理者线程ID
pthread_t *threadIDs; // 工作的线程ID--使用一级指针指向装有工作线程ID的数组
int minNum; // 最小工作线程数量
int maxNum; // 最大工作线程数量
int busyNum; // 忙的工作线程的个数
int liveNum; // 存活的工作线程的个数
int exitNum; // 要销毁的工作线程个数---适当的销毁没有活干的工作线程
pthread_mutex_t mutexPool; // 锁整个的线程池---对整个任务队列做线程同步,避免造成数据混乱
pthread_mutex_t mutexBusy; // 锁busyNum变量---该变量变化次数更多,也需要避免数据混乱
// 条件变量
pthread_cond_t notFull; // 任务队列是不是满了
pthread_cond_t notEmpty; // 任务队列是不是空了

int shutdown; // 是不是要销毁线程池, 销毁为1, 不销毁为0
};

typedef struct Threadpool Thread_Pool;

//创建线程池并初始化
Thread_Pool *threadPoolCreate(int min,int max,int queueSize);

// 销毁线程池
int threadPoolDestroy(Thread_Pool* pool);

// 给线程池添加任务
void threadPoolAdd(Thread_Pool* pool, void(*func)(void*), void* arg);

// 获取线程池中工作的线程个数
int threadPoolBusyNum(Thread_Pool* pool);

// 获取线程池中活着的线程的个数
int threadPoolAliveNum(Thread_Pool* pool);

// 工作的线程(消费者线程)任务函数
_Noreturn void* worker(void* arg);

// 管理者线程任务函数
void* manager(void* arg);

// 单个线程退出
void threadExit(Thread_Pool* pool);


#endif //THREADPOOL_THREADPOOL_H

threadpool.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include "threadpool.h"

// 管理者线程--每次增加或者销毁的线程数量
const int NUMBER = 1;

/*** 创建线程池 ****/
// 参数 queueSize 任务队列中最多可塞进去任务数量
Thread_Pool *threadPoolCreate(int min,int max,int queueSize)
{
Thread_Pool *pool;
pool = malloc(sizeof(Thread_Pool));
do{
if(pool == NULL)
{
fprintf(stderr,"malloc threadPool fail...\n");
break;
}

// 为工作的线程开辟空间进行信息存储管理
pool->threadIDs = malloc(sizeof(pthread_t) * max);
if(pool->threadIDs == NULL)
{
fprintf(stderr,"malloc threadIDS fail...\n");
break;
}

// 对工作的线程ID进行初始化为0,若之后判断仍然为0,则说明该线程ID没有被占用
memset(pool->threadIDs,0,sizeof(pthread_t)*max);
// 初始化相关变量
pool->minNum = min;
pool->maxNum = max;
pool->busyNum = 0;
pool->liveNum = min;
pool->exitNum = 0;

// 初始化互斥锁与条件变量
if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||
pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||
pthread_cond_init(&pool->notEmpty, NULL) != 0 ||
pthread_cond_init(&pool->notFull, NULL) != 0)
{
fprintf(stderr,"mutex or condition init fail...\n");
break;
}

// 任务队列
pool->taskQ = malloc(sizeof(Task) * queueSize);
pool->queueCapacity = queueSize;
pool->queueSize = 0;
pool->queueFront = 0;
pool->queueRear = 0;

pool->shutdown = 0;

// 创建管理者线程
pthread_create(&pool->managerID,NULL,manager,pool);
// 输出管理者线程信息
printf("Manager Thread Create,PID: %ld\n",pool->managerID);
// 创建最少数量的工作线程
for(int i = 0;i<min;i++)
{
pthread_create(&pool->threadIDs[i],NULL,worker,pool);
// 线程分离
pthread_detach(pool->threadIDs[i]);
}
return pool;
}while(0); // while(0)不会一直循环,但是在代码块中可以使用break

// 若跳出了while(0),则出现了异常
// 进行资源释放
if (pool && pool->threadIDs) free(pool->threadIDs);
if (pool && pool->taskQ) free(pool->taskQ);
if (pool) free(pool);

return NULL;
}

/*****工作的线程(消费者线程)任务函数*****/
// 在任务队列中取出任务并且进行消费
// 消费者消费产品之后需要唤醒阻塞的生产者
void* worker(void* arg)
{
Thread_Pool *pool = (Thread_Pool *)arg;

while (1)
{
// 从任务队列中取出任务需要开启线程锁
pthread_mutex_lock(&pool->mutexPool);
// 线程池还没有被销毁,且当前任务队列为空时
while(pool->queueSize ==0 && !pool->shutdown)
{
// 阻塞工作线程---此时任务等待生产者线程添加任务再继续
// 当前任务队列为空,消费者线程无法在任务队列中取出任务,应该阻塞等待生产者线程,向任务队列中添加任务
pthread_cond_wait(&pool->notEmpty,&pool->mutexPool);

// pthread_cond_wait线程解除阻塞之后,会先解锁再加锁
// pool->exitNum表示要杀死的空闲子线程数量
if(pool->exitNum > 0)
{
pool->exitNum--;
// 销毁线程有约束,需要满足,存活的线程数不能低于minNum
if(pool->liveNum > pool->minNum)
{
// 活着的线程个数也需要-1 线程在wait函数中解除阻塞时,会自动加锁,因此此处不需要进行加锁操作
pool->liveNum--;
// 解开线程锁---pool-liveNum与pool->exitNum都是共享变量
pthread_mutex_unlock(&pool->mutexPool);
// 线程退出
threadExit(pool);
// pthread_exit(NULL);
}
}
}

// 线程池是否销毁判断
// > 0表示销毁
if(pool->shutdown > 0)
{
// 解锁
pthread_mutex_unlock(&pool->mutexPool);
// 调用worker子线程退出
threadExit(pool);
// pthread_exit(NULL);
}

// 任务队列是共享资源,因此线程在访问时,需要枷锁
// 工作线程开始在任务队列的对首取任务,进行消费
Task task;
// 从对首取任务
task.function = pool->taskQ[pool->queueFront].function;
task.arg = pool->taskQ[pool->queueFront].arg;
// 移动头节点---循环队列
pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;
// 任务总个数-1
pool->queueSize--;
// 唤醒阻塞在 队列满的条件变量(pool->notFull) 上的生产者线程,此时已经消费了一个任务,队列不满了
// 可以继续添加任务了
pthread_cond_signal(&pool->notFull);

// 解锁
pthread_mutex_unlock(&pool->mutexPool);

// 输出告知系统当前线程开始执行任务
printf("thread %ld start working... \n",pthread_self());
// 忙碌的线程数量+1,访问共享资源,枷锁
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum++;
pthread_mutex_unlock(&pool->mutexBusy);

// 执行任务
task.function(task.arg);
// 任务执行结束,释放资源
free(task.arg);
task.arg = NULL; // 避免悬空指针

// 输出告知系统当前线程已经执行完毕任务
printf("thread %ld end working... \n",pthread_self());

// 任务执行完之后,忙线程需要-1,访问共享资源,枷锁
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum--;
pthread_mutex_unlock(&pool->mutexBusy);
}

// return NULL;
}

/***** 管理者线程任务函数 *****/
// 用于创建线程与销毁多余空闲的线程
void *manager(void *arg)
{
Thread_Pool * pool = (Thread_Pool*)arg;
while(!pool->shutdown)
{
// 每隔3s检测一次
sleep(3);

// 取出线程池中任务的数量和当前线程存活的数量
// 需要加锁,避免数据混乱
pthread_mutex_lock(&pool->mutexPool);
int queueSize = pool->queueSize;
int liveNum = pool->liveNum;
pthread_mutex_unlock(&pool->mutexPool);

// 取出忙的线程数量
pthread_mutex_lock(&pool->mutexBusy);
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexBusy);

/***** 添加线程 ******/
// 当工作线程忙不过来的时,需要继续添加线程
// 任务队列中剩余的任务的个数>(线程存活的个数 - 忙碌的线程数量) && 存活的线程数 < 最大线程数
if(queueSize > liveNum-busyNum && liveNum < pool->maxNum)
{
// 加锁 因为下面也操作了线程池的变量
pthread_mutex_lock(&pool->mutexPool);
int counter = 0;
for(int i = 0;i<pool->maxNum && counter < NUMBER && pool->liveNum < pool->maxNum;i++)
{
// 该线程ID没有被使用
if(pool->threadIDs[i] == 0)
{
// 创建新的工作线程
pthread_create(&pool->threadIDs[i],NULL,worker,pool);
// 进行线程分离
pthread_detach(pool->threadIDs[i]);
counter++;
pool->liveNum++;
}
}
// 解锁
pthread_mutex_unlock(&pool->mutexPool);
}

/***** 销毁线程 ******/
// 忙的线程*2 < 存活的线程数 && 存活的线程>最小线程数
if(busyNum < liveNum && liveNum > pool->minNum)
{
// 操作线程池中的变量需要进行加锁
pthread_mutex_lock(&pool->mutexPool);
pool->exitNum = NUMBER;
pthread_mutex_unlock(&pool->mutexPool);

// 让空闲的工作线程自杀
for(int i=0;i<NUMBER;i++)
{
// 若此时多余的线程是空闲的则说明此时的任务队列为空
// 多余的线程此时阻塞在worker函数的pthread_cond_wait(&pool->notEmpty,&pool->mutexPool)位置
// 只需将其唤醒,使其在worker函数中向后执行并且自杀
pthread_cond_signal(&pool->notEmpty);
}
}
}

printf("Manager Thread,%ld exiting...............\n",pthread_self());
// 管理者线程退出
pthread_exit(NULL);
}


/**** 线程退出函数 ****/
// 存储退出线程id的数组位置需要进行清0
void threadExit(Thread_Pool* pool)
{
// 调用该函数的线程
pthread_t tid = pthread_self();
for(int i = 0;i<pool->maxNum;i++)
{
// 找到当前调用该函数的线程id的数组位置,对其进行归零
if(pool->threadIDs[i] == tid)
{
pool->threadIDs[i] = 0;
printf("threadExit() called,%ld exiting...............\n",tid);
break;
}
}
// 当前线程退出
pthread_exit(NULL);
// 退出的工作线程资源回收---线程回收与退出的线程本身没有关系,他自己无法执行
// pthread_join(tid,NULL);
}


/***** 线程池添加任务 ****/
// 生产者生产产品之后需要唤醒阻塞的消费者
void threadPoolAdd(Thread_Pool* pool, void(*func)(void*), void* arg)
{
// 也需要锁住--避免有线程同时在任务队列中添加任务又在任务队列中取任务
pthread_mutex_lock(&pool->mutexPool);
// 若任务队列满
while(pool->queueSize == pool->queueCapacity && !pool->shutdown)
{
// 阻塞生产者线程
pthread_cond_wait(&pool->notFull,&pool->mutexPool);
}
// 线程池关闭
if(pool->shutdown)
{
pthread_mutex_unlock(&pool->mutexPool);
return;
}
// 线程池没有关闭 则继续添加任务 添加到队尾
pool->taskQ[pool->queueRear].function = func;
pool->taskQ[pool->queueRear].arg = arg;
// 队尾进行后移
pool->queueRear =(pool->queueRear + 1) % pool->queueCapacity;
// 任务队列中的任务个数+1
pool->queueSize++;

// 唤醒阻塞在 队列为空pool->notEmpty 条件变量上的线程 有活干了
pthread_cond_signal(&pool->notEmpty);

pthread_mutex_unlock(&pool->mutexPool);
}


/**** 获取线程池中工作的线程个数 ****/
int threadPoolBusyNum(Thread_Pool* pool)
{
pthread_mutex_lock(&pool->mutexPool);
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexPool);
return busyNum;
}

/****** 获取线程池中活着的线程的个数 ******/
int threadPoolAliveNum(Thread_Pool* pool)
{
pthread_mutex_lock(&pool->mutexPool);
int liveNum = pool->liveNum;
pthread_mutex_unlock(&pool->mutexPool);
return liveNum;
}


/****** 销毁线程池 *******/
int threadPoolDestroy(Thread_Pool* pool)
{
// pool线程池指向的是一块非有效的地址
if(pool == NULL)
{
return -1;
}

// 关闭线程池
pool->shutdown = 1;
// 唤醒存活的消费者线程
for(int i = 0;i<pool->liveNum;i++)
{
pthread_cond_signal(&pool->notEmpty);
}

// 阻塞回收管理者线程---等工作线程退出之后在对管理者线程进行资源回收
pthread_join(pool->managerID,NULL);

// 释放堆内存
if(pool->taskQ)
{
free(pool->taskQ);
}
if(pool->threadIDs)
{
free(pool->threadIDs);
}

// 销毁条件变量与互斥锁
pthread_mutex_destroy(&pool->mutexPool);
pthread_mutex_destroy(&pool->mutexBusy);
pthread_cond_destroy(&pool->notEmpty);
pthread_cond_destroy(&pool->notFull);

free(pool);
pool = NULL;
return 0;
}

2.基于线程池的套接字程序

通信协议:

proto.h

1
2
3
4
5
6
7
#ifndef UNTITLED_PROTO_H
#define UNTITLED_PROTO_H

#define SERVERPORT "1989"
#define SERVERIP "0.0.0.0"

#endif //UNTITLED_PROTO_H

客户端程序:

client.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
/*
* 客户端程序
* */

#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <string.h>
#include <arpa/inet.h>
#include <unistd.h>

#include "proto.h"

#define BUFSIZE 1024


int main(int argc,char *argv[])
{
int sd; // 客户端用于通信的套接字
struct sockaddr_in raddr; // 服务端远端ip信息结构体
char BUF[BUFSIZE];
int len;

// 判断终端指令参数个数是否合理
if(argc < 2)
{
fprintf(stderr,"Usage...\n");
exit(1);
}

/*1.创建用于与服务端进行通信的套接字*/
sd = socket(AF_INET,SOCK_STREAM,0);
if(sd < 0)
{
perror("socket()");
exit(1);
}
/*************************************************************/

/*2.设置远端ip结构体信息,客户端向服务端发送连接请求*/
raddr.sin_family = AF_INET;
raddr.sin_port = htons(atoi(SERVERPORT));
inet_pton(AF_INET,argv[1],&raddr.sin_addr);
if(connect(sd,(void *) &raddr,sizeof raddr) < 0)
{
perror("connect()");
exit(1);
}
/*************************************************************/


/*3.建立连接成功,与服务端进行通信*/
int number = 0;
while(1)
{
// 缓冲区进行清0
memset(BUF, 0, sizeof(BUF));
sprintf(BUF, "你好,服务器...%d\n", number++);
// 向服务端发送数据
send(sd, BUF, strlen(BUF)+1,0);

// 接受服务端数据
memset(BUF, 0, sizeof(BUF));
int len = recv(sd,BUF,sizeof BUF,0);
if(len > 0)
{
fprintf(stdout,"服务器say: %s\n", BUF);
}
else if(len == 0)
{
fprintf(stdout,"服务器断开了连接...\n");
break;
}
else
{
perror("recv()");
break;
}
sleep(1); // 每隔1s发送一条数据
}

close(sd); // 关闭套接字
/*************************************************************/

exit(0);
}

基于线程池的服务端程序:

在服务端的主线程中,进行监听套接字的创建与绑定等操作,随后创建线程池,并且向线程池中添加用于监听套接字连接请求的工作子线程。完成这些工作,主线程就可以退出了,退出了并不会影响子线程的工作

在监听套接字连接请求的工作子线程中,每次接受客户端连接请求后,都会继续创建一个工作子线程用于与客户端进行数据通信

当多个客户端全部中断通信之后,过一段时间线程池就会将用于与客户端通信的子线程资源进行回收,但是管理者线程与用于监听客户端连接的工作子线程会一直进行工作,用于需要管理线程池,与继续监听客户端的连接…

server.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>

#include "proto.h"
#include "threadpool.h"

#define IPSTRSIZE 40
#define BUFSIZE 1024

// 信息结构体
struct SockInfo
{
int fd; // 通信
pthread_t tid; // 线程ID
struct sockaddr_in addr; // 地址信息
};

typedef struct PoolInfo
{
Thread_Pool *p;
int fd;
}PoolInfo;


// 用于与客户端通信的子线程函数
void working(void *arg);
// 用于监听客户端连接的子线程函数
void acceptConn(void *arg);

// 存储子线程数据数组
char ipstr[IPSTRSIZE];


int main()
{
/*1. 创建用于监听的套接字,并且设置套接字属性*/
int fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd < 0)
{
perror("socket()");
exit(1);
}
// 设置套接字属性,快速释放被占用的端口
int val = 1;
if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) < 0) {
perror("setsockopt()");
exit(1);
}
/******************************************************************/


/*2. 为服务端绑定ip与端口*/
struct sockaddr_in laddr;
laddr.sin_family = AF_INET;
laddr.sin_port = htons(atoi(SERVERPORT));
// 0.0.0.0 为任意地址
// 将ipv4点分式地址转换为 sin_addr需要uint_32 整型
inet_pton(AF_INET, SERVERPORT, &laddr.sin_addr);
if (bind(fd, (void *) &laddr, sizeof(laddr)) < 0)
{
perror("bind()");
exit(1);
}
/******************************************************************/


/*3. 服务端设置监听,监听客户端发送的连接请求*/
// 监听队列中设置的可容纳的等待接收连接请求最大数量为100
if (listen(fd, 100) < 0)
{
perror("listen()");
exit(1);
}
/******************************************************************/


/*4. 创建线程池,并且向线程池中添加用于监听套接字连接的任务*/
// 线程池中最少的工作子线程数量为1个,只要保证监听客户端连接的子线程一直工作保持在监听状态就可以
// 线程池中任务队列的容量为100
Thread_Pool *pool = threadPoolCreate(1,8,100);
// 与客户端建立连接的线程参数信息构建
PoolInfo *info = (void *) malloc(sizeof(PoolInfo));
info->p = pool;
info->fd = fd;
// 向线程池中添加用于监听客户端连接请求的任务
threadPoolAdd(pool,acceptConn,info);
/******************************************************************/

// 主线程任务完成退出
pthread_exit(NULL);

return 0;
}

/*用于监听客户端连接的子线程函数*/
void acceptConn(void *arg)
{
PoolInfo *poolInfo = (void *)arg;
// 与服务端连接的客户端ip地址长度
int len = sizeof(struct sockaddr_in);
// 4.主线程进行监听
// 监听接受连接之后,创建子线程用于与子线程通信
while(1)
{
bzero(&ipstr,sizeof(ipstr));

// 创建子线程
struct SockInfo* pinfo;
pinfo = (void *) malloc(sizeof(struct SockInfo));
int cfd = accept(poolInfo->fd,(void *)&pinfo->addr,&len);
if(cfd < 0)
{
perror("accept()");
exit(1);
}
pinfo->fd = cfd;

// 打印输出 与服务端连接的客户端ip信息
inet_ntop(AF_INET,&pinfo->addr.sin_addr,ipstr,IPSTRSIZE);
printf("连接成功, 客户端IP:%s, 占用端口: %d\n",ipstr,ntohs(pinfo->addr.sin_port));

// 添加通信任务
threadPoolAdd(poolInfo->p, working,pinfo);
}

// 关闭用于监听的套接字文件描述符
close(poolInfo->fd);
}


// 用于与客户端通信的子线程函数
void working(void *arg)
{
int number = 0;
struct SockInfo* info = (struct SockInfo *)arg;
while(1)
{
// 接受数据
char buf[BUFSIZE];
// recv函数接受数据,得到成功接受数据的字节数
int ret = recv(info->fd,buf,sizeof(buf),0);
if(ret == 0)
{
printf("客户端关闭连接!\n");
info->fd = -1;
break;
}
else if(ret < 0)
{
perror("recv()");
info->fd = -1;
break;
}
// 成功接受数据

else{
memset(buf, 0, sizeof(buf));
sprintf(buf, "客户端信息成功接收...%d\n", number++);
send(info->fd,buf,strlen(buf)+1,0);
}
}

// 关闭用于通信的套接字文件描述符
close(info->fd);
}

第一步:终端编译运行基于线程池的服务端程序如下:

1
2
$ gcc server.c threadpool.c -lpthread -o server
$ ./server

image-20240504172233784

管理者线程启动,用于监听套接字连接的工作子线程启动

第二步:编译并且启动多个客户端程序如下./client 0.0.0.0,启动了四个,服务端终端显示如下:

image-20240504172544592

上图可知,有四个工作的子线程进行创建用于与客户端进行通信

第三步:依次关闭,开启的客户端程序,服务端终端显示如下:

image-20240504172741347

上图可知,用于与客户端通信的子线程,全部依次退出