IPV4流媒体项目(网络多播)-代码解析

一、IPV4项目项目代码以及解析

项目整体框架:

image-20240513230416496

可以做流量控制的地方:

  • 服务端通过socket向多播组ip发送数据,需要考虑丢包,拥塞等问题
  • 客户端调用解码器播放音频数据
  • 客户端,使用管道实现父子进程通信,管道有阻塞的属性


1.协议

site_type.h

该头文件进行数据创建一个别名,将数据类型进行隐藏

1
2
3
4
5
6
7
8
9
10
11
12
13
/*
* 数据类型定义
* */
#ifndef IPV4_STREAMING_MEDIA_SITE_TYPE_H
#define IPV4_STREAMING_MEDIA_SITE_TYPE_H

// stdint 头文件提供了固定宽度的整数类型
#include <stdint.h>

typedef uint8_t chnid_t; // chnid 的范围是0-255 8位2进制的范围


#endif //IPV4_STREAMING_MEDIA_SITE_TYPE_H

proto.h

通信协议中,定义各类宏:

  • DEFAULT_MGROUP:多播组的地址
  • DEFAULT_RCVPORT:接收端口,客户端服务端的端口需要一致
  • MSG_CHANNEL_MAX:UDP通信(频道)数据包推荐大小
  • MSG_LIST_MAX: UDP通信(节目单)数据包推荐大小

定义各类通信数据包的结构体(不适用结构体数据对齐):

  • msg_channel_st: 定义有关具体频道的数据包 (包含 频道id + 描述信息)
  • msg_listentry_st: 定义节目单中每一条记录的数据包 (包含 频道id + 描述信息 + 前面两个部分占用的字节数量(这一条记录占用的大小))
  • msg_list_st: 定义用于发送节目单的数据包 (包含 节目单的频道id + 节目单中多条记录的数据包)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
/*
* 通信协议
* */
#ifndef IPV4_STREAMING_MEDIA_PROTO_H
#define IPV4_STREAMING_MEDIA_PROTO_H

#include "site_type.h"

// 数据发送到组播地址,然后被分发到加入改组的成员
#define DEFAULT_MGROUP "224.2.2.2" // 定义多播组的地址
#define DEFAULT_RCVPORT "1989" // 接收端口
#define CHNNR 100 // 频道数量
#define LISTCHNID 0
#define MINCHNID 1
#define MAXCHNID (MINCHNID + CHNNR - 1)

// 推荐包的长度-IP报的长度-UDP报的长度
#define MSG_CHANNEL_MAX (65536-20-8) // msg_channel_st具体频道数据包的最大长度
// 数据包的最大长度 - 频道id报头的大小
#define MSG_DATA (MSG_CHANNEL_MAX - sizeof(chnid_t)) // 具体频道数据包中数据(msg_channel_st中的data)的最大长度

// 推荐包的长度-IP报的长度-UDP报的长度
#define MSG_LIST_MAX (65536 - 20 -8) // msg_list_st数据包的最大长度
// 数据包的最大长度 - 频道id报头的大小
#define MAX_ENTRY (MSG_LIST_MAX - sizeof(chnid_t)) // msg_list_st数据包中数据(msg_list_st中的entry)的最大长度

// 定义有关具体频道的数据包
// GUN C 中 __attribute__((packed)) 取消数据对齐
struct msg_channel_st{
chnid_t chnid; // 频道id,介于[MINCHNID,MAXCHNID]
uint8_t data[1]; // 数据包的长度,变长的数组
}__attribute__((packed));


/*
节目单形式:
1 music : xxxx (xxx代表具体的名称)
2 sport : xxxx
3 xxxxx : xxxx
....
*/

// 节目单中每一条记录的数据包
// 需要包含节目号+描述 (1 music : xxxx)
struct msg_listentry_st{
chnid_t chnid;
uint16_t len; // 这条记录数据的长度
uint8_t desc[1]; // 变长数组,柔性数组成员
}__attribute__((packed));

// 定义msg_list数据包
// 用于发送的节目单数据包
struct msg_list_st{
chnid_t chnid; // 频道id,因为是msg_list只有选择LIST_CHNID才会获取得到节目单
struct msg_listentry_st entry[1]; // 变长数组,柔性数组成员
}__attribute__((packed));

#endif //IPV4_STREAMING_MEDIA_PROTO_H


2.客户端

客户端主要的工作任务如下:

  • 加入多播组,从多播ip接收数据包,数据包含节目单数据以及各个频道的内容数据(MP3文件内容)
  • 接收节目单数据,选择频道
  • 接收选择频道的数据包,通过管道机制,将数据发送给子进程
  • 子进程在管道读端,读取接收的选择频道数据包,并且调用解码器,解析播放接收的mp3文件

客户端程序的工作流程:

  • 命令行分析:通过getopt_long函数获得,运行客户端程序时,命令行中所携带的参数,对参数进行分析,运行不同的命令(指定接收端口、指定多播组、指定播放器、显示帮助,这些不同的命令均有默认的参数)
  • 创建UDP套接字,设置套接字属性,加入多播组
  • 绑定客户端本机的ip信息:在这个项目中,客户端用于接收来自多播组的数据包,属于(先接收的一方)被动端,需要绑定本机的ip与端口(端口需要与服务端的端口一致
  • 创建管道用于父子进程通信
  • 创建父子进程,区分父子进程工作:
    • 父进程:接收节目单数据,选择频道,接收选择的频道数据,将接收的频道数据通过管道发送给子进程
    • 子进程:子进程调用解码器,解析播放从管道读取的频道数据(mp3文件数据)

image-20240513112933275


(1)客户端头文件

client.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#ifndef IPV4_STREAMING_MEDIA_CLIENT_H
#define IPV4_STREAMING_MEDIA_CLIENT_H

/*
* /usr/bin/mpg123:这是一个常见的命令行音频播放器,用于播放MP3文件。它通常安装在Unix和Linux系统的 /usr/bin 目录下
* -:在命令行工具中,单独的短横线 (-) 通常表示该工具应从标准输入(stdin)读取数据,而不是从文件读取。在这里,它意味着 mpg123 将从它的标准输入读取音频数据
* >:这是一个重定向操作符,用于将命令的输出从默认的输出目标(通常是屏幕)重定向到另一个位置
* /dev/null:这是一个特殊的文件,通常被称为“黑洞”。向 /dev/null 写入的数据都会被丢弃,读取 /dev/null 时通常会立即得到文件结束符(EOF)。这里使用它是为了忽略 mpg123 的所有输出,即把所有输出都丢弃
* */
#define DEFAULT_PLAYERCMD "/usr/bin/mpg123 - > /dev/null" // 输出重定向到/dev/null
#define DEFAULT_IF_NAME "ens33" // 定义一个网络接口设备的宏

/*客户端命令行指令结构体*/
struct client_conf_st
{
char *rcvport;
char *mgroup;
char *player_cmd; // 播放器指令
};

extern struct client_conf_st client_conf;

#endif //IPV4_STREAMING_MEDIA_CLIENT_H

解析1:

1
#define DEFAULT_PLAYERCMD "/usr/bin/mpg123 - > /dev/null"  // 输出重定向到/dev/null

/usr/bin/mpg123表示调用一个解码器,其中 - 表示解码器的输入为标准输入,>代表输出重定向,操作指令/usr/bin/mpg123的输出全部重定向到空设备/dev/null中。/dev/null又称之为黑洞,任何发送到 /dev/null 的数据都会被丢弃,读取 /dev/null 时它会立即返回文件结束(EOF)。这使得 /dev/null 成为处理不需要的输出的理想工具。


(2)客户端源码

client.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <net/if.h>
#include <fcntl.h>
#include <wait.h>
#include <errno.h>
#include <getopt.h>

#include "proto.h"
#include "client.h"


/*
* 命令行指令结构体字段初始化
* 为客户端指定默认的 多播组ip 接收端口 播放器(解码器)
* */
struct client_conf_st client_conf = {
.rcvport = DEFAULT_RCVPORT,
.mgroup = DEFAULT_MGROUP,
.player_cmd = DEFAULT_PLAYERCMD
};


/*
* 短格式:-M 长格式:--mgroup 指定多播组
* 短格式:-P 长格式:--port 指定接接收端口
* 短格式:-p 长格式:--player 指定播放器
* 短格式:-H 长格式:--help 显示帮助
*/
/*客户端帮助目录*/
static void printhelp(void)
{
printf("-P --port 指定接收端口\n");
printf("-M --mgroup 指定多播组\n");
printf("-p --player 指定播放器\n");
printf("-H --help 显示帮助\n");
}


/*
* func:
* 从缓冲区buf中,向文件描述符fd,坚持写够len个字节的数据
* return:
* 从buf中写入到fd的字节数
* */
static int writen(int fd, const char *buf, size_t len)
{
int pos = 0;
int ret;
while(len > 0)
{
ret = write(fd, buf+pos, len);
if(ret < 0)
{
// 假错,则继续重新写
if(errno == EINTR)
continue;
// 真错,返回-1
perror("write()");
return -1;
}
len -= ret;
pos += ret;
}
return pos;
}


int main(int argc,char *argv[])
{

// 初始化级别:
// 默认值 < 配置文件 < 环境变量 < 命令行参数
int sd;
pid_t pid;
int len;
int chosenid; // 选择频道id
int ret;

struct sockaddr_in serveraddr,raddr;
socklen_t serveraddr_len,raddr_len;

/*1. 使用getopt_long函数,解析命令行选项*/
int c;
int index = 0;
struct option argarr[] = {
{"port",1,NULL,'P'},
{"mgroup",1,NULL,'M'},
{"player",1,NULL,'p'},
{"help",0,NULL,'H'},
{NULL,0,NULL,0}
};
while(1)
{
// extern char *optarg 是与getopt_long有关的全局变量,会自动指向命令行当前带参数选项,后面的参数
c = getopt_long(argc,argv,"P:M:p:H",argarr,&index);
if(c < 0)
break;
switch (c)
{
case 'P':
client_conf.rcvport = optarg;
break;
case 'M':
client_conf.mgroup = optarg;
break;
case 'p':
client_conf.player_cmd = optarg;
break;
case 'H':
printhelp();
exit(0);
break;
default:
abort();
break;
}
}
/*********************************************************************************/


/*2. 创建报式套接字UDP,并且设置套接字属性*/
sd = socket(AF_INET,SOCK_DGRAM,IPPROTO_UDP);
if (sd < 0)
{
perror("socket()");
exit(1);
}
// 套接字属性-加入多播组--成为多播组的成员
struct ip_mreqn mreq;
// 客户端向加入的多播组的IP地址
inet_pton(AF_INET, client_conf.mgroup, &mreq.imr_multiaddr);
// 设置本地网络接口的IP地址,用于接收多播消息
inet_pton(AF_INET, "0.0.0.0", &mreq.imr_address);
// 指定网络接口,指定多播数据由该网络接口(网卡)接收
mreq.imr_ifindex = if_nametoindex(DEFAULT_IF_NAME);
if (setsockopt(sd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0)
{
perror("setockopt()");
exit(1);
}
// 套接字属性--允许多播数据被本地回环接口接收
int val = 1;
if (setsockopt(sd, IPPROTO_IP, IP_MULTICAST_LOOP, &val, sizeof(val)) < 0)
{
perror("setockopt()");
exit(1);
}
/*********************************************************************************/


/*3. 绑定本机IP与端口*/
struct sockaddr_in laddr;
laddr.sin_family = AF_INET;
// 端口号需要与服务端一致
laddr.sin_port = htons(atoi(client_conf.rcvport));
inet_pton(AF_INET, "0.0.0.0", &laddr.sin_addr);
if (bind(sd, (void *)&laddr, sizeof(laddr)) < 0)
{
perror("bind()");
exit(1);
}
/*********************************************************************************/


/*4. 创建管道用于父子进程通信*/
int pd[2]; // 读写的文件描述符数组,0用于读取,1用于写入
if (pipe(pd) < 0)
{
perror("pipe()");
exit(1);
}
/*********************************************************************************/


/*5. 创建父子进程,区分父子进程的工作*/
pid = fork();
if (pid < 0)
{
perror("fork()");
exit(1);
}
// 子进程-调解码器
if (pid == 0)
{
// 子进程调管道的读端,读取数据,调用解码器进行播放
close(sd); // 子进程会将父进程资源复制,关闭sd
close(pd[1]); // 关闭写端

// 0号描述符表示标准输入
// 现在从标准输入读取的数据都是从管道读端读取的数据
dup2(pd[0], 0); // 将stdin(标准输入)关闭,重定向到管道读端
if (pd[0] > 0) // 如果 pd[0]的文件描述符不为0,则关闭原始的pd[0] 文件描述符
// 因为pd[0]已经重定向到0上,保留原始的pd[0]可能会存在资源泄露
close(pd[0]);

// 通过execl函数,启动一个新的shell进程,并通过这个shell执行 client_conf.player_cmd 中定义的命令
// 通过shell 指令,执行命令 client_conf.player_cmd
// 例如 sh -c 'ls -l' 就是在当前目录下通过启动一个新的shell进程执行命令 ls -l
if (execl("/bin/sh", "sh", "-c", client_conf.player_cmd, NULL) < 0)
{
perror("execl()");
exit(1);
}
}
// 父进程-从网络上收包,发送给子进程(写管道)
else
{
// 收节目单
close(pd[0]); //关闭读端
struct msg_list_st *msg_list; // 节目单结构体指针
msg_list = malloc(MSG_LIST_MAX);
if (msg_list == NULL)
{
perror("malloc()");
exit(1);
}
serveraddr_len = sizeof(struct sockaddr_in); // 此句关键,如何不设置,会出现服务端地址与接收端地址不匹配错误
// 接收数据
while (1)
{
// 接收的数据,放到msg_list缓冲区中,缓冲区大小为MSG_LIST_MAX
// serveraddr 存放发送数据机器的ip信息
len = recvfrom(sd, msg_list, MSG_LIST_MAX, 0, (void *)&serveraddr, &serveraddr_len);
// 数据量不够,出错
if (len < sizeof(struct msg_list_st))
{
fprintf(stderr, "message is too small.\n");
continue;
}
// 比对数据包中的频道id,节目单的频道id为LISTCHNID
if (msg_list->chnid != LISTCHNID)
{
fprintf(stderr, "received program list chnid %d is not match.\n", msg_list->chnid);
continue;
}
break;
}

// 打印节目单,选择频道
// 创建节目单中每一条记录的数据结构体指针
struct msg_listentry_st *pos;
// (char *)pos < (((char *)msg_list) + len) 中的len为上面recvfrom接收的节目单整体数据长度
for(pos = msg_list->entry;(char *)pos < (((char *)msg_list) + len);pos = (void *)(((char *)pos) + ntohs(pos->len)))
{
printf("channel %d : %s\n", pos->chnid, pos->desc); // 打印这条节目的数据
}
free(msg_list);

// 选择频道
// 用户做的选择无需上传到server端,因为server端是将所有频道的数据都进行发送,用户在这些数据中挑选中自己需要的数据
puts("Please choose a channel:");

do{
ret = scanf("%d", &chosenid);
}while(ret < 1);

fprintf(stdout, "chosen channel = %d\n", chosenid);

// 收频道包,发送给子进程
struct msg_channel_st *msg_channel; // 创建用于接收指定频道数据的结构体
msg_channel = malloc(MSG_CHANNEL_MAX);
if (msg_channel == NULL)
{
perror("malloc()");
exit(1);
}
len = 0;
raddr_len = sizeof(struct sockaddr_in);
while (1)
{
// 做完频道id选择之后,持续接收频道的数据包,并且判断是不是自己指定频道id的包
len = recvfrom(sd,msg_channel,MSG_CHANNEL_MAX,0,(void *)&raddr,&raddr_len);
// 比对serveraddr与raddr中的ip信息,判断节目单与各个频道数据的发送方是否是同一个机器,防止中间方伪造数据
if (raddr.sin_addr.s_addr != serveraddr.sin_addr.s_addr || raddr.sin_port != serveraddr.sin_port)
{
fprintf(stderr, "Ignore:address not match.\n");
continue;
}
if (len < sizeof(struct msg_channel_st))
{
fprintf(stderr, "message is too small.\n");
continue;
}
if (msg_channel->chnid == chosenid)
{
// 如果当前接收的频道的数据包就是,用户选择的频道id的数据包,则将数据写入管道的写端口,这样子进程就可以调用解码器对数据进行解析
// 可考虑设置缓存区接收字节,避免音乐播放断断续续
fprintf(stdout, "accepted channel %d msg.\n", msg_channel->chnid);
// 从msg_channel->data向管道的写端口写入数据,坚持写够字节数为len - sizeof(chnid_t)
if(writen(pd[1], msg_channel->data, len - sizeof(chnid_t)) < 0)
{
fprintf(stderr, "writen(): writen data error!\n");
exit(1);
}
}
}
free(msg_channel);
close(sd);
wait(NULL);
}
/*********************************************************************************/

exit(0);
}


CmakeLists.txt

1
2
3
4
5
6
7
8
cmake_minimum_required(VERSION 3.16)
project(client C)

set(CMAKE_C_STANDARD 99)
# 包含头文件
include_directories(../include)

add_executable(client client.c)

(3)多播组网络接口ip与套接字绑定的本机ip均设置为INADDR_ANY
1
2
3
4
5
6
7
8
// 套接字属性-加入多播组--成为多播组的成员
struct ip_mreqn mreq;
// 客户端向加入的多播组的IP地址
inet_pton(AF_INET, client_conf.mgroup, &mreq.imr_multiaddr);
// 设置本地网络接口的IP地址,用于接收多播消息
inet_pton(AF_INET, "0.0.0.0", &mreq.imr_address);
// 指定网络接口,指定多播数据由该网络接口(网卡)接收
mreq.imr_ifindex = if_nametoindex(DEFAULT_IF_NAME);
  • 设置加入多播组时使用 INADDR_ANY,这指示系统选择最合适的网络接口用于多播数据的接收。这通常是基于路由配置自动决定的,系统会选择能够最有效接收到多播流的接口
1
2
3
4
5
6
/*3. 绑定本机IP与端口*/
struct sockaddr_in laddr;
laddr.sin_family = AF_INET;
// 端口号需要与服务端一致
laddr.sin_port = htons(atoi(client_conf.rcvport));
inet_pton(AF_INET, "0.0.0.0", &laddr.sin_addr);
  • 绑定本机IP时,设置本机ip为0.0.0.0(INADDR_ANY)这个特殊的地址用于指示应用程序应监听所有可用的网络接口

(4)子进程调用解码器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 子进程-调解码器
if (pid == 0)
{
// 子进程调管道的读端,读取数据,调用解码器进行播放
close(sd); // 子进程会将父进程资源复制,关闭sd
close(pd[1]); // 关闭写端

// 0号描述符表示标准输入
// 现在从标准输入读取的数据都是从管道读端读取的数据
dup2(pd[0], 0); // 将stdin(标准输入)关闭,重定向到管道读端
if (pd[0] > 0) // 如果 pd[0]的文件描述符不为0,则关闭原始的pd[0] 文件描述符
// 因为pd[0]已经重定向到0上,保留原始的pd[0]可能会存在资源泄露
close(pd[0]);

// 通过execl函数,启动一个新的shell进程,并通过这个shell执行 client_conf.player_cmd 中定义的命令
// 通过shell 指令,执行命令 client_conf.player_cmd
// 例如 sh -c 'ls -l' 就是在当前目录下通过启动一个新的shell进程执行命令 ls -l
if (execl("/bin/sh", "sh", "-c", client_conf.player_cmd, NULL) < 0)
{
perror("execl()");
exit(1);
}
}
  • 子进程会复制父进程的资源,子进程中不需要使用套接字文件描述符,因此可以将复制父进程的套接字文件描述符关闭;
  • 子进程读取管道数据,可以关闭写端,避免文件描述符资源浪费,以及确保数据完整性
  • dup2(pd[0], 0)是将 pd[0](通常是一个管道的读端)重定向到标准输入。这意味着所有本来应从标准输入读取的操作现在都会从 pd[0] 读取数据。在头文件中已经定义了DEFAULT_PLAYERCMD调用解码器的命令的输入内容从标准输入中获取,这样重定向之后,解码器的命令的输入内容将会从管道读端获得,而管道读端将会获得父进程从多播组接收的频道(MP3文件)内容
  • 使用execl函数替换当前进程的映像为一个新的程序,即在子进程中执行一个新的调用解码器的程序

(5)父进程接收数据包
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// 收节目单
close(pd[0]); //关闭读端
struct msg_list_st *msg_list; // 节目单结构体指针
msg_list = malloc(MSG_LIST_MAX);
if (msg_list == NULL)
{
perror("malloc()");
exit(1);
}
serveraddr_len = sizeof(struct sockaddr_in); // 此句关键,如何不设置,会出现服务端地址与接收端地址不匹配错误
// 接收数据
while (1)
{
// 接收的数据,放到msg_list缓冲区中,缓冲区大小为MSG_LIST_MAX
// serveraddr 存放发送数据机器的ip信息
len = recvfrom(sd, msg_list, MSG_LIST_MAX, 0, (void *)&serveraddr, &serveraddr_len);
// 数据量不够,出错
if (len < sizeof(struct msg_list_st))
{
fprintf(stderr, "message is too small.\n");
continue;
}
// 比对数据包中的频道id,节目单的频道id为LISTCHNID
if (msg_list->chnid != LISTCHNID)
{
fprintf(stderr, "received program list chnid %d is not match.\n", msg_list->chnid);
continue;
}
break;
}

// 打印节目单,选择频道
// 创建节目单中每一条记录的数据结构体指针
struct msg_listentry_st *pos;
// (char *)pos < (((char *)msg_list) + len) 中的len为上面recvfrom接收的节目单整体数据长度
for(pos = msg_list->entry;(char *)pos < (((char *)msg_list) + len);pos = (void *)(((char *)pos) + ntohs(pos->len)))
{
printf("channel %d : %s\n", pos->chnid, pos->desc); // 打印这条节目的数据
}
free(msg_list);
  • 接收节目单数据包,并且通过数据包中的频道id做校验
  • 定义节目单数据包结构体类型,并为其开辟动态内存空间,其大小为节目单数据包的推荐大小,在proto.h协议文件中存在定义MSG_LIST_MAX
  • 打印节目单信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// 选择频道
// 用户做的选择无需上传到server端,因为server端是将所有频道的数据都进行发送,用户在这些数据中挑选中自己需要的数据
puts("Please choose a channel:");

do{
ret = scanf("%d", &chosenid);
}while(ret < 1);

fprintf(stdout, "chosen channel = %d\n", chosenid);

// 收频道包,发送给子进程
struct msg_channel_st *msg_channel; // 创建用于接收指定频道数据的结构体
msg_channel = malloc(MSG_CHANNEL_MAX);
if (msg_channel == NULL)
{
perror("malloc()");
exit(1);
}
len = 0;
raddr_len = sizeof(struct sockaddr_in);
while (1)
{
// 做完频道id选择之后,持续接收频道的数据包,并且判断是不是自己指定频道id的包
len = recvfrom(sd,msg_channel,MSG_CHANNEL_MAX,0,(void *)&raddr,&raddr_len);
// 比对serveraddr与raddr中的ip信息,判断节目单与各个频道数据的发送方是否是同一个机器,防止中间方伪造数据
if (raddr.sin_addr.s_addr != serveraddr.sin_addr.s_addr || raddr.sin_port != serveraddr.sin_port)
{
fprintf(stderr, "Ignore:address not match.\n");
continue;
}
if (len < sizeof(struct msg_channel_st))
{
fprintf(stderr, "message is too small.\n");
continue;
}
if (msg_channel->chnid == chosenid)
{
// 如果当前接收的频道的数据包就是,用户选择的频道id的数据包,则将数据写入管道的写端口,这样子进程就可以调用解码器对数据进行解析
// 可考虑设置缓存区接收字节,避免音乐播放断断续续
fprintf(stdout, "accepted channel %d msg.\n", msg_channel->chnid);
// 从msg_channel->data向管道的写端口写入数据,坚持写够字节数为len - sizeof(chnid_t)
if(writen(pd[1], msg_channel->data, len - sizeof(chnid_t)) < 0)
{
fprintf(stderr, "writen(): writen data error!\n");
exit(1);
}
}
}
free(msg_channel);
close(sd);
wait(NULL);
}
  • 定义节目单数据包结构体类型,并为其开辟动态内存空间,其大小为具体频道单数据包的推荐大小,在proto.h协议文件中存在定义MSG_CHANNEL_MAX
  • 选择相应频道的数据包,并且通过,节目单数据包发送方与频道数据包发送方的IP与端口进行校验,判断节目单数据包与频道数据包是否来自于同一个发送端,防止数据被中间方伪造
  • 通过选择的频道id与接收的频道数据包中的频道id做校验,判断是否为选择的频道id数据包
  • 将频道数据包中的data数据,写入管道


3.线程令牌桶算法库–流量控制

(1)令牌桶算法头文件

mytbf.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/*多线程并发的令牌桶,流量控制*/
#ifndef SERVER_MYTBF_H
#define SERVER_MYTBF_H

#define MYTBF_MAX 1024 // 令牌桶数组容量大小

typedef void mytbf_t; // 令牌桶类型

/*初始化令牌桶*/
// cps 每次可以获得令牌数量 burst 桶内可以积攒的领牌数量上限
mytbf_t *mytbf_init(int cps,int burst);

/*从令牌桶中取令牌*/
// 从令牌桶中取得令牌,int 第二个参数是想取多少
// 返回值,为真正取得了多少
int mytbf_fetchtoken(mytbf_t *,int);

/*归还没有使用完的令牌*/
// 将没有用完的令牌数量还到令牌桶中, int 第二个参数是想还回去的数量
// 返回值,为真正还了多少
int mytbf_returntoken(mytbf_t *,int);

/*令牌桶销毁*/
int mytbf_destroy(mytbf_t *);

#endif //SERVER_MYTBF_H
  • 令牌桶算法的头文件中通过typedef void mytbf_t,隐藏令牌桶算法的真实数据类型

(2)令牌桶源文件

mytbf.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
#include <stdlib.h>
#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/time.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <string.h>
#include <pthread.h>
#include <errno.h>
#include <syslog.h>

#include "mytbf.h"

/*令牌桶类型数组,对于文件可以有MYTBF_MAX中传输速率选择*/
static struct mytbf_st* job[MYTBF_MAX]; // 这是多个进程共享的资源,需要作线程同步,避免数据混乱
// 线程互斥量的初始化
static pthread_mutex_t mut_job = PTHREAD_MUTEX_INITIALIZER;
// 子进程ID
static pthread_t tid_alarm;
// 定义全局 once 控制变量,这个变量跟踪初始化函数是否已被调用
static pthread_once_t init_once = PTHREAD_ONCE_INIT;

/*一个令牌桶的信息结构体*/
struct mytbf_st
{
int cps; // 令牌桶每次可以积攒的令牌数
int burst; // 一个令牌桶中积攒的令牌数上限
int token; // 一个令牌桶的实时令牌数
int pos; // 该令牌桶位于令牌桶系统中的令牌桶数组中的位置索引
pthread_mutex_t mut; // 每一个令牌桶通过互斥量保证token的同步
pthread_cond_t cond; // 条件变量--用于通知令牌桶令牌数量的变化
};


/*为令牌桶系统中每个令牌桶积攒令牌的子线程函数*/
static void *thr_alarm(void *p)
{
struct timespec t;
while(1)
{
// 访问令牌桶系统的令牌桶结构体数组的共享资源,枷锁
pthread_mutex_lock(&mut_job);
for (int i = 0; i < MYTBF_MAX; i++)
{

// 当前令牌桶数组位置存在数据
if(job[i] != NULL)
{
// 操作该令牌桶的令牌数,枷锁
pthread_mutex_lock(&job[i]->mut);

// 为i位置的令牌桶装载令牌
job[i]->token += job[i]->cps;
// 但是令牌桶所积累的令牌数量需要维持在令牌桶的上限
if(job[i]->token > job[i]->burst)
job[i]->token = job[i]->burst;
// for 循环内给令牌桶数组中的所有令牌桶均加了token
// 需要唤醒所有的阻塞(挂起)的线程,去唤醒,此时令牌桶中的令牌>0,可以消费了
pthread_cond_broadcast(&job[i]->cond);

pthread_mutex_unlock(&job[i]->mut);
}
}

pthread_mutex_unlock(&mut_job);

// 使用nanosleep函数使得线程休眠1s
t.tv_sec = 1;
t.tv_nsec = 0;

while(nanosleep(&t,&t) != 0)
{
// 真错
if(errno != EINTR)
{
fprintf(stderr,"nanosleep():%s\n", strerror(errno));
exit(1);
}
}
}
}


/*模块卸载*/
static void module_unload(void)
{
// 取消为令牌桶系统中每个令牌桶积攒令牌的子线程
pthread_cancel(tid_alarm);
// 资源回收
pthread_join(tid_alarm, NULL);

for (int i = 0; i < MYTBF_MAX; i++)
{
if(job[i] != NULL)
{
// 销毁令牌桶系统中存在的令牌桶
mytbf_destroy(job[i]);
}
}

// 销毁互斥锁
pthread_mutex_destroy(&mut_job);
}


/*模块加载*/
static void module_load(void)
{
int err;
// 创建令牌桶系统中为各个存在的令牌桶积攒令牌的子线程--该子线程在整个令牌桶系统中只创建一个
err = pthread_create(&tid_alarm,NULL,thr_alarm,NULL);
if(err)
{
fprintf(stderr,"pthread_create():%s\n", strerror(err));
exit(1);
}

// 钩子函数,程序结束前最后执行,对模块进行卸载
atexit(module_unload);
}


static int min(int a,int b)
{
return a<b ? a:b;
}


/*在令牌桶数组中找到空位置*/
// 这个函数本身没有加线程锁,但是调用的时候应该枷锁
// 因为令牌桶数组为共享资源,需要保持线程同步,所以设置为unlocked
static int get_free_pos_unlocked(void)
{
for(int i = 0; i < MYTBF_MAX; i++)
{
if(job[i] == NULL)
return i; // 返回空位置的下标
}
return -1;
}


/*
* 令牌桶初始化
* cps:每次可以取得的令牌数 burst 令牌桶中可以积攒的令牌数量上限
* return: 一个令牌桶类型的指针
* */
mytbf_t *mytbf_init(int cps,int burst)
{
struct mytbf_st *me;

// 模块加载函数,pthread_once即使多个线程执行,但是其只会在第一个调用的线程中执行一次
pthread_once(&init_once,module_load);

// 为当前的令牌桶开辟空间
me = malloc(sizeof(*me));
if(me == NULL)
return NULL;

me->token = 0; // 当前令牌桶,初始令牌为0
me->cps = cps; // 每次可以获得的令牌数量为cps
me->burst = burst; // 令牌桶中可以积攒的令牌数量上限为burst
pthread_mutex_init(&me->mut,NULL); // 关于令牌桶中令牌的互斥量初始化
pthread_cond_init(&me->cond,NULL); // 初始化条件变量

// 访问共享资源,枷锁保持线程同步
pthread_mutex_lock(&mut_job);
int pos = get_free_pos_unlocked();
if(pos < 0)
{
pthread_mutex_unlock(&mut_job); // 失败,解锁退出
free(me);
return NULL;
}

me->pos = pos;
job[pos] = me;
// 解锁
pthread_mutex_unlock(&mut_job);


return me;
}


/*
* 从令牌桶中取得令牌,int 第二个参数是想取多少
* */
// 返回值,为真正取得了多少令牌
int mytbf_fetchtoken(mytbf_t *ptr,int size)
{
struct mytbf_st *me = ptr;
int n;

if(size < 0)
// EINVAL(Invalid Argument)是另一个常见的错误代码,表示传递给系统调用的一个或多个参数是无效的
return -EINVAL; // 这样就可以使用strerror(errno)进行报错

// 访问该令牌桶中的令牌,但是可能有其他线程此时正准备归还令牌,因此需要保证线程同步
// 枷锁
pthread_mutex_lock(&me->mut);

while(me->token <= 0)
{
// 如果没有条件变量通知,线程会阻塞在此处,挂起不会占用CPU很多资源
// 直到 thr_alarm 与 mytbf_returntoken 函数中使用pthread_cond_broadcast 通知条件变量
// 唤醒阻塞在这里的所有线程,并且抢锁,枷锁,查看token值
pthread_cond_wait(&me->cond,&me->mut);

}

// 取小值
n = min(me->token,size);
me->token -= n; // 令牌数量减少
// 解锁
pthread_mutex_unlock(&me->mut);

return n;
}


/*
* 将没有用完的令牌数量还到令牌桶中, int 第二个参数是想还回去的数量
* */
// 返回值,为真正还了多少令牌
int mytbf_returntoken(mytbf_t *ptr,int size)
{
struct mytbf_st *me = ptr;

if(size < 0)
// EINVAL(Invalid Argument)是另一个常见的错误代码,表示传递给系统调用的一个或多个参数是无效的
return -EINVAL; // 这样就可以使用strerror(errno)进行报错

// 访问该令牌桶中的令牌,但是可能有其他线程此时正准备归还令牌,因此需要保证线程同步
// 枷锁
pthread_mutex_lock(&me->mut);
me->token += size; // 归还了,当前令牌桶的令牌数量增加
// 但是还是要约束在上限内
if(me->token > me->burst)
me->token = me->burst;

// 这个时候也需要唤醒阻塞的线程,因为此时的token>0,也可以进行消费了
// 通过条件变量发送通知
pthread_cond_broadcast(&me->cond);
pthread_mutex_unlock(&me->mut);

return size;
}


/*
* 销毁令牌桶
* */
int mytbf_destroy(mytbf_t *ptr)
{
// 将传入的令牌桶销毁
struct mytbf_st *me = ptr;

// 将当前令牌桶占用的令牌桶数组位置赋值为NULL,共享资源,枷锁
pthread_mutex_lock(&mut_job);
job[me->pos] = NULL;
pthread_mutex_unlock(&mut_job);

// 销毁该令牌桶中有关令牌的互斥量
pthread_mutex_destroy(&me->mut);
// 条件变量也进行销毁
pthread_cond_destroy(&me->cond);
// 释放创建传入的令牌桶开辟的动态空间
free(ptr);

return 0;
}
  • 整个令牌桶系统只会创建一个子线程用于,为所有创建的令牌桶固定事件取得CPS数量的令牌
  • mytbf_t *mytbf_init(int cps,int burst)函数中,使用 pthread_once(&init_once,module_load)代码调用一次module_load函数为令牌桶系统创建一个子线程用于,为所有创建的令牌桶固定事件取得CPS数量的令牌
  • 令牌桶系统中的static struct mytbf_st* job[MYTBF_MAX]令牌桶数组为共享资源,每次使用之前需要枷锁进行线程同步
  • 每次对各个令牌桶中的令牌token进行增加或者归还或者使用时,均需要枷锁,进行线程同步
  • 在媒体库中会为每一个频道,创建一个令牌桶,实现流量控制


4.媒体库

媒体库的下的目录结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
(base) zxz@ubuntu:~/Proj/C_C++/linux_c/IPV4_StreamingMedia/Music$ tree
.
├── ch1
│   ├── desc.txt
│   └── love1.mp3
├── ch2
│   ├── desc.txt
│   └── love2.mp3
└── ch3
├── desc.txt
└── love3.mp3

上述目录结构,表示本地文件系统中的媒体库中,有三个频道目录,ch1/ch2/ch3,每个频道目录有一个描述文件desc.txt与一个mp3文件,描述文件用于描述,mp3音乐文件的音乐类型


(1)媒体库头文件

medialib.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#ifndef IPV4_STREAMING_MEDIA_MEDIALIB_H
#define IPV4_STREAMING_MEDIA_MEDIALIB_H

#include "site_type.h"

#define MP3_PARTERN "/*.mp3"
#define DESC_FNAME "/desc.txt"

// 推荐的mp3播放速率
#define MP3_BITRATE (128 * 1024) // MP3文件播放的比特率 这代表 128kbps(千比特每秒)

// 媒体库中每一个频道信息的结构体
// 当然也可以使用一个相同类型的指针,指向一个结构体数组,代表媒体库中所有的频道信息
struct mlib_listentry_st{
chnid_t chnid; // 频道id
char *desc; // 频道描述信息
};

/*获取频道库的频道信息*/
int mlib_getchnlist(struct mlib_listentry_st **,int *);

/*释放节目单信息*/
int mlib_freechnlist(struct mlib_listentry_st *);

/*读取每个频道信息*/
// chnid_t 表示频道id,指定需要读取的频道
// void * 表示读取信息的缓冲区,读取的数据存储到缓冲区
// size_t 类型参数 表示需要读取的字节数量
size_t mlib_readchn(chnid_t, void *, size_t);

#endif //IPV4_STREAMING_MEDIA_MEDIALIB_H

(2)媒体库源文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
#include <stdio.h>
#include <stdlib.h>
#include <glob.h>
#include <syslog.h>
#include <string.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

#include "medialib.h"
#include "proto.h"
#include "mytbf.h"
#include "server_conf.h"

#define PATHSIZE 1024
#define LINEBUFSIZE 1024

struct channel_context_st{
chnid_t chnid; // 频道id
char *desc; // 指向频道的描述信息
glob_t mp3glob;
int pos;
int fd; // 频道目录下打开的mp3文件的文件描述符
off_t offset; // 发送这个数据是一段一段发送出去的
mytbf_t *tbf; // 流量控制---一个频道一个令牌桶
};

// 创建结构体数组存储所有的频道节目信息
static struct channel_context_st channel[MAXCHNID + 1]; // 100 个频道 + 节目单的频道id
static chnid_t curr_id = MINCHNID;


static int open_next(chnid_t chnid)
{
for (int i = 0; i < channel[chnid].mp3glob.gl_pathc; i++)
{
channel[chnid].pos++;
//所有的歌曲都没有打开
if (channel[chnid].pos == channel[chnid].mp3glob.gl_pathc)
{
channel[chnid].pos = 0; //再来一次
}

close(channel[chnid].fd);
channel[chnid].fd = open(channel[chnid].mp3glob.gl_pathv[channel[chnid].pos], O_RDONLY);
//如果打开还是失败
if (channel[chnid].fd < 0)
{
syslog(LOG_WARNING, "open(%s):%s", channel[chnid].mp3glob.gl_pathv[chnid], strerror(errno));
}
else //success
{
channel[chnid].offset = 0;
return 0;
}
}
syslog(LOG_ERR, "None of mp3 in channel %d is available.", chnid);
return -1;
}


/*
* func:
* 解析path路径下的频道信息是否合法,如果合法返回合法频道的记录信息
* 若该频道合法,还会在该函数中,为该频道创建一个令牌桶,实现流量控制
* parameter:
* const char *path:
* 具体的频道目录,若媒体库目录为./Music 在媒体库下有三个频道目录ch1、ch2、ch3
* 则具体的频道目录可以是 ./Music/ch_{i} 其中嗯i 可以是1、2、3
* return:
* 失败,返回NULL
* 成功,返回struct channel_context_st类型的结构体指针
* */
static struct channel_context_st *path2entry(const char *path)
{
char pathstr[PATHSIZE] = {'\0'};
char linebuf[LINEBUFSIZE];
FILE *fp;
struct channel_context_st *me;

// 将需要解析的目录复制给字符串数组 pathstr
strncpy(pathstr,path,PATHSIZE);
// 将字符串拼接并且复制给字符串数组 pathstr
// 此时pathstr指向的是 path 目录下的描述文件
strncat(pathstr,DESC_FNAME,PATHSIZE);

// 1. 处理描述文件
// 打开描述文件
fp = fopen(pathstr,"r");
if(fp == NULL)
{
// 描述文件不存在,说明path路径的频道信息不合法
syslog(LOG_INFO,"%s is not a channel dir (can not find desc.txt)",path);
return NULL;
}

// 文件存在,但是描述文件内容为空,如果不为空,那么文件第一行内容就是频道的描述
// 将第一行的描述信息存储到linebuf中
if(fgets(linebuf,LINEBUFSIZE,fp) == NULL)
{
syslog(LOG_INFO,"%s is not a channel dir (can get the desc.txt,but Content is empty)",path);
fclose(fp); // 关闭文件
return NULL;
}
fclose(fp); // 关闭文件

me = malloc(sizeof(*me));
if(me == NULL)
{
syslog(LOG_ERR,"malloc:%s", strerror(errno));
return NULL;
}

// 为该合法频道创建一个令牌桶作流量控制
me->tbf = mytbf_init(MP3_BITRATE / 8, MP3_BITRATE / 8 * 10);

if(me->tbf == NULL)
{
syslog(LOG_ERR, "mytbf_init():%s", strerror(errno));
free(me);
return NULL;
}
// 为合法的频道的描述文件内容分配一个新的内存空间,并且使用me结构体指针指向,作回传
me->desc = strdup(linebuf);
// 使用如下指令会段错误,因为me->desc,只是一个字符串指针。并没有实际的内存空间
// strcpy(me->desc,linebuf);

// 2.处理mp3文件
// 将需要解析的目录复制给字符串数组 pathstr
strncpy(pathstr,path,PATHSIZE);
// 将字符串拼接并且复制给字符串数组 pathstr
// 此时pathstr指向的是 path 目录下的描述文件
strncat(pathstr,MP3_PARTERN,PATHSIZE);
// 路径匹配,找到频道目录下的所有mp3文件
if(glob(pathstr,0,NULL,&me->mp3glob) != 0)
{
syslog(LOG_ERR, "%s is not a channel dir(can not find mp3 files)", path);
mytbf_destroy(me->tbf); // 销毁令牌桶
free(me);
return NULL;
}
me->pos = 0;
me->offset = 0;
// 打开mp3文件
me->fd = open(me->mp3glob.gl_pathv[me->pos],O_RDONLY);

if(me->fd < 0)
{
// mp3文件打不开
syslog(LOG_WARNING, "%s open failed.", me->mp3glob.gl_pathv[me->pos]);
mytbf_destroy(me->tbf); // 销毁令牌桶
free(me);
return NULL;
}
me->chnid = curr_id;
curr_id++;
return me;
}


/*
* func:
* 获取频道库的所有合法频道的信息 (频道id + 描述信息),获得媒体库中的节目单信息
* parameter:
* struct mlib_listentry_st **result:指向所有合法频道的记录信息
* int *resume:记录了媒体库中有多少条合法的频道记录信息
* */
int mlib_getchnlist(struct mlib_listentry_st **result,int *resume)
{
char path[PATHSIZE];
glob_t globres;
int num = 0;
struct mlib_listentry_st *ptr; // 节目单中的一条频道信息结构体
struct channel_context_st *res; // 每一条频道信息指针

// 频道节目数组初始化
for(int i = 0;i<MAXCHNID+1;i++)
{
channel[i].chnid = -1;
}

// 媒体库目录路径存储到path字符数组中
// "%s/*" 后面的*号代表通配符
snprintf(path,PATHSIZE,"%s/*",server_conf.media_dir);

// 使用glob函数查找媒体库目录,并且将目录信息存储到globres结构体中
// 成功时返回0
if(glob(path,0,NULL,&globres))
{
syslog(LOG_DEBUG,"err 1");
return -1;
}

// ptr 指针指向的内存空间大小,应该是媒体库所有频道记录的和
// globres.gl_pathc 记录了媒体库中总共有多少个存在的频道(有多少个chi目录,除了发送节目单的频道)
ptr = malloc(sizeof(struct mlib_listentry_st) * globres.gl_pathc);
if(ptr == NULL)
{
// 向系统日志发送错误信息
syslog(LOG_ERR,"malloc error in medialib.c in mlib_getchnlist.\n");
exit(1);
}

// 通过for中函数进行解析,判断媒体库中那些频道源文件是合法的
// gl_pathc字段是匹配到的文件数量 gl_pathv指向找到的文件名数组的指针
for(int i = 0;i<globres.gl_pathc;i++)
{
// 将路径传入函数进行解析,判断这条频道记录是否合法,如果非法传回NULL,合法将频道的信息传回来
// globres.gl_pathv[i] 其实就是 /var/music/ch(i+1)
res = path2entry(globres.gl_pathv[i]);

if(res != NULL)
{
// 这条频道信息合法,向系统日志发送告知这条频道的id以及描述信息
syslog(LOG_DEBUG,"Channel: %d desc: %s",res->chnid,res->desc);
// memcpy 将 path2entry 返回回来的这条合法的频道信息,存储到结构体数组对应的位置
memcpy(channel + res->chnid,res,sizeof(*res));
ptr[num].chnid = res->chnid;
ptr[num].desc = strdup(res->desc); // 字符串复制
// strcpy(ptr[num].desc,res->desc);
// 合法记录+1
num++;
}
}

// *result 指向的 真实合法的频道记录的总和
// 例如,globres.gl_pathc 有4个匹配的目录结构,但是经过for循环解析,发现有两个目录结构是假的,不合法
// 那么合法(可用)的只有2个频道信息。 *result 就是指向 ptr中两条合法的频道信息记录
*result = realloc(ptr,sizeof(struct mlib_listentry_st) * num);
if(*result == NULL)
{
// 向系统日志发送错误
syslog(LOG_ERR,"relloc failed in medialib.c in mlib_getchnlist.\n");
exit(1);
}

*resume = num; // resume 将合法的频道记录数量回传

return 0;
}


/*
* func:
* 读取每个频道下的mp3文件内容,通过令牌桶控制读取速度
* parameter:
* chnid_t chnid 表示频道id,指定需要读取的频道
* void *buf 表示读取信息的缓冲区,读取的数据存储到缓冲区
* size_t size 类型参数 表示需要读取的字节数量
* return:
* 返回从mp3文件中读取的内容字节长度
* */
size_t mlib_readchn(chnid_t chnid, void *buf, size_t size)
{
int tbfsize;
int len;

// 获得令牌
tbfsize = mytbf_fetchtoken(channel[chnid].tbf,size);

while(1)
{
// 读取mp3文件 buf:指向一个缓冲区的指针,用来存储从文件中读取的数据
// tbfsize 指定要读取的字节数
// offset:文件中的偏移量,从文件开始处的字节数,指定从哪里开始读取数据
len = pread(channel[chnid].fd,buf,tbfsize,channel[chnid].offset);
if(len < 0)
{
//当前这首歌可能有问题,读取下一首歌
syslog(LOG_WARNING, "media file %s pread():%s", channel[chnid].mp3glob.gl_pathv[channel[chnid].pos], strerror(errno));
open_next(chnid);
}
else /*len > 0*/ //真正读取到了数据
{
channel[chnid].offset += len; // 下次从这个偏移量继续读
break;
}
}

if((tbfsize - len) > 0)
// 归还没有使用完的令牌
mytbf_returntoken(channel[chnid].tbf, tbfsize - len);

return len;
}


/*释放节目单信息*/
int mlib_freechnlist(struct mlib_listentry_st *ptr)
{
free(ptr);
return 0;
}

(3)解析媒体库下的具体频道目录下的信息

函数static struct channel_context_st *path2entry(const char *path)

1
2
3
4
5
6
7
8
9
10
11
12
13
/*
* func:
* 解析path路径下的频道信息是否合法,如果合法返回合法频道的记录信息
* 若该频道合法,还会在该函数中,为该频道创建一个令牌桶,实现流量控制
* parameter:
* const char *path:
* 具体的频道目录,若媒体库目录为./Music 在媒体库下有三个频道目录ch1、ch2、ch3
* 则具体的频道目录可以是 ./Music/ch_{i} 其中嗯i 可以是1、2、3
* return:
* 失败,返回NULL
* 成功,返回struct channel_context_st类型的结构体指针
* */
static struct channel_context_st *path2entry(const char *path)
  • 判断描述文件是否合法,每一个描述文件均为desc.txt,其核心内容为第一行,描述的内容是mp3文件的音乐类型。首先判断描述文件是否存在,其次判断描述文件中是否有内容
  • 使用mytbf_init为该频道创建一个令牌桶,进行流量控制
  • 判断频道核心内容mp3文件是否存在,若存在则打开

(4)获取媒体库的节目单信息

函数int mlib_getchnlist(struct mlib_listentry_st **result,int *resume)

用于获取媒体库中所有的合法频道信息 (频道id + 描述信息)

1
2
3
4
5
6
7
8
/*
* func:
* 获取频道库的所有合法频道的信息 (频道id + 描述信息),获得媒体库中的节目单信息
* parameter:
* struct mlib_listentry_st **result:指向所有合法频道的记录信息
* int *resume:记录了媒体库中有多少条合法的频道记录信息
* */
int mlib_getchnlist(struct mlib_listentry_st **result,int *resume);

(5)读取每一个频道目录下的mp3文件内容
1
2
3
4
5
6
7
8
9
10
11
/*
* func:
* 读取每个频道下的mp3文件内容,通过令牌桶控制读取速度
* parameter:
* chnid_t chnid 表示频道id,指定需要读取的频道
* void *buf 表示读取信息的缓冲区,读取的数据存储到缓冲区
* size_t size 类型参数 表示需要读取的字节数量
* return:
* 返回从mp3文件中读取的内容字节长度
* */
size_t mlib_readchn(chnid_t chnid, void *buf, size_t size);


5.节目单线程

(1)节目单线程头文件

thr_list.h

1
2
3
4
5
6
7
8
9
10
11
12
#ifndef IPV4_STREAMING_MEDIA_THR_LIST_H
#define IPV4_STREAMING_MEDIA_THR_LIST_H

#include "medialib.h"

/*创建节目单线程*/
int thr_list_create(struct mlib_listentry_st *,int);

/*销毁节目单线程*/
int thr_list_destroy(void);

#endif //IPV4_STREAMING_MEDIA_THR_LIST_H

(2)节目单线程源文件

thr_list.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
#include <pthread.h>
#include <syslog.h>
#include <string.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <arpa/inet.h>
#include <sys/socket.h>

#include "proto.h"
#include "thr_list.h"
#include "server_conf.h"
#include "medialib.h"

// 节目单线程id
static pthread_t tid_list;
// 节目单中包含的节目数量
static int nr_list_ent;
// 节目单信息数组(存储所有有效的频道信息:频道id + 描述信息)
static struct mlib_listentry_st *list_ent;


/*
* func:
* 节目单线程函数,用于处理节目单
* */
static void *thr_list(void *p)
{
int totalsize;
struct msg_list_st *entlistp; // 用于发送的节目单数据包结构体指针
struct msg_listentry_st *entryp; // 用于发送的节目单中的一条频道信息的数据包结构体指针
int size;
int ret;
struct timespec t;

/*
* // 用于发送的单个频道信息 频道id + 描述信息
* struct msg_listentry_st{
* chnid_t chnid;
* uint16_t len; // 这条记录数据的长度 频道id + uint16_t + 描述信息 的 总共大小
* uint8_t desc[1]; // 变长数组,柔性数组成员,用于存储频道的描述信息
* }__attribute__((packed)); // 不使用对齐
*
* // 用于发送的节目单信息 节目单id + 用于发送的单个频道信息(频道id + 描述信息)
* struct msg_list_st{
* chnid_t chnid; // 节目单的频道id LIST_CHNID
* struct msg_listentry_st entry[1]; // 柔性结构体数组,用于存储多条 用于发送的单个频道信息
* }__attribute__((packed));
* */


totalsize = sizeof(chnid_t); // 一个频道id的大小---节目单的频道id

for(int i=0;i<nr_list_ent;i++)
{
// 计算为所有有效的频道信息的存储空间大小 (每个频道的id + 各自的描述信息)
totalsize += sizeof(struct msg_listentry_st) + strlen(list_ent[i].desc);
}

// 此时totalsize 存储节目单的空间大小
entlistp = malloc(totalsize);
if(entlistp == NULL)
{
// 内存开辟失败
syslog(LOG_ERR,"malloc():%s", strerror(errno));
exit(1);
}

entlistp->chnid = LISTCHNID; // 节目单的频道id
entryp = entlistp->entry; // 指向所有的有效频道的 (频道id + 描述信息)
// 此时对entryp进行操作,实际就是对entlistp->entry进行操作,指向同一块内存空间
// entryp 指向的是 entlistp->entry指向空间的起始地址

for(int i = 0;i<nr_list_ent;i++)
{
// 计算每一个有效频道信息的存储空间大小
// 频道id + uint16_t + 描述信息 的总大小 (总的字节数)
size = sizeof(struct msg_listentry_st) + strlen(list_ent[i].desc);

// uint_8类型变量为什么不需要转化为网络字节序
// 因为对于单字节数据类型不要进行字节序转换,因为单字节数据(8位)在内存中只占用一个字节,无论是大端字节序还是小端字节序,其在内存中的表达方式都是一致的。
// uint_8 类型数据也是 8位 在内存中只占用一个字节
entryp->chnid = list_ent[i].chnid;
// size 为int 变量,进行网络传输,字节序应该从主机序转化为网络序
entryp->len = htons(size);

strcpy(entryp->desc,list_ent[i].desc);
// entryp 指向的地址向后移动,用于填写entlistp->entry内存空间中的下一个频道的信息
// (char *)entryp:将 entryp 转换为 char * 类型。char * 是指向字符的指针,而一个字符(在C语言中)通常占一个字节。
// 这种转换使得对指针的增加操作变得以字节为单位,因为对 char * 指针的算术操作会按字节来进行。
// (void *) 可以指向任何类型的数据,常用于泛型数据处理
entryp = (void *)(((char *)entryp) + size);
}

// 在0号频道发送节目单
while (1)
{
// 将节目单信息发送(报式套接字 UDP 使用sendto)
// serverSd为服务端创建的UDP套接字文件描述符
// 将节目单数据包发送至多播组ip
ret = sendto(serverSd,entlistp,totalsize,0,(void *)&sndaddr,sizeof sndaddr);
if(ret < 0)
{
// 发送失败
syslog(LOG_WARNING,"sendto():%s", strerror(errno));
}
else
{
syslog(LOG_DEBUG,"send to program list succeed.");
}

// 1s发送一次节目单数据包
// 使用nanosleep函数使得线程休眠1s
t.tv_sec = 1;
t.tv_nsec = 0;
while(nanosleep(&t,&t) != 0)
{
// 真错
if(errno != EINTR)
{
fprintf(stderr,"nanosleep():%s\n", strerror(errno));
exit(1);
}
}
}
}


/*
* func:
* 创建节目单线程
* parameter:
* struct mlib_listentry_st *listp : 存储着媒体库中所有的频道信息(这是从媒体库中获得的信息)
* 频道id 以及 频道描述信息
* */
int thr_list_create(struct mlib_listentry_st *listp,int nr_ent)
{
int err;
list_ent = listp;
nr_list_ent = nr_ent;


err = pthread_create(&tid_list,NULL,thr_list,NULL);
if(err)
{
// err 非0 表示线程创建失败
syslog(LOG_ERR,"pthread_create():%s",strerror(errno));
return -1; // 创建失败return -1
}
return 0;
}


/*销毁节目单线程*/
int thr_list_destroy(void)
{
// 发送节目单的线程取消
pthread_cancel(tid_list);
// 线程资源回收
pthread_join(tid_list,NULL);
return 0;
}

(3)节目单线程任务函数
1
2
3
4
5
6
/*
* func:
* 节目单线程函数,用于处理从媒体库中获得的节目单数据,将节目单数据发送给多播组ip
* 每秒执行一次
* */
static void *thr_list(void *p)


6.频道线程

(1)频道线程头文件

thr_channel.h

1
2
3
4
5
6
7
8
9
10
11
12
#ifndef IPV4_STREAMING_MEDIA_THR_CHANNEL_H
#define IPV4_STREAMING_MEDIA_THR_CHANNEL_H

#include "medialib.h"

int thr_channel_create(struct mlib_listentry_st *);

int thr_channel_destroy(struct mlib_listentry_st *);

int thr_channel_destroyall(void);

#endif //IPV4_STREAMING_MEDIA_THR_CHANNEL_H

(2)频道线程源文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
#include <stdlib.h>
#include <stdlib.h>
#include <errno.h>
#include <pthread.h>
#include <syslog.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>

#include "proto.h"
#include "server_conf.h"
#include "thr_channel.h"


/*每个频道由一个线程负责*/
struct thr_channel_ent_st{
chnid_t chnid;
pthread_t tid;
};


static int tid_nextpos = 0;
/*频道线程的结构体数组 CHNNR 为频道(最多)数量*/
struct thr_channel_ent_st thr_channel[CHNNR];


/*
* func:
* 频道线程的处理函数,读取媒体库中,具体频道的mp3文件内容,并且发送多播组ip
* parameter:
* void *ptr
* 为单个频道的信息 包含 频道id + 描述信息
* void * 为万能类型,在函数体内部还是得进行显式转换
* */
static void *thr_channel_snder(void *ptr)
{
struct msg_channel_st *sbufp; // 在网络通信中发送的频道信息数据包
int len;
struct mlib_listentry_st *ent = ptr;


// MSG_CHANNEL_MAX 为 msg_channel_st具体频道数据包的最大长度
sbufp = malloc(MSG_CHANNEL_MAX);
if(sbufp == NULL)
{
syslog(LOG_ERR,"malloc():%s",strerror(errno));
exit(1);
}
sbufp->chnid = ent->chnid; // 频道号是uint_8 8位相当于单字节 无需字节序转换

// 频道内容读取
while(1)
{
// 读取的内容存放到缓冲区 sbutf->data 中
len = mlib_readchn(ent->chnid,sbufp->data,MSG_DATA);

// 发送频道中的内容至多播放组ip
if(sendto(serverSd,sbufp,len+sizeof(chnid_t),0,(void *)&sndaddr, sizeof(sndaddr)) < 0)
{
syslog(LOG_ERR, "thr_channel(%d):sendto():%s", ent->chnid, strerror(errno));
break;
} else{
syslog(LOG_DEBUG, "thr_channel(%d): sendto() succeed.", ent->chnid);
}
// 让出调度器,让别的线程使用CPU时间片
sched_yield();
}
// 线程退出
pthread_exit(NULL);
}

/*
* func:
* 创建频道线程
* parameter:
* struct mlib_listentry_st *ptr: 为单个频道的信息 包含 频道id + 描述信息
* return:
* 成功,返回0
* 失败,返回错误码 errno
* */
int thr_channel_create(struct mlib_listentry_st *ptr)
{
int err;

if(tid_nextpos >= CHNNR)
{
// 频道线程的结构体数组空间不够
return -ENOSPC;
}
// 创建频道线程
err = pthread_create(&thr_channel[tid_nextpos].tid,NULL,thr_channel_snder,ptr);

if(err)
{
syslog(LOG_WARNING,"pthread_create():%s",strerror(err));
return -err; // pthread_create函数出错的时候,返回其出错码
}
// 为当前频道线程结构体数组中当前索引位置的频道线程结构体 设置字段的值
thr_channel[tid_nextpos].chnid = ptr->chnid;
// 当前索引位置已经被该线程占用,索引位置+1
tid_nextpos++;

return 0;
}


/*
* func:
* 销毁频道线程
* parameter:
* struct mlib_listentry_st *ptr: 为单个频道的信息 包含 频道id + 描述信息
* return:
* 失败,返回错误码
* 成功,返回0
* */
int thr_channel_destroy(struct mlib_listentry_st *ptr)
{
// 遍历线程结构体数组
for(int i = 0;i>CHNNR;i++)
{
// 找到负责该频道的线程
if(thr_channel[i].chnid = ptr->chnid)
{
// 线程取消
if(pthread_cancel(thr_channel[i].tid) < 0)
{
// 取消失败
syslog(LOG_ERR,"pthread_cannel():thr thread of channel %d",ptr->chnid);
return -ESRCH; // 错误码 表示没有找到这样的线程
}
// 线程资源回收
pthread_join(thr_channel[i].tid,NULL);
thr_channel[i].chnid = -1; // 频道线程结构体数组位置取消占用
}
}
return 0;
}


/*
* 销毁所有的频道线程
* */
int thr_channel_destroyall(void)
{
for(int i=0;i<CHNNR;i++)
{
// 不为-1 这个索引位置有频道线程
if(thr_channel[i].chnid > 0)
{
// 线程取消
if(pthread_cancel(thr_channel[i].tid) < 0)
{
syslog(LOG_ERR,"pthread_cannel():thr thread of channel %d",thr_channel[i].chnid);
return -ESRCH;
}
// 线程资源回收
pthread_join(thr_channel[i].tid,NULL);
thr_channel[i].chnid = -1;
}
}
return 0;
}

(3)节目单线程任务函数

通过媒体库中函数mlib_readchn读取具体频道的mp3文件内容,并且发送多播组ip

1
2
3
4
5
6
7
8
9
/*
* func:
* 频道线程的处理函数,读取媒体库中,具体频道的mp3文件内容,并且发送多播组ip
* parameter:
* void *ptr
* 为单个频道的信息 包含 频道id + 描述信息
* void * 为万能类型,在函数体内部还是得进行显式转换
* */
static void *thr_channel_snder(void *ptr);


7.服务端

服务端主要的工作任务如下:

  • 判断命令行输入,判断是否将服务端作为一个守护进程运行
  • 创建多播组,定义服务端发送数据的远端ip信息
  • 从媒体库获取合法的节目单信息
  • 通过获取的节目单信息,创建频道线程,有多少合法频道就创建多少频道子线程
  • 创建节目单线程,只需要创建一个节目单线程发送节目单数据包
  • 将服务端进程挂起,等待信号打断

服务端程序的工作流程:

  • 定义进程中信号行为,用于捕捉信号,退出守护进程
  • 命令行分析:通过getopt函数获得,运行客户端程序时,命令行中所携带的参数,对参数进行分析,运行不同的命令(指定接收端口、指定多播组、指定接收端口、前台运行,指定媒体库路径,指定网络设备,这些不同的命令均有默认的参数)
  • 判断运行模式是否为前台运行,或者为守护进程后台运行
  • UDP套接字初始化,设置套接字属性,创建多播组,定义服务端发送数据的远端ip信息
  • 获取节目单数据
  • 创建频道线程,向多播组ip发送频道数据包
  • 创建节目单线程,向多播组ip发送节目单数据包

image-20240513225429630

(1)服务端头文件

server_conf.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#ifndef IPV4_STREAMING_MEDIA_SERVER_CONF_H
#define IPV4_STREAMING_MEDIA_SERVER_CONF_H

#define DEFAULT_MEDIADIR "/home/zxz/Proj/C_C++/linux_c/IPV4_StreamingMedia/Music" // 默认的媒体库位置
#define DEFAULT_IF "ens33" // 默认的网卡设备(网络接口)

enum{
RUN_DAEMON = 1, // 以daemon模式运行,守护进程的形式运行在后台
RUN_FOREGROUND // 作为前台方式运行
};

/*客户端命令行指令结构体*/
struct server_conf_st{
char *rcvport; // 端口
char *mgroup; // 多播组ip
char *media_dir; // 媒体库路径
char *runmode; // 运行模式
char *ifname; // 网络设备
};

extern struct server_conf_st server_conf;
extern int serverSd; // 服务端的UDP套接字文件描述符
extern struct sockaddr_in sndaddr;

#endif //IPV4_STREAMING_MEDIA_SERVER_CONF_H


(2)服务端源文件

server.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
#include <stdio.h>
#include <stdlib.h>
#include <proto.h>
#include <unistd.h>
#include <syslog.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <error.h>
#include <errno.h>
#include <string.h>
#include <signal.h>
#include <sys/socket.h>
#include <netinet/ip.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <net/if.h>
#include "proto.h"

#include "server_conf.h"
#include "medialib.h"
#include "thr_channel.h"
#include "thr_list.h"

int serverSd; // 不能使用static 修饰,否则其链接性变为内部
struct sockaddr_in sndaddr; // 存储远端ip信息,对于服务端需要将数据包发送至多播组,远端即多播组
static struct mlib_listentry_st *list; //

/*
* -M 指定多播组
* -P 指定接收端口
* -F 前台运行
* -D 指定媒体库路径
* -I 指定网络设备
* -H 显示帮助
*/
// 服务端结构体字段初始化
struct server_conf_st server_conf = {
.rcvport = DEFAULT_RCVPORT,
.mgroup = DEFAULT_MGROUP,
.media_dir = DEFAULT_MEDIADIR,
.runmode = RUN_DAEMON,
.ifname = DEFAULT_IF,
};

/*打印帮助数据*/
static void printhelp(void)
{
printf("-M 指定多播组\n");
printf("-P 指定接收端口\n");
printf("-F 前台运行\n");
printf("-D 指定媒体库路径\n");
printf("-I 指定网络设备\n");
printf("-H 显示帮助\n");
}


/*守护进程退出函数*/
static void daemon_exit(int s)
{
thr_list_destroy(); // 销毁节目单线程
thr_channel_destroyall(); // 销毁各个频道线程
mlib_freechnlist(list); // 销毁节目单结构体数据
if (s < 0) {
syslog(LOG_ERR, "Daemon failure exit.");
exit(1);
}
syslog(LOG_INFO, "Signal-%d caught, exit now.", s);

closelog(); // 关闭系统日志
exit(0); // 退出守护进程
}


/*创建守护进程,按照守护进程的流程去创建即可*/
static int daemonize(void)
{
pid_t pid;
int fd;

pid = fork();

if(pid < 0)
{
// 将报错写到日志中
syslog(LOG_ERR, "fork():%s", strerror(errno));
return -1;
}

// 父进程退出
else if(pid > 0){
printf("PPID Exit.\n");
printf("daemon PID : %d\n",pid);
exit(0);
}

// 子进程,创建新的会话,并且成为会话的leader
// 并且重定向或者关闭标准的输入输出流
else{
// 打开空设备,这个是一个特殊的设备
// 只要把数据扔到这个特殊的设备文件/dev/null中, 数据被被销毁了
fd = open("/dev/null",O_RDWR);
if(fd < 0)
{
// 将报错写到系统日志中
syslog(LOG_WARNING, "open():%s", strerror(errno));
return -2;
}
// 标准的输入输出流,错误流重定向到空设备中
dup2(fd, 0);
dup2(fd, 1);
dup2(fd, 2);

if(fd > 2)
close(2);

// 开启一个新的会话,使该子进程成为会话领头进程,脱离所有终端控制
setsid();
// 在系统日志中输出,告知已经创建好守护进程
syslog(LOG_INFO, "Daemon initialized OK");
// 改变当前进程的工作路经,工作路径需要为一个绝对有的路径,以避免守护进程持续占用任何挂载点
chdir("/");
// 改变文件模式掩码(umask),设置 umask 为 0 以确保守护进程可以读写其创建的任何文件,且不受继承的文件模式掩码的限制
umask(0);
return 0;
}
}


/*套接字初始化函数,设置套接字属性*/
static void socket_init(void)
{
// 创建UDP套接字,用于向多播组ip发送数据
serverSd = socket(AF_INET,SOCK_DGRAM,0);
if(serverSd < 0)
{
// 套接字创建失败,向系统日志发送信息
syslog(LOG_ERR, "socket():%s", strerror(errno));
exit(1);
}

// 设置套接字属性,建立多播组
// 指定用于发送多播数据包的网络接口(指定机器的那一个网卡设备用于发送多播数据)
struct ip_mreqn mreq;
inet_pton(AF_INET, server_conf.mgroup, &mreq.imr_multiaddr); // 多播组ip
inet_pton(AF_INET, "0.0.0.0", &mreq.imr_address); // 网络接口IP
mreq.imr_ifindex = if_nametoindex(server_conf.ifname); // 网卡设备索引
// 在IP层,设置属性名 IP_MULTICAST_IF 建立多播组
if (setsockopt(serverSd, IPPROTO_IP, IP_MULTICAST_IF, &mreq, sizeof(mreq)) < 0)
{
syslog(LOG_ERR, "setsockopt(IP_MULTICAST_IF):%s", strerror(errno));
exit(1);
}

// 定义远端ip信息--服务端需要将数据包发送至多播组,因此多播组ip就是远端ip
sndaddr.sin_family = AF_INET;
sndaddr.sin_port = htons(atoi(server_conf.rcvport));
inet_pton(AF_INET,server_conf.mgroup,&sndaddr.sin_addr);
// printf("%s %d\n",__FUNCTION__, __LINE__); 调试
}


int main(int argc,char *argv[])
{
int c;

struct sigaction sa;

sa.sa_handler = daemon_exit; // 进程捕获到这些信号的处理函数为守护进程退出函数
sigemptyset(&sa.sa_mask); // sa.sa_mask在信号处理函数执行时,额外需要阻塞的信号集合
sigaddset(&sa.sa_mask, SIGTERM);
sigaddset(&sa.sa_mask, SIGINT);
sigaddset(&sa.sa_mask, SIGQUIT);
// 上面三行将三个信号添加进入了,需要阻塞的信号集合中
// 在执行 daemon_exit 函数时,上述的三个信号将被自动阻塞
// 意味着如果在 daemon_exit 执行期间,程序再次收到这些信号,这些信号不会打断当前的信号处理函数,而是待处理函数执行完毕后再决定如何处理这些阻塞的信号
// 这防止了信号处理过程中的递归或重入问题,从而使程序的行为更为可预测和稳定
// 意味着当处理这些信号的 daemon_exit 函数正在执行时,这三个信号将不会再次被递送,从而阻止了信号处理函数的中途被相同信号中断

sigaction(SIGTERM, &sa, NULL);
sigaction(SIGINT, &sa, NULL);
sigaction(SIGQUIT, &sa, NULL);
// 为 SIGTERM、SIGINT 和 SIGQUIT 信号分别设置了新的处理动作,即用 sa 结构体中定义的处理方式。
// 即进程捕获到了SIGTERM、SIGINT 和 SIGQUIT 就会执行 daemon_exit 函数


// 打开系统日志
// 参数1:程序的名称,帮助在日志文件中区分来自不同应用程序的日志消息
// 参数2:参数是一个整数,用于指定日志操作的选项
// 参数3:指定日志消息的默认设施。设施参数用于指定日志消息的种类
openlog("IPV4_StreamingMedia", LOG_PID|LOG_PERROR, LOG_DAEMON);


/*1. 使用getopt(),进行命令行分析*/
while(1)
{
// extern char *optarg 是与getopt()有关的全局变量,会自动指向命令行当前带参数选项,后面的参数
c = getopt(argc,argv,"M:P:FD:I:H");
if(c < 0)
break;
switch (c) {
case 'M':
server_conf.mgroup = optarg;
break;
case 'P':
server_conf.rcvport = optarg;
break;
case 'F':
server_conf.runmode = RUN_FOREGROUND;
break;
case 'D':
server_conf.media_dir = optarg;
break;
case 'I':
server_conf.ifname = optarg;
break;
case 'H':
printhelp();
exit(0);
break;
default:
abort();
break;
}
}
/****************************************************************************************/


/*2. 守护进程的实现*/
// 命令行选项指定server作为一个守护进程运行
if(server_conf.runmode == RUN_DAEMON)
{
if(daemonize() != 0) // 将server端进程转化为守护进程
exit(1);
}
else if(server_conf.runmode == RUN_FOREGROUND)
{
// do nothing
}
else
{
// 向系统日志发送,级别为LOG_ERR
syslog(LOG_ERR, "EINVAL server_conf.runmode.");
exit(1);
}
/****************************************************************************************/


/*3. SOCKET初始化,设置套接字属性,创建多播组*/
socket_init();

/*4. 获取频道信息*/
struct mlib_listentry_st *list; // 媒体库中,频道信息结构体
int list_size; // 节目单上的频道数量(媒体库中有效的频道数量)
int err;
// if error
err = mlib_getchnlist(&list,&list_size); // 获取节目单信息
if(err)
{
// 媒体库中节目单信息获取失败
syslog(LOG_ERR,"mlib_getchnlist() failed in main.c.");
exit(1);
}

// 向系统日志发送调试信息,节目单中频道数量
syslog(LOG_DEBUG,"channel sizes = %d",list_size);

/*5. 创建频道线程*/
// 创建100个频道线程,一个频道对应一个线程,此处创建线程数量要考虑机器所能创建线程数量的上限
int i;
for(i = 0;i<list_size;i++)
{
err = thr_channel_create(list+i);
if(err)
{
syslog(LOG_ERR,"thr_channel_create() failed.");
exit(1);
}
}
// 向系统日志发送调试信息
syslog(LOG_DEBUG,"%d channel threads create.",i);


/*6. 创建节目单线程*/
err = thr_list_create(list,list_size);
if(err)
{
syslog(LOG_ERR,"thr_list_create() failed.");
exit(1);
}
// 向系统日志发送调试信息
syslog(LOG_DEBUG,"the channel_list threads create.");


while(1)
// 使用kill [daemon_pid] 发送信号,可以打断pause,并且去执行daemon_exit函数这样就退出了守护进程
pause(); // 将进程挂起

exit(0); // 执行不到
}

CMakeLists.txt

1
2
3
4
5
6
7
8
9
10
cmake_minimum_required(VERSION 3.16)
project(server C)

set(CMAKE_C_STANDARD 99)
include_directories(../include)

add_executable(server server.c mytbf.c thr_channel.c thr_list.c medialib.c)

# 直接链接pthread到你的项目
target_link_libraries(server PRIVATE pthread)


8.整体目录结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
(base) zxz@ubuntu:~/Proj/C_C++/linux_c/IPV4StreamingMedia$ tree
.
├── doc
│   ├── admin
│   ├── devel
│   └── user
├── INSTALL
├── LISENCE
├── Music
│   ├── ch1
│   │   ├── desc.txt
│   │   └── love1.mp3
│   ├── ch2
│   │   ├── desc.txt
│   │   └── love2.mp3
│   └── ch3
│   ├── desc.txt
│   └── love3.mp3
├── README
└── src
├── client
│   ├── build
│   ├── client.c
│   ├── client.h
│   └── CMakeLists.txt
├── include
│   ├── proto.h
│   └── site_type.h
└── server
├── build
├── CMakeLists.txt
├── medialib.c
├── medialib.h
├── mytbf.c
├── mytbf.h
├── server.c
├── server_conf.h
├── thr_channel.c
├── thr_channel.h
├── thr_list.c
└── thr_list.h
  • INSTALL文件:部署指导,使用这个应用程序,应该如何安装,会产生什么样的库
  • LISENCE文件:使用许可,当前遵循什么协议等
  • README:告知用户的一些须知

README

1
2
3
4
5
6
项目名称:IPV4流媒体项目
README
服务端:创建多播组,使用多线程向多播组ip发送所有的频道数据
客户端:加入多播组,从多播组ip接收,用户选定需要的数据
Music目录为媒体库目录
自述,当前完成什么样的功能等


9.编译运行

在src目录下的client目录与server目录下,分别运行指令,编译运行客户端与服务端程序

1
2
$ cmake..
$ make

运行效果如下:

1
./server -F

image-20240513231822232

  • 显示节目单数据与各个频道数据均发送成功

另开终端运行:

1
$ ./client

image-20240513231943375

  • 成功接收来自多播组ip的节目单数据,以及频道数据