做一篇文章记录实现,驱动优化迭代。 Github前端
好久以前就想写一个日志库了,受限制于本身水平这个想法一直没有完成。在上次写一个 TCPServer 的时候,写了一个错误的日志库,在多线程的状况对速度影响很是大,而且最重要的正确性也没有保证。linux
近期又有了点时间,决定从新实现,对其 特色指望:git
日志格式和日志文件格式指望以下:github
test_log_file.20191224.160354.7.log
20191224 16:04:05.125049 2970 ERROR 5266030 chello, world - LogTest.cpp:thr_1():20
以前看的两个日志库,刚好都叫作 NanoLog:windows
前一个日志库学到了用户接口宏的定义方式,后一个 nanolog 则是吸取了其后端的设计部分,尤为是 thread_local 的使用,但我以为它的精华在于前端部分,充斥着大量的模板。嗯,没太看懂。后端
先后端分离实现数组
采用单例模式,全局只有一个 LimLog
对象,在这个对象构造时也建立一个后台线程,经过无限循环扫描是否有数据能够写入至文件中。经过条件变量控制该循环的终止及 LimLog
对象的销毁。多线程
使用 C++ 11 的关键字 thread_local 为每一个线程都建立一个线程局部缓冲区,这样咱们的前端的日志写入就能够不用加锁(虽然不用加锁,可是有另外的处理,后说明),每次都写入到各自线程的缓冲区中。而后在上面的循环中扫描这些缓冲区,将其中的日志数据读取至内部的输出缓冲区中,再将其写入到文件中。前后端分离
每一个缓冲区内存只建立一次使用,避免每次使用建立带来没必要要的申请消耗。每一个线程局部缓冲区的大小为 1M(1 << 20 bytes), LimLog
对象内的输出缓冲区大小为 16M(1 << 24 bytes), 这个大小很是充足,实际测试 大概每一个线程的内存使用为 30k(1M) 和 60-150k(16M) 左右,这个地方须要IO压测一下。异步
后端日志类的成员变量以下,
class LimLog{ ··· private: LogSink logSink_; bool threadSync_; // 先后端同步的标示 bool threadExit_; // 后台线程标示 bool outputFull_; // 输出缓冲区满的标示 LogLevel level_; uint32_t bufferSize_; // 输出缓冲区的大小 uint32_t sinkCount_; // 写入文件的次数 uint32_t perConsumeBytes_; // 每轮循环读取的字节数 uint64_t totalConsumeBytes_; // 总读取的字节数 char *outputBuffer_; // first internal buffer. char *doubleBuffer_; // second internal buffer, unused. std::vector<BlockingBuffer *> threadBuffers_; std::thread sinkThread_; std::mutex bufferMutex_; // internel buffer mutex. std::mutex condMutex_; std::condition_variable proceedCond_; // 后台线程是否继续运行的标条件变量 std::condition_variable hitEmptyCond_; // 前端缓冲区为空的条件变量 static LimLog singletonLog; static thread_local BlockingBuffer *blockingBuffer_; // 线程局部缓冲区 };
对 LimLog
对象的析构和后台线程的退出须要十分的谨慎,这里程序出问题的重灾区,前一个日志库就是这个地方没有处理好,致使有的时候日志尚未写完程序就退出了。为了保证这个逻辑的正确运行,采用了两个条件变量 proceedCond_
和 hitEmptyCond_
。在执行对象的析构函数时,须要等待无可读数据的条件变量 hitEmptyCond_
, 保证这个时候线程局部缓冲区的数据都已经读取完成,而后在析构函数中设置后台线程循环退出的条件,而且使用 proceedCond_
通知后台线程运行。
以上逻辑在析构函数中的逻辑以下:
LimLog::~LimLog() { { // notify background thread befor the object detoryed. std::unique_lock<std::mutex> lock(singletonLog.condMutex_); singletonLog.threadSync_ = true; singletonLog.proceedCond_.notify_all(); singletonLog.hitEmptyCond_.wait(lock); } { // stop sink thread. std::lock_guard<std::mutex> lock(condMutex_); singletonLog.threadExit_ = true; singletonLog.proceedCond_.notify_all(); } ··· }
以后就转入到后台线程的循环处理中,如下是全部后台线程处理的逻辑:
// Sink log info to file with async. void LimLog::sinkThreadFunc() { // \fixed me, it will enter infinity if compile with -O3 . while (!threadExit_) { // move front-end data to internal buffer. { std::lock_guard<std::mutex> lock(bufferMutex_); uint32_t bbufferIdx = 0; // while (!threadExit_ && !outputFull_ && !threadBuffers_.empty()) { while (!threadExit_ && !outputFull_ && (bbufferIdx < threadBuffers_.size())) { BlockingBuffer *bbuffer = threadBuffers_[bbufferIdx]; uint32_t consumableBytes = bbuffer->consumable(); // 若是这个地方使用 used() 代替,就会出现日志行截断的现象 if (bufferSize_ - perConsumeBytes_ < consumableBytes) { outputFull_ = true; break; } if (consumableBytes > 0) { uint32_t n = bbuffer->consume( outputBuffer_ + perConsumeBytes_, consumableBytes); perConsumeBytes_ += n; } else { // threadBuffers_.erase(threadBuffers_.begin() + // bbufferIdx); } bbufferIdx++; } } // not data to sink, go to sleep, 50us. if (perConsumeBytes_ == 0) { std::unique_lock<std::mutex> lock(condMutex_); // if front-end generated sync operation, consume again. if (threadSync_) { threadSync_ = false; continue; } hitEmptyCond_.notify_one(); proceedCond_.wait_for(lock, std::chrono::microseconds(50)); } else { logSink_.sink(outputBuffer_, perConsumeBytes_); sinkCount_++; totalConsumeBytes_ += perConsumeBytes_; perConsumeBytes_ = 0; outputFull_ = false; } } }
当扫描发现无数据可取时,这个时候咱们再循环读取一下,防止析构函数在扫描数据和判断 threadSync_
执行的时间差以内又有日志产生,在又一轮循环后,确保全部的数据都已经读取,通知析构函数继续执行,而后退出循环,该后台线程函数退出,线程销毁。 wait_for 是节约系统资源,出现无数据时当前线程就直接休眠 50us 避免没必要要的循环消耗.
以上是对后台线程的处理,对数据的处理其实仍是比较简单的,后端提供了一个写入数据的接口,该接口将前端产生的数据统一写入至线程局部缓冲区中。
void LimLog::produce(const char *data, size_t n) { if (!blockingBuffer_) { std::lock_guard<std::mutex> lock(bufferMutex_); blockingBuffer_ = static_cast<BlockingBuffer *>(malloc(sizeof(BlockingBuffer))); threadBuffers_.push_back(blockingBuffer_); // 添加到后端的缓冲区数组中 } blockingBuffer_->produce(data, n); // 将数据添加到线程局部缓冲区中 }
后台写入线程这个时候,遍历缓冲数组,读取数据至内部的输出缓冲区中,当出现可读取的数据大于缓冲区时,直接写入文件,剩余的数据下一路再读取;当无可读取的数据时,判断是对象析构了还只是简单的没有数据读取,而且休眠 50us;有数据可读则写入至文件,重置相关数据。
线程局部缓冲区为一个阻塞环形生产者消费者队列,设计为阻塞态的缘由为缓冲区的大小足够大了,若是改成非阻塞态,当缓冲区的满了的时候从新分配内存,还要管理一次内存的释放操做。虽然线程内的操做不须要使用锁来同步,在后台线程的循环范围内,每次都要获取数据的可读大小,这里可能就涉及多个线程对数据的访问了。最初的版本测试在没有同步而且开启优化的状况下,有必定几率在 consume()
操做陷入死循环,缘由是该线程的 cache 的变量未及时获得更新。如今使用内存屏障来解决这个问题,避免编译器优化。
对于后台线程读取各线程内的数据有一个随机性的问题,若是使用 BlockingBuffer::unsed()
获取已读数据长度,可能会出现日志行只被读取了一半,而后马上被另外的一个线程的日志截断的状况,这不是咱们指望的。为了解决这个问题,咱们又提供了一个接口 BlockingBuffer::incConsumablePos()
来移动能够读取的位置,该接口每次递增的长度为一条日志行的长度,在一日志行数据写入到局部缓冲区后调用该接口来移动能够读取到的位置。而又提供接口 BlockingBuffer::consumable()
来获取能够读取的长度,这样就避免了一条日志行被截断的状况。
一条日志信息在内存中的布局以下: +------+-----------+-------+------+------+----------+------+ | time | thread id | level | logs | file | function | line | +------+-----------+-------+------+------+----------+------+ 20191224 16:04:05.125044 2970 ERROR 5266025chello, world - LogTest.cpp:thr_1():20 20191224 16:04:05.125044 2970 ERROR 5266026chello, world - LogTest.cpp:thr_1():20 20191224 16:04:05.125045 2970 ERROR 5266027chello, world - LogTest.cpp:thr_1():20
前端实现虽然简单,可是优化的空间很大,使用 perf 作性能分析,性能热点集中在参数格式化为字符串和时间的处理上面。
咱们用 LogLine
来表示一个日志行,这个类很是简单,重载了多个参数类型的操做符 <<
, 及保存一些日志行相关信息,包括 文件名称,函数,行和最重要的写入字节数。
LogLine
不包含缓冲区,直接调用后端提供的接口 LimLog::produce()
将数据写入到局部缓冲区内。
在 limlog 中,屡次使用到了 thread_local 关键字,在前端实现部分也是如此。
在 Linux 中使用 gettimeofday(2)
来获取当前的时间戳,其余平台使用 std::chrono
来获取时间戳,用 localtime()
和 strftime()
获取本地的格式化时间 %Y%m%d %H:%M:%S, 合并微秒 timestamp % 1000000
组成 time.
这里利用 thread_local 作一个优化,格式化时间的函数调用次数。定义一个线程局部变量来存储时间戳对应的秒数 t_second
和 字符数组来存储格式化时间 t_datetime[24]
,当秒数未发生改变时,直接取线程局部字符数组而不用再调用格式化函数。
在 x86-64 体系下,gettimeofday(2)
在 vdso 机制下已经不是系统调用了,经测试发现直接调用 gettimeofday(2)
比 std::chrono
的高精度时钟快 15% 左右,虽然不是系统调用可是耗时仍是大头,一个调用大概须要 600ns 左右的时间。
在 Linux 下,使用 gettid()
而不是 pthread_self()
来获取线程id,windows 暂时未支持,这个 std::this_thread::get_id()
取出里面的整形id须要hack一下。
这里继续使用 thread_local 来对线程id优化,每一个线程的id调用一次线程id获取函数。建立一个线程局部线程id变量,判断线程id存在时,就再也不调用 gettid() 了。
日志等级、文件、函数和行数是简单的字符串写入操做了。
#define LOG(level) \ if (limlog::getLogLevel() <= level) \ LogLine(level, __FILE__, __FUNCTION__, __LINE__) #define LOG_TRACE LOG(limlog::LogLevel::TRACE) #define LOG_DEBUG LOG(limlog::LogLevel::DEBUG) #define LOG_INFO LOG(limlog::LogLevel::INFO) #define LOG_WARN LOG(limlog::LogLevel::WARN) #define LOG_ERROR LOG(limlog::LogLevel::ERROR) #define LOG_FATAL LOG(limlog::LogLevel::FATAL)
如上所示,定义了一些宏给调用者使用,每一条日志都会建立一个 LogLine
对象,这个对象很是小,用完就销毁,几乎没有开销。用法同 std::cout
std::string str("std::string"); LOG_DEBUG << 'c' << 1 << 1.3 << "c string" << str;
LogLine::operator<<()
参数转字符串作优化