多线程网络通信 本文的内容均转载于“爱编程大丙“博客,有小部分个人理解,本文仅用于记录个人的学习过程
视频链接
文档链接
一、相关概念 1.局域网与广域网
局域网:局域网将一定区域内的各种计算机、外部设备和数据库连接起来形成计算机通信的私有网络。
广域网:又称广域网、外网、公网。是连接不同地区局域网或城域网计算机通信的远程公共网络
IP用于在网络中定位某一台主机,而端口则用于定位主机中的某一个进程
2.IP IP(Internet Protocol):本质是一个整形数,用于表示计算机在网络中的地址。IP协议版本有两个:IPv4和IPv6
3.端口 端口的作用是定位到主机上的某一个进程,通过这个端口进程就可以接受到对应的网络数据了。
一个程序运行起来就是一个进程,若这个进程需要接收对应的数据,那么需要绑定IP与端口
1 2 3 4 5 比如: 在电脑上运行了微信和QQ, 小明通过客户端给我的的微信发消息, 电脑上的微信就收到了消息, 为什么? 运行在电脑上的微信和QQ都绑定了不同的端口 通过IP地址可以定位到某一台主机,通过端口就可以定位到主机上的某一个进程 通过指定的IP和端口,发送数据的时候对端就能接受到数据了
端口也是一个整形数 unsigned short ,一个16位整形数,有效端口的取值范围是:0 ~ 65535(0 ~ 216-1)
提问:计算机中所有的进程都需要关联一个端口吗,一个端口可以被重复使用吗?
4.OSI/ISO网络分层协议 OSI(Open System Interconnect),即开放式系统互联。 一般都叫OSI参考模型,是ISO(国际标准化组织组织)在1985年研究的网络互联模型。
每一层的作用,文档链接 ,视频链接
5.网络协议 网络协议指的是计算机网络中互相通信的对等实体之间交换信息时所必须遵守的规则的集合。一般系统网络协议包括五个部分:通信环境,传输服务,词汇表,信息的编码格式,时序、规则和过程。先来通过下面几幅图了解一下常用的网络协议的格式:
网络协议示意图:文档链接
在网络通信的时候, 程序猿需要负责的应用层数据的处理(最上层)
二、Socket网络编程 1.字节序 字节序,顾名思义字节的顺序,就是大于一个字节类型的数据在内存中的存放顺序,也就是说对于单字符来说是没有字节序问题的,字符串是单字符的集合,因此字符串也没有字节序问题
目前在各种体系的计算机中通常采用的字节存储机制主要有两种:Big-Endian 和 Little-Endian,下面先从字节序说起
相关函数:
BSD Socket提供了封装好的转换接口,方便程序员使用。包括从主机字节序到网络字节序的转换函数:htons
、htonl
;从网络字节序到主机字节序的转换函数:ntohs
、ntohl
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 #include <arpa/inet.h> uint16_t htons (uint16_t hostshort) ; uint32_t htonl (uint32_t hostlong) ; uint16_t ntohs (uint16_t netshort) uint32_t ntohl (uint32_t netlong) ;
2.IP地址转换 IP地址本质是一个整形数,但是在使用的过程中都是通过一个字符串来描述,下面的函数描述了如何将一个字符串类型的IP地址进行大小端转换:
1 2 3 4 int inet_pton (int af, const char *src, void *dst) ;
参数:
af: 地址族(IP地址的家族包括ipv4和ipv6)协议
AF_INET: ipv4格式的ip地址
AF_INET6: ipv6格式的ip地址
src: 传入参数, 对应要转换的点分十进制的ip地址: 192.168.1.100
dst: 传出参数, 函数调用完成, 转换得到的大端整形IP被写入到这块内存中
返回值:
1 2 3 #include <arpa/inet.h> const char *inet_ntop (int af, const void *src, char *dst, socklen_t size) ;
参数:
af: 地址族协议
AF_INET: ipv4格式的ip地址
AF_INET6: ipv6格式的ip地址
src: 传入参数, 这个指针指向的内存中存储了大端的整形IP地址
dst: 传出参数, 存储转换得到的小端的点分十进制的IP地址
size: 修饰dst参数的, 标记dst指向的内存中最多可以存储多少个字节
返回值:
成功: 指针指向第三个参数对应的内存地址, 通过返回值也可以直接取出转换得到的IP字符串
失败: NULL
还有一组函数也能进程IP地址大小端的转换,但是只能处理ipv4的ip地址:
1 2 3 4 5 in_addr_t inet_addr (const char *cp) ;char * inet_ntoa (struct in_addr in) ;
3.sockaddr数据结构
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 struct sockaddr { sa_family_t sa_family; char sa_data[14 ]; } typedef unsigned short uint16_t ;typedef unsigned int uint32_t ;typedef uint16_t in_port_t ;typedef uint32_t in_addr_t ;typedef unsigned short int sa_family_t ;#define __SOCKADDR_COMMON_SIZE (sizeof (unsigned short int)) struct in_addr { in_addr_t s_addr; }; struct sockaddr_in { sa_family_t sin_family; in_port_t sin_port; struct in_addr sin_addr ; unsigned char sin_zero[sizeof (struct sockaddr) - sizeof (sin_family) - sizeof (in_port_t ) - sizeof (struct in_addr)]; };
4.套接字函数 (1)socket函数 使用套接字通信函数需要包含头文件<arpa/inet.h>
,包含了这个头文件<sys/socket.h>
就不用在包含了
1 2 int socket (int domain, int type, int protocol) ;
参数:
domain: 使用的地址族协议
AF_INET: 使用IPv4格式的ip地址
AF_INET6: 使用IPv4格式的ip地址
type:
SOCK_STREAM: 使用流式的传输协议
SOCK_DGRAM: 使用报式(报文)的传输协议
protocol: 一般写0即可, 使用默认的协议
SOCK_STREAM: 流式传输默认使用的是tcp
SOCK_DGRAM: 报式传输默认使用的udp
返回值:
成功: 可用于套接字通信的文件描述符
失败: -1
函数的返回值是一个文件描述符,通过这个文件描述符可以操作内核中的某一块内存,网络通信是基于这个文件描述符来完成的。
(2)bind函数 1 2 int bind (int sockfd, const struct sockaddr *addr, socklen_t addrlen) ;
参数:
sockfd: 监听的文件描述符, 通过socket()调用得到的返回值
addr: 传入参数, 要绑定的IP和端口信息需要初始化到这个结构体中,IP和端口要转换为网络字节序
addrlen: 参数addr指向的内存大小, sizeof(struct sockaddr)
返回值:
(3)listen函数 1 2 int listen (int sockfd, int backlog) ;
参数:
sockfd: 文件描述符, 可以通过调用socket()得到,在监听之前必须要绑定 bind()
backlog: 同时能处理的最大连接要求,最大值为128
返回值:
(4)accept函数 这个函数是一个阻塞函数,当没有新的客户端连接请求的时候,该函数阻塞;当检测到有新的客户端连接请求时,阻塞解除,新连接就建立了,得到的返回值也是一个文件描述符,基于这个文件描述符就可以和客户端通信了。
该函数是一个线程安全的函数
1 2 int accept (int sockfd, struct sockaddr *addr, socklen_t *addrlen) ;
参数:
sockfd: 监听的文件描述符
addr: 传出参数, 里边存储了建立连接的客户端的地址信息
addrlen: 传入传出参数,用于存储addr指向的内存大小
返回值:
函数调用成功,得到一个文件描述符, 用于和建立连接的这个客户端通信
调用失败返回 -1
(5)read与recv函数 如果连接没有断开,接收端接收不到数据,接收数据的函数会阻塞等待数据到达,数据到达后函数解除阻塞,开始接收数据,当发送端断开连接,接收端无法接收到任何数据,但是这时候就不会阻塞了,函数直接返回0。
1 2 3 ssize_t read (int sockfd, void *buf, size_t size) ;ssize_t recv (int sockfd, void *buf, size_t size, int flags) ;
参数:
sockfd: 用于通信的文件描述符, accept() 函数的返回值
buf: 指向一块有效内存, 用于存储接收是数据
size: 参数buf指向的内存的容量
flags: 特殊的属性, 一般不使用, 指定为 0
返回值:
大于0:实际接收的字节数
等于0:对方断开了连接
-1:接收数据失败了
(6)write与send函数 1 2 3 ssize_t write (int fd, const void *buf, size_t len) ;ssize_t send (int fd, const void *buf, size_t len, int flags) ;
参数:
fd: 通信的文件描述符, accept() 函数的返回值
buf: 传入参数, 要发送的字符串
len: 要发送的字符串的长度
flags: 特殊的属性, 一般不使用, 指定为 0
返回值:
大于0:实际发送的字节数,和参数len是相等的
-1:发送数据失败了
(7)connect函数 1 2 3 int connect (int sockfd, const struct sockaddr *addr, socklen_t addrlen) ;
参数:
sockfd: 通信的文件描述符, 通过调用socket()函数就得到了
addr: 存储了要连接的服务器端的地址信息: iP 和 端口,这个IP和端口也需要转换为大端然后再赋值
addrlen: addr指针指向的内存的大小 sizeof(struct sockaddr)
返回值:
三、TCP通信流程 TCP是一个面向连接的,安全的,流式传输协议,这个协议是一个传输层协议
面向连接:是一个双向连接,通过三次握手完成,断开连接需要通过四次挥手完成
安全:tcp通信过程中,会对发送的每一数据包都会进行校验, 如果发现数据丢失, 会自动重传
流式传输:发送端和接收端处理数据的速度,数据的量都可以不一致
1.服务端通信流程
1.创建用于监听的套接字, 这个套接字是一个文件描述符
3.设置监听(成功之后开始监听, 监听的是客户端的连接)
4.等待并接受客户端的连接请求, 建立新的连接, 会得到一个新的文件描述符(通信的),没有新连接请求就阻塞,accept是一个阻塞的系统调用
1 2 3 4 read(); / recv(); write(); / send();
在tcp的服务器端, 有两类文件描述符
监听的文件描述符
只需要有一个
不负责和客户端通信, 负责检测客户端的连接请求, 检测到之后调用accept就可以建立新的连接
通信的文件描述符
负责和建立连接的客户端通信
如果有N个客户端和服务器建立了新的连接, 通信的文件描述符就有N个,每个客户端和服务器都对应一个通信的文件描述符
(1)文件描述符的重要性
上图
左方为服务端情形,在服务端只有一个用于监听的文件描述符,而通信的文件描述符可以有N(若干个) ;
左方位客户端情形,在客户端只有一个用于通信的文件描述符 。
通过上述的文件描述符就可以进行网络IO的操作,即可以进行网络数据的读写操作。网络IO读写操作,操作的主体是内核中的内存。而文件IO操作的主体是磁盘中的内存。
进行套接字通信时,每一个文件描述符对应的是内核中的两块内存:读缓冲区以及写缓冲区 。
网络通信文件描述符对应的内存结构 :
一个文件的文件描述符对应两块内存, 一块内存是读缓冲区, 一块内存是写缓冲区
读数据: 通过文件描述符将内存中的数据读出, 这块内存称之为读缓冲区
写数据: 通过文件描述符将数据写入到某块内存中, 这块内存称之为写缓冲区
监听的文件描述符 :
客户端的连接请求会发送到服务器端监听的文件描述符的读缓冲区中
读缓冲区中有数据, 说明有新的客户端连接
调用accept()函数, 这个函数会检测监听文件描述符的读缓冲区
检测不到数据, 该函数阻塞
如果检测到数据, 解除阻塞, 新的连接建立
通信的文件描述符 :
客户端和服务器端都有通信的文件描述符
发送数据:调用函数 write() / send(),数据进入到内核中
数据并没有被发送出去, 而是将数据写入到了通信的文件描述符对应的写缓冲区中
内核检测到通信的文件描述符写缓冲区中有数据, 内核会将数据发送到网络中
接收数据: 调用的函数 read() / recv(), 从内核读数据
数据如何进入到内核程序猿不需要处理, 数据进入到通信的文件描述符的读缓冲区中
数据进入到内核, 必须使用通信的文件描述符, 将数据从读缓冲区中读出即可
2.客户端的通信流程 主动请求连接的一方,不需要对IP以及端口进行绑定,因为connect
函数在向服务端发送连接时,会将本机的ip
以及本机随机一个空闲的端口(系统自动分配)发送给服务端。
在单线程的情况下客户端通信的文件描述符有一个, 没有监听的文件描述符
1 2 3 4 read(); / recv(); write(); / send();
3.Web Server单线程实例 server.c
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <arpa/inet.h> #define SERVERPORT "1989" #define IPSTRSIZE 40 int main () { int lfd = socket(AF_INET, SOCK_STREAM, 0 ); if (lfd == -1 ) { perror("socket" ); exit (0 ); } int val = 1 ; if (setsockopt(lfd,SOL_SOCKET,SO_REUSEADDR,&val,sizeof (val)) < 0 ) { perror("setsockopt()" ); exit (1 ); } struct sockaddr_in addr ; addr.sin_family = AF_INET; addr.sin_port = htons(atoi(SERVERPORT)); addr.sin_addr.s_addr = INADDR_ANY; int ret = bind(lfd, (struct sockaddr*)&addr, sizeof (addr)); if (ret == -1 ) { perror("bind" ); exit (0 ); } ret = listen(lfd, 128 ); if (ret == -1 ) { perror("listen" ); exit (0 ); } struct sockaddr_in cliaddr ; int clilen = sizeof (cliaddr); int cfd = accept(lfd, (struct sockaddr*)&cliaddr, &clilen); if (cfd == -1 ) { perror("accept" ); exit (0 ); } char ip[IPSTRSIZE]; inet_ntop(AF_INET, &cliaddr.sin_addr.s_addr, ip, sizeof (ip)); printf ("客户端的IP地址: %s, 端口: %d\n" ,ip,ntohs(cliaddr.sin_port)); int number = 0 ; while (1 ) { char buf[1024 ]; memset (buf, 0 , sizeof (buf)); int len = recv(cfd, buf, sizeof (buf),0 ); if (len > 0 ) { printf ("客户端say: %s\n" , buf); memset (buf, 0 , sizeof (buf)); sprintf (buf, "客户端信息成功接收...%d\n" , number++); send(cfd, buf, strlen (buf)+1 ,0 ); } else if (len == 0 ) { printf ("客户端断开了连接...\n" ); break ; } else { perror("read" ); break ; } } close(cfd); close(lfd); return 0 ; }
client.c
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <arpa/inet.h> #define SERVERPORT "1989" int main (int argc,char *argv[]) { if (argc < 2 ) { printf ("Usage...\n" ); exit (1 ); } int fd = socket(AF_INET, SOCK_STREAM, 0 ); if (fd == -1 ) { perror("socket" ); exit (0 ); } struct sockaddr_in addr ; addr.sin_family = AF_INET; addr.sin_port = htons(atoi(SERVERPORT)); inet_pton(AF_INET,argv[1 ], &addr.sin_addr.s_addr); int ret = connect(fd, (struct sockaddr*)&addr, sizeof (addr)); if (ret == -1 ) { perror("connect" ); exit (0 ); } int number = 0 ; while (1 ) { char buf[1024 ]; sprintf (buf, "你好, 服务器...%d\n" , number++); send(fd, buf, strlen (buf)+1 ,0 ); memset (buf, 0 , sizeof (buf)); int len = recv(fd, buf, sizeof (buf),0 ); if (len > 0 ) { printf ("服务器say: %s\n" , buf); } else if (len == 0 ) { printf ("服务器断开了连接...\n" ); break ; } else { perror("recv()" ); break ; } sleep(1 ); } close(fd); return 0 ; }
编译运行结果:
四、扩展阅读 在Window中也提供了套接字通信的API,这些API函数与Linux平台的API函数几乎相同,以至于很多人认为套接字通信的API函数库只有一套,下面来看一下这些Windows平台的套接字函数:
1.初始化套接字环境 使用Windows中的套接字函数需要额外包含对应的头文件以及加载响应的动态库:
1 2 3 4 include <winsock2.h> ws2_32.dll
在Windows中使用套接字需要先加载套接字库(套接字环境),最后需要释放套接字资源
1 2 3 WSAStartup(WORD wVersionRequested, LPWSADATA lpWSAData);
参数:
wVersionRequested: 使用的Windows Socket的版本, 一般使用的版本是 2.2
lpWSAData:一个WSADATA结构指针, 这是一个传入参数
创建一个 WSADATA 类型的变量, 将地址传递给该函数的第二个参数
注销Winsock相关库,函数调用成功返回0,失败返回 SOCKET_ERROR
使用举例:
1 2 3 4 5 6 7 8 9 WSAData wsa; WSAStartup(MAKEWORD(2 , 2 ), &wsa); WSACleanup();
2.套接字通信函数 基于Linux的套接字通信流程是最全面的一套通信流程,如果是在某个框架中进行套接字通信,通信流程只会更简单,直接使用window的套接字api进行套接字通信,和Linux平台上的通信流程完全相同。
(1)结构体 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 typedef struct in_addr { union { struct { unsigned char s_b1,s_b2, s_b3,s_b4;} S_un_b; struct { unsigned short s_w1, s_w2;} S_un_w; unsigned long S_addr; } S_un; }IN_ADDR; struct sockaddr_in { short int sin_family; unsigned short int sin_port; struct in_addr sin_addr ; unsigned char sin_zero[8 ]; }; typedef unsigned short uint16_t ;typedef unsigned int uint32_t ;typedef uint16_t in_port_t ;typedef uint32_t in_addr_t ;typedef unsigned short int sa_family_t ;struct in_addr { in_addr_t s_addr; }; struct sockaddr_in { sa_family_t sin_family; in_port_t sin_port; struct in_addr sin_addr ; unsigned char sin_zero[sizeof (struct sockaddr) - sizeof (sin_family) - sizeof (in_port_t ) - sizeof (struct in_addr)]; };
(2)大小端转换函数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 u_short htons (u_short hostshort ) ; u_long htonl ( u_long hostlong) ; u_short ntohs (u_short netshort ) ; u_long ntohl ( u_long netlong) ; inet_ntop(); inet_pton(); unsigned long inet_addr (const char FAR * cp) ; in_addr_t inet_addr (const char *cp) ; char * inet_ntoa (struct in_addr in) ;
(3)套接字函数 window的api中套接字对应的类型是 SOCKET 类型, linux中是 int 类型, 本质是一样的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 SOCKET socket (int af,int type,int protocal) ; 参数: - af: 地址族协议 - ipv4: AF_INET (windows/linux) - PF_INET (windows) - AF_INET == PF_INET - type: 和linux一样 - SOCK_STREAM - SOCK_DGRAM - protocal: 一般写0 即可 - 在windows上的另一种写法 - IPPROTO_TCP, 使用指定的流式协议中的tcp协议 - IPPROTO_UDP, 使用指定的报式协议中的udp协议 int bind(SOCKET s,const struct sockaddr FAR* name, int namelen);int listen (SOCKET s,int backlog) ;SOCKET accept ( SOCKET s, struct sockaddr FAR* addr, int FAR* addrlen ) ; int connect (SOCKET s,const struct sockaddr FAR* name,int namelen ) ;::connect(sock, (struct sockaddr*)&addr, sizeof (addr)); int recv (SOCKET s,char FAR* buf,int len,int flags) ;int send (SOCKET s,const char FAR * buf, int len,int flags) ;int closesocket (SOCKET s) ; int recvfrom (SOCKET s,char FAR *buf,int len,int flags, struct sockaddr FAR *from,int FAR *fromlen) ;int sendto (SOCKET s,const char FAR *buf,int len,int flags, const struct sockaddr FAR *to,int tolen) ;
五、服务器并发 在上文的(三)
TCP通信流程的第三小节(3)Web Server单线程实例
中,是在单线程\单进程的场景下进行的,accept是一个阻塞函数,若accept阻塞住,则无法建立新的连接,因此服务器无法处理多连接。为了实现服务器的并发,有以下几种解决方法:
使用多线程实现(相较于多进程节省系统资源)
使用多进程实现
使用IO多路转接(复用)实现(在单线程的场景下,依旧可以实现服务器的并发,但是效率比多线程低)
使用IO多路转接+多线程实现(在使用IO多路转接实现的基础上,增加上多线程可以增加效率)
1.单线程Socket通信 通信协议 :
proto.h
1 2 3 4 5 6 7 8 9 10 11 #ifndef SOCKET_MULTITHREADING_PROTO_H #define SOCKET_MULTITHREADING_PROTO_H #define SERVERPORT "1989" #define SERVERIP "0.0.0.0" #endif
客户端 :
Client.c
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 #include <stdio.h> #include <stdlib.h> #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <string.h> #include <arpa/inet.h> #include <unistd.h> #include "proto.h" #define BUFSIZE 1024 int main (int argc,char *argv[]) { int sd; struct sockaddr_in raddr ; char BUF[BUFSIZE]; int len; if (argc < 2 ) { fprintf (stderr ,"Usage...\n" ); exit (1 ); } sd = socket(AF_INET,SOCK_STREAM,0 ); if (sd < 0 ) { perror("socket()" ); exit (1 ); } raddr.sin_family = AF_INET; raddr.sin_port = htons(atoi(SERVERPORT)); inet_pton(AF_INET,argv[1 ],&raddr.sin_addr); if (connect(sd,(void *) &raddr,sizeof raddr) < 0 ) { perror("connect()" ); exit (1 ); } int number = 0 ; while (1 ) { memset (BUF, 0 , sizeof (BUF)); sprintf (BUF, "你好,服务器...%d\n" , number++); send(sd, BUF, strlen (BUF)+1 ,0 ); memset (BUF, 0 , sizeof (BUF)); int len = recv(sd,BUF,sizeof BUF,0 ); if (len > 0 ) { fprintf (stdout ,"服务器say: %s\n" , BUF); } else if (len == 0 ) { fprintf (stdout ,"服务器断开了连接...\n" ); break ; } else { perror("recv()" ); break ; } sleep(1 ); } close(sd); exit (0 ); }
服务端 :
Server.c
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 #include <stdio.h> #include <stdlib.h> #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <string.h> #include <arpa/inet.h> #include <unistd.h> #include "proto.h" #define IPSTRSIZE 40 #define BUFSIZE 1024 int main () { int sd; int new_sd; char ipStr[IPSTRSIZE]; struct sockaddr_in laddr ; struct sockaddr_in raddr ; socklen_t raddr_len; char BUF[BUFSIZE]; int len; sd = socket(AF_INET,SOCK_STREAM,0 ); if (sd < 0 ) { perror("socket()" ); exit (1 ); } int val = 1 ; if (setsockopt(sd,SOL_SOCKET,SO_REUSEADDR,&val,sizeof val) < 0 ) { perror("socket()'" ); exit (1 ); } laddr.sin_family = AF_INET; laddr.sin_port = htons(atoi(SERVERPORT)); inet_pton(AF_INET,SERVERIP,&laddr.sin_addr); if (bind(sd,(void *)&laddr,sizeof laddr) < 0 ) { perror("bind()" ); exit (1 ); } if (listen(sd,200 ) < 0 ) { perror("listen()" ); exit (1 ); } raddr_len = sizeof raddr; new_sd = accept(sd,(void *)&raddr,&raddr_len); if (new_sd < 0 ) { perror("accept()'" ); exit (1 ); } inet_ntop(AF_INET,&raddr.sin_addr,ipStr,sizeof ipStr); printf ("客户端的IP地址: %s, 端口: %d\n" ,ipStr,ntohs(raddr.sin_port)); int number = 0 ; while (1 ) { memset (BUF, 0 , sizeof (BUF)); len = recv(new_sd, BUF, sizeof (BUF),0 ); if (len > 0 ) { printf ("客户端say: %s\n" , BUF); memset (BUF, 0 , sizeof (BUF)); sprintf (BUF, "信息成功接收...%d\n" , number++); send(new_sd, BUF, strlen (BUF)+1 ,0 ); } else if (len == 0 ) { printf ("客户端断开了连接...\n" ); break ; } else { perror("recv()" ); break ; } } close(sd); close(new_sd); return 0 ; }
编译运行,先运行服务端程序,若无客户端发送连接请求,此时服务端会在accept函数处进行阻塞:
编译运行客户端程序:
服务端终端显示情况:
客户端终端显示情况:
2.多线程并发 多线程中的线程有两大类:主线程(父线程)和子线程,他们分别要在服务器端处理监听和通信流程。根据多进程的处理思路,就可以这样设计服务端程序 :
主线程 :(主要负责,监听连接请求,并且建立连接)
负责监听,处理客户端的连接请求,也就是在父进程中循环调用accept()函数
创建子线程:建立一个新的连接,就创建一个新的子进程,让这个子进程和对应的客户端通信
回收子线程资源:由于回收需要调用阻塞函数,这样就会影响accept(),直接做线程分离即可
子线程 :(主要负责,与客户端进行通信,收发数据)
负责通信,基于主线程建立新连接之后得到的文件描述符,和对应的客户端完成数据的接收和发送
发送数据:send() / write()
接收数据:recv() / read()
在多线程版的服务器端程序中,多个线程共用同一个地址空间,有些数据是共享的,有些数据的独占的,下面来分析一些其中的一些细节:
同一地址空间中的多个线程的栈空间是独占的
多个线程共享全局数据区,堆区,以及内核区的文件描述符等资源,因此需要注意数据覆盖问题,并且在多个线程访问共享资源的时候,还需要进行线程同步。
(1)bzero函数 bzero
函数是在C语言中用于将内存块的内容清零的函数
函数原型 :
1 2 #include <strings.h> void bzero (void *s, size_t n) ;
s
是指向要清零的内存块的指针。
n
是要清零的字节数。
这个函数将指定的内存区域的前 n
个字节设置为零。它通常用于初始化新分配的内存,以确保它不包含任何垃圾值。
(2)程序实例 多线程TCP服务端程序 :
server.c
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <arpa/inet.h> #include <pthread.h> #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #define SERVERPORT "1989" #define IPSTRSIZE 40 #define CHILD_THREADSIZE 128 #define BUFSIZE 1024 struct SockInfo { int fd; pthread_t tid; struct sockaddr_in addr ; }; void *working (void *arg) { int number = 0 ; struct SockInfo * info = (struct SockInfo *)arg; while (1 ) { char buf[BUFSIZE]; int ret = recv(info->fd,buf,sizeof (buf),0 ); if (ret == 0 ) { printf ("客户端关闭连接!\n" ); info->fd = -1 ; break ; } else if (ret < 0 ) { perror("recv()" ); info->fd = -1 ; break ; } else { memset (buf, 0 , sizeof (buf)); sprintf (buf, "客户端信息成功接收...%d\n" , number++); send(info->fd,buf,strlen (buf)+1 ,0 ); } } close(info->fd); return NULL ; } struct SockInfo infos [128];char ipstr[IPSTRSIZE];int main () { int i,max,len; int connfd; int fd = socket(AF_INET, SOCK_STREAM, 0 ); if (fd < 0 ) { perror("socket()" ); exit (0 ); } int val = 1 ; if (setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,&val,sizeof (val)) < 0 ) { perror("setsockopt()" ); exit (1 ); } struct sockaddr_in laddr ; laddr.sin_family = AF_INET; laddr.sin_port = htons(atoi(SERVERPORT)); inet_pton(AF_INET,"0.0.0.0" ,&laddr.sin_addr); if (bind(fd,(void *)&laddr,sizeof (laddr)) < 0 ) { perror("bind()" ); exit (1 ); } if (listen(fd,100 )<0 ) { perror("listen()" ); exit (1 ); } max = sizeof (infos) / sizeof (infos[0 ]); for (i=0 ;i<max;i++) { bzero(&infos[i],sizeof (infos[i])); infos[i].fd = -1 ; infos[i].tid = -1 ; } len = sizeof (struct sockaddr_in); while (1 ) { struct SockInfo * pinfo ; for (i = 0 ;i<max;i++) { if (infos[i].fd == -1 ) { pinfo = &infos[i]; break ; } if (i == max-1 ) { sleep(1 ); i--; } } bzero(&ipstr,sizeof (ipstr)); connfd = accept(fd,(void *)&pinfo->addr,&len); if (connfd < 0 ) { perror("accept()" ); exit (1 ); } inet_ntop(AF_INET,&pinfo->addr.sin_addr,ipstr,IPSTRSIZE); printf ("主线程,创建的套接字文件描述符:%d, 客户端IP:%s, 占用端口: %d\n" ,connfd,ipstr,ntohs(pinfo->addr.sin_port)); pinfo->fd = connfd; pthread_create(&pinfo->tid,NULL ,working,pinfo); pthread_detach(pinfo->tid); } close(fd); return 0 ; }
整理版 :
上述版本的代码有些乱,重新整理如下:
Server_Multithread.c
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <arpa/inet.h> #include <pthread.h> #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include "proto.h" #define IPSTRSIZE 40 #define CHILD_THREADSIZE 128 #define BUFSIZE 1024 struct SockInfo { int fd; pthread_t tid; struct sockaddr_in raddr ; }; void Server_job (void *arg) { int number = 0 ; struct SockInfo * info = (struct SockInfo *)arg; char ipStr[IPSTRSIZE]; inet_ntop(AF_INET,&info->raddr.sin_addr,ipStr,sizeof ipStr); while (1 ) { char buf[BUFSIZE]; int ret = recv(info->fd,buf,sizeof (buf),0 ); if (ret == 0 ) { printf ("客户端关闭连接,IP:%s,PORT:%d\n" ,ipStr, ntohs(info->raddr.sin_port)); info->fd = -1 ; break ; } else if (ret < 0 ) { perror("recv()" ); info->fd = -1 ; break ; } else { memset (buf, 0 , sizeof (buf)); sprintf (buf, "客户端信息成功接收...%d\n" , number++); send(info->fd,buf,strlen (buf)+1 ,0 ); } } close(info->fd); pthread_exit(NULL ); } int main () { int i,max,len,sd; int new_sd; char ipStr[IPSTRSIZE]; struct SockInfo infos [CHILD_THREADSIZE ]; struct sockaddr_in laddr ; socklen_t raddr_len; sd = socket(AF_INET,SOCK_STREAM,0 ); if (sd < 0 ) { perror("socket()'" ); exit (1 ); } int val = 1 ; if (setsockopt(sd,SOL_SOCKET,SO_REUSEADDR,&val,sizeof val) < 0 ) { perror("socket()'" ); exit (1 ); } laddr.sin_family = AF_INET; laddr.sin_port = htons(atoi(SERVERPORT)); inet_pton(AF_INET,SERVERIP,&laddr.sin_addr); if (bind(sd,(void *)&laddr,sizeof laddr) < 0 ) { perror("bind()" ); exit (1 ); } if (listen(sd,100 )<0 ) { perror("listen()" ); exit (1 ); } for (i = 0 ;i<CHILD_THREADSIZE;i++) { bzero(&infos[i],sizeof (infos[i])); infos[i].fd = -1 ; infos[i].tid = -1 ; } raddr_len = sizeof (struct sockaddr_in); while (1 ) { struct SockInfo *pinfo ; for (i = 0 ;i<CHILD_THREADSIZE;i++) { if (infos[i].fd == -1 ) { pinfo = &infos[i]; break ; } if (i == max-1 ) { sleep(1 ); i--; } } bzero(ipStr,sizeof ipStr); new_sd = accept(sd,(void *)&pinfo->raddr,&raddr_len); if (new_sd < 0 ) { perror("accept()" ); exit (1 ); } inet_ntop(AF_INET,&pinfo->raddr.sin_addr,ipStr,sizeof ipStr); printf ("主线程,创建的套接字文件描述符:%d, 客户端IP:%s, 占用端口: %d\n" ,new_sd,ipStr,ntohs(pinfo->raddr.sin_port)); pinfo->fd = new_sd; pthread_create(&pinfo->tid,NULL ,Server_job,pinfo); pthread_detach(pinfo->tid); } close(sd); exit (0 ); }
编译运行,先启动服务端程序,在启动多个客户端程序,如下:
服务端终端显示:
客户端1终端显示:
客户端2终端显示:
3.多进程并发 编写多进程并发服务器程序,首先需要考虑的就是,创建的多进程都是什么角色。在Tcp服务器端一共有两个角色,分别是:监听和通信,监听是一个持续的动作,如果有新连接就建立连接,如果没有新连接就阻塞。关于通信是需要和多个客户端同时进行的,因此需要多个进程,这样才能达到互不影响的效果。进程也有两大类:父进程和子进程,通过分析我们可以这样分配进程:
父进程:
负责监听,处理客户端的连接请求,也就是在父进程中循环调用accept()函数
创建子进程:建立一个新的连接,就创建一个新的子进程,让这个子进程和对应的客户端通信
回收子进程资源:子进程退出回收其内核PCB资源,防止出现僵尸进程
子进程:负责通信,基于父进程建立新连接之后得到的文件描述符,和对应的客户端完成数据的接收和发送
发送数据:send() / write()
接收数据:recv() / read()
在多进程版的服务器端程序中,多个进程是有血缘关系,对应有血缘关系的进程来说,还需要想明白他们有哪些资源是可以被继承的,哪些资源是独占的,以及一些其他细节:
子进程是父进程的拷贝,在子进程的内核区PCB中,文件描述符也是可以被拷贝的,因此在父进程可以使用的文件描述符在子进程中也有一份,并且可以使用它们做和父进程一样的事情。
父子进程有用各自的独立的虚拟地址空间,因此所有的资源都是独占的
为了节省系统资源,对于只有在父进程才能用到的资源,可以在子进程中将其释放掉,父进程亦如此
由于需要在父进程中做accept()操作,并且要释放子进程资源,如果想要更高效一下可以使用信号的方式处理
(1)程序实例 在父进程中调用accept函数,阻塞等待客户端连接,若连接成功,则创建子进程用于与客户端进行数据通信。若子进程结束,则向父进程发送SIGCHLD
信号,告知父进程,子进程状态发生改变,父进程调用callback函数(信号处理函数),对子进程的资源进行回收处理。
ServerMultiProcess.c
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <arpa/inet.h> #include <signal.h> #include <sys/wait.h> #include <errno.h> #include "proto.h" #define IPSTRSIZE 40 #define BUFSIZE 1024 void callback (int num) { while (1 ) { pid_t pid = waitpid(-1 ,NULL ,WNOHANG); if (pid <= 0 ) { printf ("子进程正在运行,或者子进程回收完毕" ); break ; } printf ("子进程回收完毕,PID = %d\n" ,pid); break ; } } static void Server_job (int PID,int new_sd,void *arg) { int number = 0 ; char BUF[BUFSIZE]; char ipStr[IPSTRSIZE]; struct sockaddr_in *raddr = (struct sockaddr_in *)arg; inet_ntop(AF_INET,&raddr->sin_addr,ipStr,sizeof ipStr); while (1 ) { memset (BUF, 0 , sizeof (BUF)); int len = recv(new_sd,BUF,sizeof (BUF),0 ); if (len == 0 ) { printf ("\n" ); printf ("[%d]:客户端关闭连接,IP:%s,PORT:%d\n" ,PID,ipStr, ntohs(raddr->sin_port)); break ; } else if (len < 0 ) { perror("recv()" ); break ; } else { memset (BUF, 0 , sizeof (BUF)); sprintf (BUF, "客户端信息成功接收...%d\n" , number++); send(new_sd,BUF,strlen (BUF)+1 ,0 ); } } } int main () { struct sockaddr_in laddr ; struct sockaddr_in raddr ; socklen_t raddr_len; int sd,new_sd; char ipStr[IPSTRSIZE]; pid_t pid; sd = socket(AF_INET,SOCK_STREAM,0 ); if (sd < 0 ) { perror("socket()" ); exit (1 ); } int val = 1 ; if (setsockopt(sd,SOL_SOCKET,SO_REUSEADDR,&val,sizeof (val)) < 0 ) { perror("setsockopt()" ); exit (1 ); } laddr.sin_family = AF_INET; laddr.sin_port = htons(atoi(SERVERPORT)); inet_pton(AF_INET,SERVERIP,&laddr.sin_addr); if (bind(sd,(void *)&laddr,sizeof laddr) < 0 ) { perror("bind()" ); exit (1 ); } if (listen(sd,100 ) < 0 ) { perror("listen()" ); exit (1 ); } struct sigaction act ,oAct ; act.sa_flags = 0 ; act.sa_handler = callback; sigemptyset(&act.sa_mask); sigaction(SIGCHLD, &act, &oAct); raddr_len = sizeof raddr; while (1 ) { do { new_sd = accept(sd,(void *)&raddr,&raddr_len); if (new_sd < 0 ) { if (errno != EINTR) { perror("accept()" ); exit (1 ); } } }while (new_sd < 0 ); bzero(ipStr,sizeof ipStr); inet_ntop(AF_INET,&raddr.sin_addr,ipStr,sizeof ipStr); printf ("客户端IP: %s,PORT: %d\n" ,ipStr, ntohs(raddr.sin_port)); fflush(NULL ); pid = fork(); if (pid < 0 ) { perror("fork()" ); exit (1 ); } if (pid == 0 ) { close(sd); Server_job(getpid(),new_sd,&raddr); close(new_sd); exit (0 ); } else { close(new_sd); } } exit (0 ); }
编译运行,服务端程序,在逐个运行客户端程序,如下:
服务端终端显示:
客户端1终端显示:
客户端2终端显示:
4.线程池并发
线程池中,与客户端建立连接(创建一个与客户端建立连接的子线程,当有客户端与服务端建立连接之后,再创建其他子线程与客户端进行通信)以及和客户端通信都是由工作(消费者)线程完成的
工作的线程与管理者线程均是子线程
主线程,创建用于监听的套接字并且进行绑定,之后进行连接监听,再进行线程池对象的创建,最后将接受客户端连接的任务放到线程池中,主线程就可以进行退出了
注意 :在主线程中,创建线程池需要保持,工作线程(管理者线程与工作线程是分开的)的最少数量为1,因为需要保证,至少用于监听客户端连接的工作子线程一直处于工作状态
1.线程池程序 threadpool.h
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 #ifndef THREADPOOL_THREADPOOL_H #define THREADPOOL_THREADPOOL_H #include <pthread.h> extern const int NUMBER;typedef struct Task { void (*function)(void *arg); void *arg; }Task; struct Threadpool { Task *taskQ; int queueCapacity; int queueSize; int queueFront; int queueRear; pthread_t managerID; pthread_t *threadIDs; int minNum; int maxNum; int busyNum; int liveNum; int exitNum; pthread_mutex_t mutexPool; pthread_mutex_t mutexBusy; pthread_cond_t notFull; pthread_cond_t notEmpty; int shutdown; }; typedef struct Threadpool Thread_Pool ;Thread_Pool *threadPoolCreate (int min,int max,int queueSize) ; int threadPoolDestroy (Thread_Pool* pool) ;void threadPoolAdd (Thread_Pool* pool, void (*func)(void *), void * arg) ;int threadPoolBusyNum (Thread_Pool* pool) ;int threadPoolAliveNum (Thread_Pool* pool) ;_Noreturn void * worker (void * arg) ;void * manager (void * arg) ;void threadExit (Thread_Pool* pool) ;#endif
threadpool.c
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 #include <stdlib.h> #include <stdio.h> #include <string.h> #include <unistd.h> #include "threadpool.h" const int NUMBER = 1 ;Thread_Pool *threadPoolCreate (int min,int max,int queueSize) { Thread_Pool *pool; pool = malloc (sizeof (Thread_Pool)); do { if (pool == NULL ) { fprintf (stderr ,"malloc threadPool fail...\n" ); break ; } pool->threadIDs = malloc (sizeof (pthread_t ) * max); if (pool->threadIDs == NULL ) { fprintf (stderr ,"malloc threadIDS fail...\n" ); break ; } memset (pool->threadIDs,0 ,sizeof (pthread_t )*max); pool->minNum = min; pool->maxNum = max; pool->busyNum = 0 ; pool->liveNum = min; pool->exitNum = 0 ; if (pthread_mutex_init(&pool->mutexPool, NULL ) != 0 || pthread_mutex_init(&pool->mutexBusy, NULL ) != 0 || pthread_cond_init(&pool->notEmpty, NULL ) != 0 || pthread_cond_init(&pool->notFull, NULL ) != 0 ) { fprintf (stderr ,"mutex or condition init fail...\n" ); break ; } pool->taskQ = malloc (sizeof (Task) * queueSize); pool->queueCapacity = queueSize; pool->queueSize = 0 ; pool->queueFront = 0 ; pool->queueRear = 0 ; pool->shutdown = 0 ; pthread_create(&pool->managerID,NULL ,manager,pool); printf ("Manager Thread Create,PID: %ld\n" ,pool->managerID); for (int i = 0 ;i<min;i++) { pthread_create(&pool->threadIDs[i],NULL ,worker,pool); pthread_detach(pool->threadIDs[i]); } return pool; }while (0 ); if (pool && pool->threadIDs) free (pool->threadIDs); if (pool && pool->taskQ) free (pool->taskQ); if (pool) free (pool); return NULL ; } void * worker (void * arg) { Thread_Pool *pool = (Thread_Pool *)arg; while (1 ) { pthread_mutex_lock(&pool->mutexPool); while (pool->queueSize ==0 && !pool->shutdown) { pthread_cond_wait(&pool->notEmpty,&pool->mutexPool); if (pool->exitNum > 0 ) { pool->exitNum--; if (pool->liveNum > pool->minNum) { pool->liveNum--; pthread_mutex_unlock(&pool->mutexPool); threadExit(pool); } } } if (pool->shutdown > 0 ) { pthread_mutex_unlock(&pool->mutexPool); threadExit(pool); } Task task; task.function = pool->taskQ[pool->queueFront].function; task.arg = pool->taskQ[pool->queueFront].arg; pool->queueFront = (pool->queueFront + 1 ) % pool->queueCapacity; pool->queueSize--; pthread_cond_signal(&pool->notFull); pthread_mutex_unlock(&pool->mutexPool); printf ("thread %ld start working... \n" ,pthread_self()); pthread_mutex_lock(&pool->mutexBusy); pool->busyNum++; pthread_mutex_unlock(&pool->mutexBusy); task.function(task.arg); free (task.arg); task.arg = NULL ; printf ("thread %ld end working... \n" ,pthread_self()); pthread_mutex_lock(&pool->mutexBusy); pool->busyNum--; pthread_mutex_unlock(&pool->mutexBusy); } } void *manager (void *arg) { Thread_Pool * pool = (Thread_Pool*)arg; while (!pool->shutdown) { sleep(3 ); pthread_mutex_lock(&pool->mutexPool); int queueSize = pool->queueSize; int liveNum = pool->liveNum; pthread_mutex_unlock(&pool->mutexPool); pthread_mutex_lock(&pool->mutexBusy); int busyNum = pool->busyNum; pthread_mutex_unlock(&pool->mutexBusy); if (queueSize > liveNum-busyNum && liveNum < pool->maxNum) { pthread_mutex_lock(&pool->mutexPool); int counter = 0 ; for (int i = 0 ;i<pool->maxNum && counter < NUMBER && pool->liveNum < pool->maxNum;i++) { if (pool->threadIDs[i] == 0 ) { pthread_create(&pool->threadIDs[i],NULL ,worker,pool); pthread_detach(pool->threadIDs[i]); counter++; pool->liveNum++; } } pthread_mutex_unlock(&pool->mutexPool); } if (busyNum < liveNum && liveNum > pool->minNum) { pthread_mutex_lock(&pool->mutexPool); pool->exitNum = NUMBER; pthread_mutex_unlock(&pool->mutexPool); for (int i=0 ;i<NUMBER;i++) { pthread_cond_signal(&pool->notEmpty); } } } printf ("Manager Thread,%ld exiting...............\n" ,pthread_self()); pthread_exit(NULL ); } void threadExit (Thread_Pool* pool) { pthread_t tid = pthread_self(); for (int i = 0 ;i<pool->maxNum;i++) { if (pool->threadIDs[i] == tid) { pool->threadIDs[i] = 0 ; printf ("threadExit() called,%ld exiting...............\n" ,tid); break ; } } pthread_exit(NULL ); } void threadPoolAdd (Thread_Pool* pool, void (*func)(void *), void * arg) { pthread_mutex_lock(&pool->mutexPool); while (pool->queueSize == pool->queueCapacity && !pool->shutdown) { pthread_cond_wait(&pool->notFull,&pool->mutexPool); } if (pool->shutdown) { pthread_mutex_unlock(&pool->mutexPool); return ; } pool->taskQ[pool->queueRear].function = func; pool->taskQ[pool->queueRear].arg = arg; pool->queueRear =(pool->queueRear + 1 ) % pool->queueCapacity; pool->queueSize++; pthread_cond_signal(&pool->notEmpty); pthread_mutex_unlock(&pool->mutexPool); } int threadPoolBusyNum (Thread_Pool* pool) { pthread_mutex_lock(&pool->mutexPool); int busyNum = pool->busyNum; pthread_mutex_unlock(&pool->mutexPool); return busyNum; } int threadPoolAliveNum (Thread_Pool* pool) { pthread_mutex_lock(&pool->mutexPool); int liveNum = pool->liveNum; pthread_mutex_unlock(&pool->mutexPool); return liveNum; } int threadPoolDestroy (Thread_Pool* pool) { if (pool == NULL ) { return -1 ; } pool->shutdown = 1 ; for (int i = 0 ;i<pool->liveNum;i++) { pthread_cond_signal(&pool->notEmpty); } pthread_join(pool->managerID,NULL ); if (pool->taskQ) { free (pool->taskQ); } if (pool->threadIDs) { free (pool->threadIDs); } pthread_mutex_destroy(&pool->mutexPool); pthread_mutex_destroy(&pool->mutexBusy); pthread_cond_destroy(&pool->notEmpty); pthread_cond_destroy(&pool->notFull); free (pool); pool = NULL ; return 0 ; }
2.基于线程池的套接字程序 通信协议:
proto.h
1 2 3 4 5 6 7 #ifndef UNTITLED_PROTO_H #define UNTITLED_PROTO_H #define SERVERPORT "1989" #define SERVERIP "0.0.0.0" #endif
客户端程序:
client.c
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 #include <stdio.h> #include <stdlib.h> #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <string.h> #include <arpa/inet.h> #include <unistd.h> #include "proto.h" #define BUFSIZE 1024 int main (int argc,char *argv[]) { int sd; struct sockaddr_in raddr ; char BUF[BUFSIZE]; int len; if (argc < 2 ) { fprintf (stderr ,"Usage...\n" ); exit (1 ); } sd = socket(AF_INET,SOCK_STREAM,0 ); if (sd < 0 ) { perror("socket()" ); exit (1 ); } raddr.sin_family = AF_INET; raddr.sin_port = htons(atoi(SERVERPORT)); inet_pton(AF_INET,argv[1 ],&raddr.sin_addr); if (connect(sd,(void *) &raddr,sizeof raddr) < 0 ) { perror("connect()" ); exit (1 ); } int number = 0 ; while (1 ) { memset (BUF, 0 , sizeof (BUF)); sprintf (BUF, "你好,服务器...%d\n" , number++); send(sd, BUF, strlen (BUF)+1 ,0 ); memset (BUF, 0 , sizeof (BUF)); int len = recv(sd,BUF,sizeof BUF,0 ); if (len > 0 ) { fprintf (stdout ,"服务器say: %s\n" , BUF); } else if (len == 0 ) { fprintf (stdout ,"服务器断开了连接...\n" ); break ; } else { perror("recv()" ); break ; } sleep(1 ); } close(sd); exit (0 ); }
基于线程池的服务端程序:
在服务端的主线程中,进行监听套接字的创建与绑定等操作,随后创建线程池,并且向线程池中添加用于监听套接字连接请求的工作子线程。完成这些工作,主线程就可以退出了,退出了并不会影响子线程的工作
在监听套接字连接请求的工作子线程中,每次接受客户端连接请求后,都会继续创建一个工作子线程用于与客户端进行数据通信
当多个客户端全部中断通信之后,过一段时间线程池就会将用于与客户端通信的子线程资源进行回收,但是管理者线程与用于监听客户端连接的工作子线程会一直进行工作,用于需要管理线程池,与继续监听客户端的连接…
server.c
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <arpa/inet.h> #include <pthread.h> #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include "proto.h" #include "threadpool.h" #define IPSTRSIZE 40 #define BUFSIZE 1024 struct SockInfo { int fd; pthread_t tid; struct sockaddr_in addr ; }; typedef struct PoolInfo { Thread_Pool *p; int fd; }PoolInfo; void working (void *arg) ;void acceptConn (void *arg) ;char ipstr[IPSTRSIZE];int main () { int fd = socket(AF_INET, SOCK_STREAM, 0 ); if (fd < 0 ) { perror("socket()" ); exit (1 ); } int val = 1 ; if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof (val)) < 0 ) { perror("setsockopt()" ); exit (1 ); } struct sockaddr_in laddr ; laddr.sin_family = AF_INET; laddr.sin_port = htons(atoi(SERVERPORT)); inet_pton(AF_INET, SERVERPORT, &laddr.sin_addr); if (bind(fd, (void *) &laddr, sizeof (laddr)) < 0 ) { perror("bind()" ); exit (1 ); } if (listen(fd, 100 ) < 0 ) { perror("listen()" ); exit (1 ); } Thread_Pool *pool = threadPoolCreate(1 ,8 ,100 ); PoolInfo *info = (void *) malloc (sizeof (PoolInfo)); info->p = pool; info->fd = fd; threadPoolAdd(pool,acceptConn,info); pthread_exit(NULL ); return 0 ; } void acceptConn (void *arg) { PoolInfo *poolInfo = (void *)arg; int len = sizeof (struct sockaddr_in); while (1 ) { bzero(&ipstr,sizeof (ipstr)); struct SockInfo * pinfo ; pinfo = (void *) malloc (sizeof (struct SockInfo)); int cfd = accept(poolInfo->fd,(void *)&pinfo->addr,&len); if (cfd < 0 ) { perror("accept()" ); exit (1 ); } pinfo->fd = cfd; inet_ntop(AF_INET,&pinfo->addr.sin_addr,ipstr,IPSTRSIZE); printf ("连接成功, 客户端IP:%s, 占用端口: %d\n" ,ipstr,ntohs(pinfo->addr.sin_port)); threadPoolAdd(poolInfo->p, working,pinfo); } close(poolInfo->fd); } void working (void *arg) { int number = 0 ; struct SockInfo * info = (struct SockInfo *)arg; while (1 ) { char buf[BUFSIZE]; int ret = recv(info->fd,buf,sizeof (buf),0 ); if (ret == 0 ) { printf ("客户端关闭连接!\n" ); info->fd = -1 ; break ; } else if (ret < 0 ) { perror("recv()" ); info->fd = -1 ; break ; } else { memset (buf, 0 , sizeof (buf)); sprintf (buf, "客户端信息成功接收...%d\n" , number++); send(info->fd,buf,strlen (buf)+1 ,0 ); } } close(info->fd); }
第一步 :终端编译运行基于线程池的服务端程序如下:
1 2 $ gcc server.c threadpool.c -lpthread -o server $ ./server
管理者线程启动,用于监听套接字连接的工作子线程启动
第二步 :编译并且启动多个客户端程序如下./client 0.0.0.0
,启动了四个,服务端终端显示如下:
上图可知,有四个工作的子线程进行创建用于与客户端进行通信
第三步 :依次关闭,开启的客户端程序,服务端终端显示如下:
上图可知,用于与客户端通信的子线程,全部依次退出