TinywebServer代码详解– 日志系统-上(9)

该blog内容转自:最新版Web服务器项目详解 - 09 日志系统(上)

该blog对上述内容进行补充(在本人的角度)

结合此前记录的blog一起学习:牛客WebServer项目实战(点击跳转)

结合此前记录的blog一起学习:多线程与线程同步(点击跳转)

原项目地址(点击跳转)

博主添加注释后项目地址(点击跳转)



一、基础知识

1.相关概念

日志

  • 由服务器自动创建,并记录运行状态,错误信息,访问数据的文件

同步日志

  • 日志写入函数与工作线程串行执行,由于涉及到I/O操作,当单条日志比较大的时候,同步模式会阻塞整个处理流程,服务器所能处理的并发能力将有所下降,尤其是在峰值的时候,写日志可能成为系统的瓶颈

生产者-消费者模型

  • 并发编程中的经典模型。以多线程为例,为了实现线程间数据同步,生产者线程与消费者线程共享一个缓冲区,其中生产者线程往缓冲区中push消息,消费者线程从缓冲区中pop消息

阻塞队列

  • 将生产者-消费者模型进行封装,使用循环数组实现队列,作为两者共享的缓冲区

异步日志

  • 将所写的日志内容先存入阻塞队列,写线程从阻塞队列中取出内容,写入日志

单例模式

  • 最简单也是被问到最多的设计模式之一,保证一个类只创建一个实例,同时提供全局访问的方法


2.整体概述

本项目中,使用单例模式创建日志系统,对服务器运行状态、错误信息和访问数据进行记录,该系统可以实现按天分类,超行分类功能,可以根据实际情况分别使用同步和异步写入两种方式

其中异步写入方式,将生产者-消费者模型封装为阻塞队列创建一个写线程,工作线程将要写的内容push进队列,写线程从队列中取出内容,写入日志文件

日志系统大致可以分成两部分其一是单例模式与阻塞队列的定义,其二是日志类的定义与使用



3.本文内容

本篇将介绍单例模式与阻塞队列的定义,具体的涉及到单例模式、生产者-消费者模型,阻塞队列的代码实现

单例模式,描述懒汉与饿汉两种单例模式,并结合线程安全进行讨论

生产者-消费者模型,描述条件变量,基于该同步机制实现简单的生产者-消费者模型。

代码实现,结合代码对阻塞队列的设计进行详解




二、设计模式

1.单例模式

单例模式(Singleton Pattern)是一种创建型设计模式,旨在确保一个类只有一个实例,并提供一个全局访问点,该实例被所有程序模块共享

实现思路

私有化它的构造函数,以防止外界创建单例类的对象;

使用类的私有静态指针变量指向类的唯一实例,并用一个公有的静态方法获取该实例

单例模式有两种实现方法

懒汉式:类唯一的实例对象直到第一次获取的时候才产生(在第一次被使用时才进行初始化)

饿汉式:在还未获取类实例对象之前,类实例对象就已经产生了(在程序运行时立即初始化)


(1)饿汉式

饿汉式的单例模式,在还未获取类实例对象之前,类实例对象就已经产生了,需要注意如下几点:

限制构造函数的访问方式

  • 构造函数私有化

定义一个唯一的类实例对象

  • 私有化,通过static修饰

定义一个静态接口函数,获取唯一的类实例对象

  • 接口函数return唯一实例对象地址

复制构造函数/赋值运算符均禁止(显示禁止)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#include <iostream>
using namespace std;

class Singleton{
public:
// 共有的静态接口,获取唯一的实例对象
static Singleton *getInstance()
{
return instance;
}

private:
// 私有化的构造函数
Singleton(){}
~Singleton(){}
// 声明私有化静态的唯一类实例对象(指针)
static Singleton *instance;
};

// 类外初始化私有的类指针(不能把静态成员的初始化放置在类的定义中)
Singleton* Singleton::instance = new Singleton();

// 测试方法
int main()
{
// 通过静态接口获取类实例
Singleton *p1 = Singleton::getInstance();
Singleton *p2 = Singleton::getInstance();

cout << p1 << endl;
cout << p2 << endl;

return 0;
}

饿汉式是否线程安全

线程安全

在程序运行时就定义了对象,并对其初始化。之后,不管哪个线程调用成员函数getinstance(),都只不过是返回一个对象的指针而已。所以是线程安全的,不需要在获取实例的成员函数中加互斥锁锁


(2)饿汉式的潜在问题

在于非静态对象(函数外的static对象)在不同编译单元(多个不同的.cpp文件)中的初始化顺序是未定义的。如果在初始化完成之前调用 getInstance() 方法会返回一个未定义的实例

如果你在一个编译单元中定义了一个单例类,并在其构造函数中进行了某些初始化操作,而另一个编译单元的静态对象在其构造函数中尝试通过getInstance()访问这个单例类的实例,就可能出现问题。如果此时单例类的实例尚未初始化,那么getInstance()方法返回的实例将处于未定义的状态。这可能导致程序的行为异常或崩溃

C++标准没有明确规定静态对象在不同编译单元中的初始化顺序,主要是因为编译器和链接器的实现细节可能影响这个顺序。例如,一个编译单元中的静态对象可能在另一个编译单元中的静态对象之前或之后初始化,这取决于链接器如何处理这些对象的初始化指令

在单例模式中,如果使用饿汉式初始化(即在类定义时或静态初始化阶段创建单例对象),这个问题尤其关键。假设你有两个编译单元:Singleton.hMain.cpp。在 Singleton.h 中,你可能定义了一个静态的单例对象,而在 Main.cpp 或其他地方,你可能有代码调用了 getInstance() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Singleton.h
class Singleton {
public:
static Singleton& getInstance() {
return instance;
}

private:
static Singleton instance; // 静态单例对象,在编译期初始化
Singleton() {} // 私有构造函数
};

// Singleton.cpp
#include "Singleton.h"

Singleton Singleton::instance; // 静态对象的定义

在主函数或者其他编译单元中:

1
2
3
4
5
6
7
// Main.cpp
#include "Singleton.h"

int main() {
Singleton& s = Singleton::getInstance();
// 使用单例对象
}

潜在问题

如果 Singleton::instance 的初始化发生在 getInstance() 被调用之前,一切正常。但如果初始化顺序相反,即在 getInstance() 被调用时 Singleton::instance 还未初始化完全,就会得到一个未定义的行为。这可能导致程序崩溃或行为异常,因为此时返回的对象状态是不确定的

解决方案

延迟初始化:使用懒汉式初始化,即在第一次调用 getInstance() 时才创建单例对象。这样可以确保每次调用时对象都是正确初始化的

使用互斥锁:在多线程环境中,即使使用懒汉式初始化,也需要确保线程安全。这通常涉及到在 getInstance() 方法中添加互斥锁(mutex)来保护初始化过程

显式初始化:在程序的入口点(如 main() 函数)中显式初始化单例对象,确保所有其他代码在访问单例对象之前,该对象已经被正确初始化


(3)懒汉式

懒汉式单例模式,类的唯一实例对象直到第一次获取的时候才产生

Singleton.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class Singleton
{
public:
// 定义一个静态的接口函数,获取唯一的类实例对象
static Singleton *getInstance()
{
if(instance == nullptr)
{
instance = new Singleton();
}
return instance;
}

private:
// 构造函数的私有化
Singleton(){}
// 定义一个私有的唯一的类的实例对象指针
static Singleton *instance;
// 复制构造函数禁止
Singleton(const Singleton&) = delete;
// 赋值运算符禁止
Singleton& operator=(const Singleton&) = delete;
};

Singleton.cpp

1
Singleton *Singleton::instance = nullptr;

重点

  • 懒汉模式的单例模式,不是线程安全的,因为懒汉式的静态接口函数非可重入函数
  • 可重入函数:在多线程的条件下,可被多个线程调用并且不会产生竞态条件,可以被多个线程重复调用,而不会发生线程安全问题

(4)双重检查锁的懒汉式

经典的线程安全懒汉式单例模式,使用双检测锁模式,双重检查锁的优点

不仅保证了懒汉式的线程安全

而且,保证在单线程的环境下,每次调用getInstance函数不需要重复进行加锁,不会导致在单线程的环境下,频繁加锁,加锁的力度过大

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// Singleton.h
#include <pthread>
class Singleton
{
public:
// 定义一个静态的接口函数,获取唯一的类实例对象
static Singleton *getInstance()
{


if(instance == nullptr) // 第一次检查
{
// 加锁--双重检查
pthread_mutex_lock(&mtx);
if(instance == nullptr) // 第二次检查
{
instance = new Singleton(); // 确保实例仅创建一次
}
// 解锁
pthread_mutex_unlock(&mtx);
}

return instance;
}

private:
// 构造函数的私有化
Singleton(){}
// 定义一个私有的唯一的类的实例对象指针
static Singleton *instance;
// 复制构造函数禁止
Singleton(const Singleton&) = delete;
// 赋值运算符禁止
Singleton& operator=(const Singleton&) = delete;
// 互斥锁 (可以直接使用 static std::mutex mtx静态初始化的方式,源文件就可以省略初始化)
static pthread_mutex_t mtx;
};

// Singleton.cpp
Singleton *Singleton::instance = nullptr;
pthread_mutex_t Singleton::mtx = PTHREAD_MUTEX_INITIALIZER;

如果只检测一次,在每次调用获取实例的方法时,都需要加锁,这将严重影响程序性能。双层检测可以有效避免这种情况,仅在第一次创建单例的时候加锁,其他时候都不再符合NULL == p的情况,直接返回已创建好的实例


(5)局部静态变量的懒汉式

《Effective C++》(Item 04)中的提出另一种更优雅的单例模式实现,使用函数内的局部静态对象,这种方法不用加锁和解锁操作(C++0X之后可以,之前还是需要加锁

将唯一的实例对象,定义写到获取实例的静态接口中,当调用该静态接口就会对其进行初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Singleton.h
class Singleton
{
public:
// 定义一个静态的接口函数,获取唯一的类实例对象
static Singleton *getInstance()
{
// 函数静态局部变量的初始化,在汇编指令上已经自动添加线程互斥指令
// 因此如下的写法是线程安全的
static Singleton instance; // 定义以及初始化一个唯一的类实例对象
return &instance;
}

private:
// 构造函数的私有化
Singleton(){}
// 复制构造函数禁止
Singleton(const Singleton&) = delete;
// 赋值运算符禁止
Singleton& operator=(const Singleton&) = delete;
};

// Singleton.cpp
/**不需要做什么...**/

是否会造成线程安全问题

C++0X以后,要求编译器保证内部静态变量的线程安全性,故C++0x之后该实现是线程安全的,C++0x之前仍需加锁,其中C++0xC++11标准成为正式标准之前的草案临时名字



三、条件变量与生产者消费者模型

1.条件变量

严格意义上来说,条件变量的主要作用不是处理线程同步,而是进行线程的阻塞。如果在多线程程序中只使用条件变量无法实现线程的同步,必须要配合互斥锁来使用。条件变量提供了一种线程间的通知机制,当某个共享数据达到某个值时,唤醒等待这个共享数据的线程。条件变量与互斥锁的区别:

  • 假设有 A-Z 26 个线程,这 26 个线程共同访问同一把互斥锁,如果线程 A 加锁成功,那么其余 B-Z 线程访问互斥锁都阻塞,所有的线程只能顺序访问临界区
  • 条件变量只有在满足指定条件下才会阻塞线程,如果条件不满足,多个线程可以同时进入临界区,同时读写临界资源,这种情况下还是会出现共享资源中数据的混乱

一般情况下条件变量用于处理生产者和消费者模型,并且和互斥锁配合使用


(1)基础API
1
2
3
4
5
6
7
8
9
10
11
#include <pthread.h>

//定义一个条件变量类型的变量
pthread_cond_t cond;

// 初始化
int pthread_cond_init(pthread_cond_t *restrict cond,
const pthread_condattr_t *restrict attr);

// 销毁释放资源
int pthread_cond_destroy(pthread_cond_t *cond);

参数

  • cond: 条件变量的地址
  • attr: 条件变量属性,一般使用默认属性,指定为 NULL

1
2
3
4
// 唤醒阻塞在条件变量上的线程, 至少有一个被解除阻塞
int pthread_cond_signal(pthread_cond_t *cond);
// 唤醒阻塞在条件变量上的线程, 被阻塞的线程全部解除阻塞
int pthread_cond_broadcast(pthread_cond_t *cond);

调用上面两个函数中的任意一个,都可以换线被 pthread_cond_wait 或者pthread_cond_timedwait阻塞的线程,区别就在于 pthread_cond_signal 是唤醒至少一个被阻塞的线程(总个数不定),pthread_cond_broadcast 是唤醒所有被阻塞的线程


1
2
// 线程阻塞函数, 哪个线程调用这个函数, 哪个线程就会被阻塞
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);

通过函数原型可以看出,该函数在阻塞线程的时候,需要一个互斥锁参数,这个互斥锁主要功能是进行线程同步,让线程顺序进入临界区,避免出现数共享资源的数据混乱。该函数会对这个互斥锁做以下几件事情:

在阻塞线程时候,如果线程已经对互斥锁 mutex 上锁,那么会将这把锁打开,这样做是为了避免死锁

当线程解除阻塞的时候,函数内部会帮助这个线程再次将这个mutex互斥锁锁上,继续向下访问临界区

pthread_cond_wait用于等待目标条件变量。该函数调用时需要传入 mutex参数(加锁的互斥锁) ,函数执行时,先把调用线程放入条件变量的请求队列,然后将互斥锁mutex解锁,当函数成功返回为0时,表示重新抢到了互斥锁,互斥锁会再次被锁上, 也就是说函数内部会有一次解锁和加锁操作



2.条件变量使用陷阱

使用pthread_cond_wait方式如下:

1
2
3
4
5
6
7
8
9
pthread _mutex_lock(&mutex)

// 判断条件
while(线程执行的条件是否成立)
{
pthread_cond_wait(&cond, &mutex);
}

pthread_mutex_unlock(&mutex);

pthread_cond_wait执行后的内部操作分为以下几步

将线程放在条件变量的请求队列后,内部解锁

线程等待被pthread_cond_broadcast信号唤醒或者pthread_cond_signal信号唤醒,唤醒后去竞争锁

若竞争到互斥锁,内部再次加锁

为什么要把调用线程放入条件变量的请求队列后再解锁

线程是并发执行的,如果在把调用线程A放在等待队列之前,就释放了互斥锁,这就意味着其他线程比如线程B可以获得互斥锁去访问公有资源,这时候线程A所等待的条件改变了,但是它没有被放在等待队列上,导致A忽略了等待条件被满足的信号。

倘若在线程A调用pthread_cond_wait开始,到把A放在等待队列的过程中,都持有互斥锁,其他线程无法得到互斥锁,就不能改变公有资源

为什么判断线程执行的条件用while而不是if

一般来说,在多线程资源竞争的时候,在一个使用资源的线程里面(消费者)判断资源是否可用,不可用,便调用pthread_cond_wait,在另一个线程里面(生产者)如果判断资源可用的话,则调用pthread_cond_signal发送一个资源可用信号

wait成功之后,资源就一定可以被使用么?答案是否定的,如果同时有两个或者两个以上的线程正在等待此资源,wait返回后,资源可能已经被使用了

再具体点,有可能多个线程都在等待这个资源可用的信号,信号发出后只有一个资源可用,但是有A,B两个线程都在等待,B比较速度快,获得互斥锁,然后加锁,消耗资源,然后解锁,之后A获得互斥锁,但A回去发现资源已经被使用了,它便有两个选择,一个是去访问不存在的资源,另一个就是继续等待,那么继续等待下去的条件就是使用while,要不然使用if的话pthread_cond_wait返回后,就会顺序执行下去

如果只有一个消费者,那么使用if是可以的



3.生产者消费者模型

生产者和消费者模型的组成:

生产者线程->若干个

  • 生产商品或者任务放入到任务队列中
  • 任务队列满了就阻塞,不满的时候就工作
  • 通过一个生产者的条件变量控制生产者线程阻塞和非阻塞

消费者线程->若干个

  • 读任务队列,将任务或者数据取出
  • 任务队列中有数据就消费,没有数据就阻塞
  • 通过一个消费者的条件变量控制消费者线程阻塞和非阻塞

队列

  • 存储任务 / 数据,对应一块内存,为了读写访问可以通过一个数据结构维护这块内存
  • 可以是数组、链表,也可以使用 stl 容器:queue /stack/list/vector

image-20231219205133201

生产者与消费者模型代码(点击跳转)




四、阻塞队列代码分析

阻塞队列类中封装了生产者-消费者模型,其中push成员是生产者,pop成员是消费者

阻塞队列中,使用了循环数组实现了队列,作为两者共享缓冲区,当然了,队列也可以使用STL中的queue

1.自定义队列

当队列为空时,从队列中获取元素的线程将会被挂起;当队列是满时,往队列里添加元素的线程将会挂起

使用循环数组模拟循环队列,先入先出,队尾入,队首出,需要注意如下几点:

入队

  • 队尾指针向后移动m_back = (m_back + 1) % m_max_size
  • m_max_size为队列容量

出队

  • 队首指针先后移动m_front = (m_front + 1) % m_max_size

log/block_queue.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
/*************************************************************
*循环数组实现的阻塞队列,m_back = (m_back + 1) % m_max_size;
*线程安全,每个操作前都要先加互斥锁,操作完后,再解锁
**************************************************************/
#pragma once
#include <iostream>
#include <stdlib.h>
#include <pthread.h>
#include <sys/time.h>
#include "../lock/locker.h"
using namespace std;

// 模板类
template <class T>
class block_queue
{
public:
// 构造函数初始化私有成员
block_queue(int max_size = 1000);
~block_queue();
void clear();
bool full();
bool empty();
bool front(T &value);
bool back(T &value);
int size();
int max_size();
bool push(const T &item);
bool pop(T &item);
bool pop(T &item, int ms_timeout);

private:
// 互斥锁,locker类实例 m_mutex,该锁在locker的构造函数中进行了初始化
locker m_mutex;
// 造函数中进行了初始化
cond m_cond;
T *m_array;
// 队列中当前元素数量
int m_size;
// 队列容量
int m_max_size;
// 队列首部索引
int m_front;
// 队列尾部索引
int m_back;
};


/*
* func:构造函数初始化私有成员
*/
template <class T>
block_queue<T>::block_queue(int max_size)
{
if (max_size <= 0)
{
exit(-1);
}
m_max_size = max_size;
// 循环数组实现阻塞队列
m_array = new T[max_size];
m_size = 0;
m_front = -1;
m_back = -1;
}


/*
* func:析构函数,资源回收
*/
template <class T>
block_queue<T>::~block_queue()
{
m_mutex.lock();
if (m_array != NULL)
delete [] m_array;

m_mutex.unlock();
}


/*
* func:清空私有的成员变量
*/
template<class T>
void block_queue<T>::clear()
{
m_mutex.lock();
m_size = 0;
m_front = -1;
m_back = -1;
m_mutex.unlock();
}


/*
* func:判断队列是否满
*/
template<class T>
void block_queue<T>::full()
{
m_mutex.lock();
if (m_size >= m_max_size)
{

m_mutex.unlock();
return true;
}
m_mutex.unlock();
return false;
}


/*
* func:判断队列是否为空
*/
template<class T>
void block_queue<T>::empty()
{
m_mutex.lock();
if (0 == m_size)
{
m_mutex.unlock();
return true;
}
m_mutex.unlock();
return false;
}


/*
* func:返回队首元素
*/
template<class T>
void block_queue<T>::front(T &value)
{
m_mutex.lock();
if (0 == m_size)
{
m_mutex.unlock();
return false;
}
value = m_array[m_front];
m_mutex.unlock();
return true;
}


/*
* func:返回队尾元素
*/
template<class T>
void block_queue<T>::back(T &value)
{
m_mutex.lock();
if (0 == m_size)
{
m_mutex.unlock();
return false;
}
value = m_array[m_back];
m_mutex.unlock();
return true;
}


/*
* func:获取队列元素数量
*/
template<class T>
void block_queue<T>::size()
{
int tmp = 0;

m_mutex.lock();
tmp = m_size;

m_mutex.unlock();
return tmp;
}


/*
* func:获取队列容量
*/
template<class T>
void block_queue<T>::max_size()
{
int tmp = 0;

m_mutex.lock();
tmp = m_max_size;

m_mutex.unlock();
return tmp;
}


/*
* func:向队列中添加元素
* note:往队列添加元素后,需要将所有使用队列的消费者线程唤醒
*/
template<class T>
void block_queue<T>::push(const T &item)
{
m_mutex.lock();
// 队列是满的,无法继续添加
if (m_size >= m_max_size)
{
// 唤醒阻塞在队列上的消费者
m_cond.broadcast();
m_mutex.unlock();
return false;
}
// 队尾索引向后移动
m_back = (m_back + 1) % m_max_size;
// 入队
m_array[m_back] = item;
m_size++;

// 唤醒阻塞在队列上的消费者
m_cond.broadcast();
m_mutex.unlock();
return true;
}


/*
* func:从队列中取出元素
* note:pop时,如果当前队列为可空,将会等待条件变量,阻塞所有等待在队列上的消费这线程
*/
template<class T>
void block_queue<T>::pop(T &item)
{
m_mutex.lock();
// 队列为空
while (m_size <= 0)
{
// 消费者阻塞在队列上,等待生产者唤醒
if (!m_cond.wait(m_mutex.get()))
{
m_mutex.unlock();
return false;
}
}

// 队首索引,向后移动
m_front = (m_front + 1) % m_max_size;
// 出队
item = m_array[m_front];
m_size--;
m_mutex.unlock();
return true;
}

出队pop增加超时处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/*
* func:从队列中取出元素
* note:增加了超时处理,若超时时间内没有抢到,则return false
*/
template<class T>
void block_queue<T>::pop(T &item, int ms_timeout)
{
struct timespec t = {0, 0};
struct timeval now = {0, 0};
// 获取当前的时间 秒和微秒
gettimeofday(&now, NULL);
m_mutex.lock();
if (m_size <= 0)
{
// 队列为空,计算超时时间
// ms_timeout/1000毫秒超时转换为秒
t.tv_sec = now.tv_sec + ms_timeout / 1000;
t.tv_nsec = (ms_timeout % 1000) * 1000;
if (!m_cond.timewait(m_mutex.get(), t))
{
m_mutex.unlock();
return false;
}
}

if (m_size <= 0)
{
m_mutex.unlock();
return false;
}

m_front = (m_front + 1) % m_max_size;
item = m_array[m_front];
m_size--;
m_mutex.unlock();
return true;
}