transport_layer网络传输层模块源码实现四
关于做者
前滴滴出行技术专家,现任OPPO文档数据库mongodb负责人,负责oppo千万级峰值TPS/十万亿级数据量文档数据库mongodb内核研发及运维工做,一直专一于分布式缓存、高性能服务端、数据库、中间件等相关研发。后续持续分享《MongoDB内核源码设计、性能优化、最佳运维实践》,Github帐号地址:https://github.com/y123456yzgit
《mongodb内核源码实现、性能调优、最佳运维实践系列》文章有先后逻辑关系,请阅读本篇文章前,提早阅读以下模块:github
mongodb网络传输层模块源码实现二mongodb
- 说明
本文分析网络传输层模块中的最后一个子模块:service_executor服务运行子模块,即线程模型子模块。在阅读该文章前,请提早阅读下<<Mongodb网络传输处理源码实现及性能调优-体验内核性能极致设计>>、<<transport_layer网络传输层模块源码实现二>>、<<transport_layer网络传输层模块源码实现三>>,这样有助于快速理解本文分享的线程模型子模块。缓存
线程模型设计在数据库性能指标中起着很是重要的做用,所以本文将重点分析mongodb服务层线程模型设计,体验mongodb如何经过优秀的工做线程模型来达到多种业务场景下的性能极致表现。该模块主要代码实现文件以下:性能优化
service_executor线程模型子模块,在代码实现中,把线程模型分为两种:synchronous线程模式和adaptive线程模型,这两种线程模型中用于任务调度运行的线程统称为worker工做线程。Mongodb启动的时候经过配置参数net.serviceExecutor来肯定采用那种线程模式运行mongo实例,配置方式以下:网络
1.//synchronous同步线程模式配置,一个连接已给线程 2.net: 3. serviceExecutor: synchronous 4. 5.//动态线程池模式配置 6.net: 7. serviceExecutor: adaptive
2. synchronous同步线程模型(一个连接已给线程)设计原理及核心代码实现session
Synchronous同步线程模型也就是每接收到一个连接,就建立一个线程专门负责该连接对应全部的客户端请求,也就是该连接的全部访问至始至终由同一个线程负责处理。多线程
2.1 核心代码实现原理
该线程模型核心代码实现由ServiceExecutorSynchronous类负责,该类注意成员变量和重要接口以下:
1.//同步线程模型对应ServiceExecutorSynchronous类 2.class ServiceExecutorSynchronous final : public ServiceExecutor { 3.public: 4. //ServiceExecutorSynchronous初始化 5. explicit ServiceExecutorSynchronous(ServiceContext* ctx); 6. //获取系统CPU个数 7. Status start() override; 8. //shutdown处理 9. Status shutdown(Milliseconds timeout) override; 10. //线程管理及任务入队处理 11. Status schedule(Task task, ScheduleFlags flags) override; 12. //同步线程模型对应mode 13. Mode transportMode() const override { 14. return Mode::kSynchronous; 15. } 16. //获取该模型统计信息 17. void appendStats(BSONObjBuilder* bob) const override; 18. 19.private: 20. //私有线程队列 21. static thread_local std::deque<Task> _localWorkQueue; 22. //递归深度 23. static thread_local int _localRecursionDepth; 24. //空闲线程数,例如某个连接当前没有请求,则该线程阻塞在读操做上面等待数据读到来 25. static thread_local int64_t _localThreadIdleCounter; 26. //shutdown的时候设置为false,连接没关闭前一直为true 27. AtomicBool _stillRunning{false}; 28. //当前conn线程数,参考ServiceExecutorSynchronous::schedul 29. AtomicWord<size_t> _numRunningWorkerThreads{0}; 30. //cpu个数 31. size_t _numHardwareCores{0}; 32.};
ServiceExecutorSynchronous类核心成员变量及其功能说明以下:
每一个连接对应的线程都有三个私有成员,分别是:线程队列、递归深度、idle频度,这三个线程私有成员的做用以下:
- _localWorkQueue:线程私有队列,task任务入队及出队执行都是经过该队列完成
- _localRecursionDepth:任务递归深度控制,避免堆栈溢出
- _localThreadIdleCounter:当线程运行多少次任务后,须要短暂的休息一下子,默认运行0xf次task任务就调用markThreadIdle()一次
同步线程模型子模块最核心的代码实现以下:
1.//ServiceStateMachine::_scheduleNextWithGuard 启动新的conn线程 2.Status ServiceExecutorSynchronous::schedule(Task task, ScheduleFlags flags) { 3. //若是_stillRunning为false,则直接返回 4. if (!_stillRunning.load()) { 5. return Status{ErrorCodes::ShutdownInProgress, "Executor is not running"}; 6. } 7. //队列不为空,说明由任务须要运行,同步线程模型只有新链接第一次经过SSM进入该函数的时候为空 8. //其余状况都不为空 9. if (!_localWorkQueue.empty()) { 10. //kMayYieldBeforeSchedule标记当返回客户端应答成功后,开始接收下一个新请求,这时候会设置该标记 11. if (flags & ScheduleFlags::kMayYieldBeforeSchedule) { 12. //也就是若是该连接对应的线程若是连续处理了0xf个请求,则须要休息一下子 13. if ((_localThreadIdleCounter++ & 0xf) == 0) { 14. //短暂休息会儿后再处理该连接的下一个用户请求 15. //其实是调用TCMalloc MarkThreadTemporarilyIdle实现 16. markThreadIdle(); 17. } 18. //连接数即线程数超过了CPU个数,则每处理完一个请求,就yield一次 19. if (_numRunningWorkerThreads.loadRelaxed() > _numHardwareCores) { 20. stdx::this_thread::yield();//线程本次不参与CPU调度,也就是放慢脚步 21. } 22. } 23. //带kMayRecurse标识,说明即将调度执行的是dealTask 24. //若是递归深度小于synchronousServiceExecutorRecursionLimit,则执行task 25. if ((flags & ScheduleFlags::kMayRecurse) && 26. (_localRecursionDepth < synchronousServiceExecutorRecursionLimit.loadRelaxed())) { 27. ++_localRecursionDepth; 28. //递归深度没有超限,则直接执行task,不用入队 29. task(); 30. } else { 31. //入队,等待 32. _localWorkQueue.emplace_back(std::move(task)); 33. } 34. return Status::OK(); 35. } 36. //建立conn线程,线程名conn-xx(其实是从listener线程继承过来的,这时候的Listener线程是父线程,在 37. //ServiceStateMachine::start中已经过线程守护ThreadGuard改成conn-xx),执行对应的task 38. Status status = launchServiceWorkerThread([ this, task = std::move(task) ] { 39. //说明来了一个新连接,线程数自增 40. int ret = _numRunningWorkerThreads.addAndFetch(1); 41. //新连接到来的第一个任务其实是readTask任务 42. _localWorkQueue.emplace_back(std::move(task)); 43. while (!_localWorkQueue.empty() && _stillRunning.loadRelaxed()) { 44. //每次任务若是是经过线程私有队列获取运行,则恢复递归深度为初始值1 45. _localRecursionDepth = 1; 46. //取出该线程拥有的私有队列上的第一个任务运行 47. _localWorkQueue.front()(); 48. //该任务已经执行完毕,把该任务从队列移除 49. _localWorkQueue.pop_front(); 50. } 51. //走到这里说明线程异常了或者须要退出,如连接关闭,须要消耗线程 52. ...... 53. }); 54. return status; 55.}
从上面的代码能够看出,worker工做线程经过_localRecursionDepth控制task任务的递归深度,当递归深度超过最大深度synchronousServiceExecutorRecursionLimit值,则把任务到_localWorkQueue队列,而后从队列获取task任务执行。
此外,为了达到性能的极致发挥,在每次执行task任务的时候作了以下细节设计,这些细节设计在高压力状况下,能够提高5%的性能提高:
- 每运行oxf次任务,就经过markThreadIdle()让线程idle休息一下子
- 若是线程数大于CPU核数,则每执行一个任务前都让线程yield()一次
2.2该模块函数接口总结大全
synchronous同步线程模型全部接口及其功能说明以下表所示:
- Adaptive动态线程模型设计原理及核心代码实现
adaptive动态线程模型,会根据当前系统的访问负载动态的调整线程数,当线程CPU工做比较频繁的时候,控制线程增长工做线程数;当线程CPU比较空闲后,本线程就会自动销毁退出,整体worker工做线程数就会减小。
3.1 动态线程模型核心源码实现
动态线程模型核心代码实现由ServiceExecutorAdaptive负责完成,该类核心成员变量及核心函数接口以下:
1.class ServiceExecutorAdaptive : public ServiceExecutor { 2.public: 3. //初始化构造 4. explicit ServiceExecutorAdaptive(...); 5. explicit ServiceExecutorAdaptive(...); 6. ServiceExecutorAdaptive(...) = default; 7. ServiceExecutorAdaptive& operator=(ServiceExecutorAdaptive&&) = default; 8. virtual ~ServiceExecutorAdaptive(); 9. //控制线程及worker线程初始化建立 10. Status start() final; 11. //shutdown处理 12. Status shutdown(Milliseconds timeout) final; 13. //任务调度运行 14. Status schedule(Task task, ScheduleFlags flags) final; 15. //adaptive动态线程模型对应Mode 16. Mode transportMode() const final { 17. return Mode::kAsynchronous; 18. } 19. //统计信息 20. void appendStats(BSONObjBuilder* bob) const final; 21. //获取runing状态 22. int threadsRunning() { 23. return _threadsRunning.load(); 24. } 25. //新键一个worker线程 26. void _startWorkerThread(); 27. //worker工做线程主循环while{}处理 28. void _workerThreadRoutine(int threadId, ThreadList::iterator it); 29. //control控制线程主循环,主要用于控制何时增长线程 30. void _controllerThreadRoutine(); 31. //判断队列中的任务数和可用线程数大小,避免任务task饥饿 32. bool _isStarved() const; 33. //asio网络库io上下文 34. std::shared_ptr<asio::io_context> _ioContext; //早期ASIO中叫io_service 35. //TransportLayerManager::createWithConfig赋值调用 36. std::unique_ptr<Options> _config; 37. //线程列表及其对应的锁 38. mutable stdx::mutex _threadsMutex; 39. ThreadList _threads; 40. //控制线程 41. stdx::thread _controllerThread; 42. 43. //TransportLayerManager::createWithConfig赋值调用 44. //时间嘀嗒处理 45. TickSource* const _tickSource; 46. //运行状态 47. AtomicWord<bool> _isRunning{false}; 48. //kThreadsRunning表明已经执行过task的线程总数,也就是这些线程不是刚刚建立起来的 49. AtomicWord<int> _threadsRunning{0}; 50. //表明当前刚建立或者正在启动的线程总数,也就是建立起来尚未执行task的线程数 51. AtomicWord<int> _threadsPending{0}; 52. //当前正在执行task的线程 53. AtomicWord<int> _threadsInUse{0}; 54. //当前入队还没执行的task数 55. AtomicWord<int> _tasksQueued{0}; 56. //当前入队还没执行的deferredTask数 57. AtomicWord<int> _deferredTasksQueued{0}; 58. //TransportLayerManager::createWithConfig赋值调用 59. //没什么实际做用 60. TickTimer _lastScheduleTimer; 61. //记录这个退出的线程生命期内执行任务的总时间 62. AtomicWord<TickSource::Tick> _pastThreadsSpentExecuting{0}; 63. //记录这个退出的线程生命期内运行的总时间(包括等待IO及运行IO任务的时间) 64. AtomicWord<TickSource::Tick> _pastThreadsSpentRunning{0}; 65. //完成线程级的统计 66. static thread_local ThreadState* _localThreadState; 67. 68. //总的入队任务数 69. AtomicWord<int64_t> _totalQueued{0}; 70. //总执行的任务数 71. AtomicWord<int64_t> _totalExecuted{0}; 72. //从任务被调度入队,到真正被执行这段过程的时间,也就是等待被调度的时间 73. AtomicWord<TickSource::Tick> _totalSpentQueued{0}; 74. 75. //shutdown的时候等待线程消耗的条件变量 76. stdx::condition_variable _deathCondition; 77. //条件变量,若是发现工做线程压力大,为了不task饥饿 78. //通知controler线程,通知见ServiceExecutorAdaptive::schedule,等待见_controllerThreadRoutine 79. stdx::condition_variable _scheduleCondition; 80.};
ServiceExecutorAdaptive类核心成员变量及其功能说明以下:
从上面的成员变量列表看出,队列、线程这两个大类能够进一步细化为不一样的小类,以下:
- 线程:_threadsRunning、threadsPending、_threadsInUsed
- 队列:_totalExecuted、_tasksQueued、deferredTasksQueued
从上面的ServiceExecutorAdaptive类中的核心接口函数代码实现能够概括为以下三类:
- 时间计数相关核心代码实现
- Worker工做线程建立及任务调度相关核心接口代码实现
- controler控制线程设计原理及核心代码实现
3.1.1 线程运行时间计算相关核心代码实现
线程运行时间计算核心算法以下:
1.//线程运行时间统计,包含两种类型时间统计 2.enum class ThreadTimer 3.{ 4. //线程执行task任务的时间+等待数据的时间 5. Running, 6. //只包含线程执行task任务的时间 7. Executing 8.}; 9. 10.//线程私有统计信息,记录该线程运行时间,运行时间分为两种: 11.//1. 执行task任务的时间 2. 若是没有客户端请求,线程就会等待,这就是线程等待时间 12.struct ThreadState { 13. //构造初始化 14. ThreadState(TickSource* ts) : running(ts), executing(ts) {} 15. //线程一次循环处理的时间,包括IO等待和执行对应网络事件对应task的时间 16. CumulativeTickTimer running; 17. //线程一次循环处理中执行task任务的时间,也就是真正工做的时间 18. CumulativeTickTimer executing; 19. //递归深度 20. int recursionDepth = 0; 21.}; 22. 23.//获取指定which类型的工做线程相关运行时间, 24.//例如Running表明线程总运行时间(等待数据+任务处理) 25.//Executing只包含执行task任务的时间 26.TickSource::Tick ServiceExecutorAdaptive::_getThreadTimerTotal(ThreadTimer which) const { 27. //获取一个时间嘀嗒tick 28. TickSource::Tick accumulator; 29. //先把已消耗的线程的数据统计出来 30. switch (which) { 31. //获取生命周期已经结束的线程执行任务的总时间(只包括执行任务的时间) 32. case ThreadTimer::Running: 33. accumulator = _pastThreadsSpentRunning.load(); 34. break; 35. //获取生命周期已经结束的线程整个生命周期时间(包括空闲时间+执行任务时间) 36. case ThreadTimer::Executing: 37. accumulator = _pastThreadsSpentExecuting.load(); 38. break; 39. } 40. //而后再把统计当前正在运行的worker线程的不一样类型的统计时间统计出来 41. stdx::lock_guard<stdx::mutex> lk(_threadsMutex); 42. for (auto& thread : _threads) { 43. switch (which) { 44. //获取当前线程池中全部工做线程执行任务时间 45. case ThreadTimer::Running: 46. accumulator += thread.running.totalTime(); 47. break; 48. //获取当前线程池中全部工做线程整个生命周期时间(包括空闲时间+执行任务时间) 49. case ThreadTimer::Executing: 50. accumulator += thread.executing.totalTime(); 51. break; 52. } 53. } 54. //返回的时间计算包含了已销毁的线程和当前正在运行的线程的相关统计 55. return accumulator; 56.}
Worker工做线程启动后的时间能够包含两类:1. 线程运行task任务的时间;2.线程等待客户端请求的时间。一个线程建立起来,若是没有客户端请求,则线程就会等待接收数据。若是有客户端请求,线程就会经过队列获取task任务运行。这两类时间分别表明线程”忙”和“空闲”。
线程总的“忙”状态时间=全部线程运行task任务的时间,包括已经销毁的线程。线程总的“空闲”时间=全部线程等待获取任务执行的时间,也包括已销毁的线程,线程空闲通常是没有客户端请求,或者客户端请求不多。Worker工做线程对应while(){}循环每循环一次都会进行线程私有运行时间ThreadState计数,总的时间统计就是以该线程私有统计信息为基准求和而来。
3.1.2 worker工做线程建立、销毁及task任务处理
worker工做线程在以下状况下建立或者销毁:1. 线程池初始化;2. controler控制线程发现当前线程池中线程比较”忙”,则会动态建立新的工做线程;3. 工做线程在while体中每循环一次都会判断当前线程池是否很”闲”,若是很”闲”则本线程直接销毁退出。
Worker工做线程建立核心源码实现以下:
1.Status ServiceExecutorAdaptive::start() { 2. invariant(!_isRunning.load()); 3. //running状态 4. _isRunning.store(true); 5. //控制线程初始化建立,线程回调ServiceExecutorAdaptive::_controllerThreadRoutine 6. _controllerThread = stdx::thread(&ServiceExecutorAdaptive::_controllerThreadRoutine, this); 7. //启动时候默认启用CPU核心数/2个worker线程 8. for (auto i = 0; i < _config->reservedThreads(); i++) { 9. //建立一个工做线程 10. _startWorkerThread(); 11. } 12. return Status::OK(); }
worker工做线程默认初始化为CPU/2个,初始工做线程数也能够经过指定的命令行参数来配置:adaptiveServiceExecutorReservedThreads。此外,start()接口默认也会建立一个controler控制线程。
Task任务经过SSM状态机调用ServiceExecutorAdaptive::schedule()接口入队,该函数接口核心代码实现以下:
1.Status ServiceExecutorAdaptive::schedule(ServiceExecutorAdaptive::Task task, ScheduleFlags flags) { 2. //获取当前时间 3. auto scheduleTime = _tickSource->getTicks(); 4. //kTasksQueued: 普通tak,也就是dealTask 5. //_deferredTasksQueued: deferred task,也就是readTask 6. //defered task和普通task分开记录 _totalQueued=_deferredTasksQueued+_tasksQueued 7. auto pendingCounterPtr = (flags & kDeferredTask) ? &_deferredTasksQueued : &_tasksQueued; 8. //相应队列 9. pendingCounterPtr->addAndFetch(1); 10. ...... 11. //这里面的task()执行后-task()执行前的时间才是CPU真正工做的时间 12. auto wrappedTask = [ this, task = std::move(task), scheduleTime, pendingCounterPtr ] { 13. //worker线程回调会执行该wrappedTask, 14. pendingCounterPtr->subtractAndFetch(1); 15. auto start = _tickSource->getTicks(); 16. //从任务被调度入队,到真正被执行这段过程的时间,也就是等待被调度的时间 17. //从任务被调度入队,到真正被执行这段过程的时间 18. _totalSpentQueued.addAndFetch(start - scheduleTime); 19. //recursionDepth=0说明开始进入调度处理,后续有多是递归执行 20. if (_localThreadState->recursionDepth++ == 0) { 21. //记录wrappedTask被worker线程调度执行的起始时间 22. _localThreadState->executing.markRunning(); 23. //当前正在执行wrappedTask的线程加1 24. _threadsInUse.addAndFetch(1); 25. } 26. //ServiceExecutorAdaptive::_workerThreadRoutine执行wrappedTask后会调用guard这里的func 27. const auto guard = MakeGuard([this, start] { //改函数在task()运行后执行 28. //每执行一个任务完成,则递归深度自减 29. if (--_localThreadState->recursionDepth == 0) { 30. //wrappedTask任务被执行消耗的总时间 31. //_localThreadState->executing.markStopped()表明任务该task执行的时间 32. _localThreadState->executingCurRun += _localThreadState->executing.markStopped(); 33. //下面的task()执行完后,正在执行task的线程-1 34. _threadsInUse.subtractAndFetch(1); 35. } 36. //总执行的任务数,task每执行一次增长一次 37. _totalExecuted.addAndFetch(1); 38. }); 39. //运行任务 40. task(); 41. }; 42. //kMayRecurse标识的任务,会进行递归调用 dealTask进入调度的时候调由该标识 43. if ((flags & kMayRecurse) && //递归调用,任务仍是由本线程处理 44. //递归深度还没达到上限,则仍是由本线程继续调度执行wrappedTask任务 45. (_localThreadState->recursionDepth + 1 < _config->recursionLimit())) { 46. //本线程立马直接执行wrappedTask任务,不用入队到boost-asio全局队列等待调度执行 47. //io_context::dispatch io_context::dispatch 48. _ioContext->dispatch(std::move(wrappedTask)); 49. } else { //入队 io_context::post 50. //task入队到schedule得全局队列,等待工做线程调度 51. _ioContext->post(std::move(wrappedTask)); 52. } 53. // 54. _lastScheduleTimer.reset(); 55. //总的入队任务数 56. _totalQueued.addAndFetch(1); 57. //kDeferredTask真正生效在这里 58. //若是队列中的任务数大于可用线程数,说明worker压力过大,须要建立新的worker线程 59. if (_isStarved() && !(flags & kDeferredTask)) {//kDeferredTask真正生效在这里 60. //条件变量,通知controler线程,通知_controllerThreadRoutine控制线程处理 61. _scheduleCondition.notify_one(); 62. } 63. return Status::OK(); 64.}
从上面的分析能够看出,schedule()主要完成task任务入队处理。若是带有递归标识kMayRecurse,则经过_ioContext->dispatch()接口入队,该接口再ASIO底层实现的时候实际上没有真正把任务添加到全局队列,而是直接当前线程继续处理,这样就实现了递归调用。若是没有携带kMayRecurse递归标识,则task任务经过_ioContext->post()须要入队到全局队列。ASIO库的dispatch接口和post接口的具体实现能够参考:
<<Mongodb网络传输处理源码实现及性能调优-体验内核性能极致设计>>
若是任务入队到全局队列,则线程池中的worker线程就会经过全局锁竞争从队列中获取task任务执行,该流程经过以下接口实现:
1.//建立线程的回掉函数,线程循环主体,从队列获取task任务执行 2.void ServiceExecutorAdaptive::_workerThreadRoutine( 3. int threadId, ServiceExecutorAdaptive::ThreadList::iterator state) { 4. //设置线程模 5. _localThreadState = &(*state); 6. { 7. //worker-N线程名 8. std::string threadName = str::stream() << "worker-" << threadId; 9. setThreadName(threadName); 10. } 11. //该线程第一次执行while中的任务的时候为ture,后面都是false 12. //表示该线程是刚建立起来的,尚未执行任何一个task 13. bool stillPending = true; 14. 15. //线程退出的时候执行如下{},都是一些计数清理 16. const auto guard = MakeGuard([this, &stillPending, state] { 17. //该worker线程退出前的计数清理、信号通知处理 18. //...... 19. } 20. while (_isRunning.load()) { 21. ...... 22. //本次循环执行task的时间,不包括网络IO等待时间 23. state->executingCurRun = 0; 24. try { 25. //经过_ioContext和入队的任务联系起来 26. asio::io_context::work work(*_ioContext); 27. //记录开始时间,也就是任务执行开始时间 28. state->running.markRunning(); 29. //执行ServiceExecutorAdaptive::schedule中对应的task 30. //线程第一次运行task任务,最多从队列拿一个任务执行 31. //runTime.toSystemDuration()指定一次run最多运行多长时间 32. if (stillPending) { 33. //执行一个任务就会返回 34. _ioContext->run_one_for(runTime.toSystemDuration()); 35. } else { // Otherwise, just run for the full run period 36. //_ioContext对应的全部任务都执行完或者toSystemDuration超时后才会返回 37. _ioContext->run_for(runTime.toSystemDuration()); //io_context::run_for 38. } 39. ...... 40. } 41. //该线程第一次执行while中的任务后设置ture,后面都是false 42. if (stillPending) { 43. _threadsPending.subtractAndFetch(1); 44. stillPending = false; 45. //当前线程数比初始线程数多 46. } else if (_threadsRunning.load() > _config->reservedThreads()) { 47. //表明本次循环该线程真正处理任务的时间与本次循环总时间(总时间包括IO等待和IO任务处理时间) 48. double executingToRunning = state->executingCurRun / static_cast<double>(spentRunning); 49. executingToRunning *= 100; 50. dassert(executingToRunning <= 100); 51. 52. int pctExecuting = static_cast<int>(executingToRunning); 53. //线程不少,超过了指定配置,而且知足这个条件,该worker线程会退出,线程比较空闲,退出 54. //若是线程真正处理任务执行时间占比小于该值,则说明本线程比较空闲,能够退出。 55. if (pctExecuting < _config->idlePctThreshold()) { 56. log() << "Thread was only executing tasks " << pctExecuting << "% over the last " 57. << runTime << ". Exiting thread."; 58. break; //退出线程循环,也就是线程自动销毁了 59. } 60. } 61. } 62.}
线程主循环主要工做内容:1. 从ASIO库的全局队列获取任务执行;2. 判断本线程是否比较”闲”,若是是则直接销毁退出。3. 线程建立起来进行初始线程名设置、线程主循环一些计数处理等。
3.2.3 controller控制线程核心代码实现
控制线程用于判断线程池是线程是否压力很大,是否比较”忙”,若是是则增长线程数来减轻全局队列中task任务积压引发的延迟处理问题。控制线程核心代码实现以下:
1.//controller控制线程 2.void ServiceExecutorAdaptive::_controllerThreadRoutine() { 3. //控制线程线程名设置 4. setThreadName("worker-controller"_sd); 5. ...... 6. //控制线程主循环 7. while (_isRunning.load()) { 8. //一次while结束的时候执行对应func ,也就是结束的时候计算为起始时间 9. const auto timerResetGuard = 10. MakeGuard([&sinceLastControlRound] { sinceLastControlRound.reset(); }); 11. //等待工做线程通知,最多等待stuckThreadTimeout 12. _scheduleCondition.wait_for(fakeLk, _config->stuckThreadTimeout().toSystemDuration()); 13. ...... 14. double utilizationPct; 15. { 16. //获取全部线程执行任务的总时间 17. auto spentExecuting = _getThreadTimerTotal(ThreadTimer::Executing); 18. //获取全部线程整个生命周期时间(包括空闲时间+执行任务时间+建立线程的时间) 19. auto spentRunning = _getThreadTimerTotal(ThreadTimer::Running); 20. //也就是while中执行一次这个过程当中spentExecuting差值, 21. //也就是spentExecuting表明while一次循环的Executing time开始值, 22. //lastSpentExecuting表明一次循环对应的结束time值 23. auto diffExecuting = spentExecuting - lastSpentExecuting; 24. //也就是spentRunning表明while一次循环的running time开始值, 25. //lastSpentRunning表明一次循环对应的结束time值 26. auto diffRunning = spentRunning - lastSpentRunning; 27. if (spentRunning == 0 || diffRunning == 0) 28. utilizationPct = 0.0; 29. else { 30. lastSpentExecuting = spentExecuting; 31. lastSpentRunning = spentRunning; 32. 33. //一次while循环过程当中全部线程执行任务的时间和线程运行总时间的比值 34. utilizationPct = diffExecuting / static_cast<double>(diffRunning); 35. utilizationPct *= 100; 36. } 37. } 38. //也就是本while()执行一次的时间差值,也就是上次走到这里的时间和本次走到这里的时间差值大于该阀值 39. //也就是控制线程过久没有判断线程池是否够用了 40. if (sinceLastControlRound.sinceStart() >= _config->stuckThreadTimeout()) { 41. //use中的线程数=线程池中总的线程数,说明线程池中线程太忙了 42. if ((_threadsInUse.load() == _threadsRunning.load()) && 43. (sinceLastSchedule >= _config->stuckThreadTimeout())) { 44. log() << "Detected blocked worker threads, " 45. << "starting new reserve threads to unblock service executor"; 46. //一次批量建立这么多线程,若是咱们配置adaptiveServiceExecutorReservedThreads很是大 47. //这里实际上有个问题,则这里会一次性建立很是多的线程,可能反而会成为系统瓶颈 48. //建议mongodb官方这里最好作一下上限限制 49. for (int i = 0; i < _config->reservedThreads(); i++) { 50. //建立新的worker工做线程 51. _startWorkerThread(); 52. } 53. } 54. continue; 55. } 56. //当前的worker线程数 57. auto threadsRunning = _threadsRunning.load(); 58. //保证线程池中worker线程数最少都要reservedThreads个 59. if (threadsRunning < _config->reservedThreads()) { 60. //线程池中线程数最少数量不能比最低配置少 61. while (_threadsRunning.load() < _config->reservedThreads()) { 62. _startWorkerThread(); 63. } 64. } 65. //worker线程非空闲占比小于该阀值,说明压力不大,不须要增长worker线程数 66. if (utilizationPct < _config->idlePctThreshold()) { 67. continue; 68. } 69. //走到这里,说明总体worker工做压力仍是很大的 70. //咱们在这里循环stuckThreadTimeout毫秒,直到咱们等待worker线程建立起来并正常运行task 71. //由于若是有正在建立的worker线程,咱们等待一小会,最多等待stuckThreadTimeout ms 72. //保证一次while循环时间为stuckThreadTimeout 73. do { 74. stdx::this_thread::sleep_for(_config->maxQueueLatency().toSystemDuration()); 75. } while ((_threadsPending.load() > 0) && 76. (sinceLastControlRound.sinceStart() < _config->stuckThreadTimeout())); 77. //队列中任务数多余可用空闲线程数,说明压力有点大,给线程池增长一个新的线程 78. if (_isStarved()) { 79. _startWorkerThread(); 80. } 81. } 82.}
Mongodb服务层有个专门的控制线程用于判断线程池中工做线程的压力状况,以此来决定是否在线程池中建立新的工做线程来提高性能。
控制线程每过必定时间循环检查线程池中的线程压力状态,实现原理就是简单的实时记录线程池中的线程当前运行状况,为如下两类计数:总线程数_threadsRunning、
当前正在运行task任务的线程数_threadsInUse。若是_threadsRunning=_threadsRunning,说明全部工做线程当前都在处理task任务,这时候就会建立新的worker线程来减轻任务由于排队引发的延迟。
2.1.4 adaptive线程模型函数接口大全
前面只分析了核心的几个接口,下表列出了该模块的完整接口功能说明:
- 总结
adaptive动态线程池模型,内核实现的时候会根据当前系统的访问负载动态的调整线程数。当线程CPU工做比较频繁的时候,控制线程增长工做线程数;当线程CPU比较空闲后,本线程就会自动消耗退出。下面一块儿体验adaptive线程模式下,mongodb是如何作到性能极致设计的。
3.1 synchronous同步线程模型总结
Sync线程模型也就是一个连接一个线程,实现比较简单。该线程模型,listener线程每接收到一个连接就会建立一个线程,该连接上的全部数据读写及内部请求处理流程将一直由本线程负责,整个线程的生命周期就是这个连接的生命周期。
3.2 adaptive线程模型worker线程运行时间相关的几个统计
3.6状态机调度模块中提到,一个完整的客户端请求处理能够转换为2个任务:经过asio库接收一个完整mongodb报文、接收到报文后的后续全部处理(含报文解析、认证、引擎层处理、发送数据给客户端等)。假设这两个任务对应的任务名、运行时间分别以下表所示:
客户端一次完整请求过程当中,mongodb内部处理过程=task1 + task2,整个请求过程当中mongodb内部消耗的时间T1+T2。
实际上若是fd上没有数据请求,则工做线程就会等待数据,等待数据的过程就至关于空闲时间,咱们把这个时间定义为T3。因而一个工做线程总运行时间=内部任务处理时间+空闲等待时间,也就是线程总时间=T1+T2+T3,只是T3是无用等待时间。
- 单个工做线程如何判断本身处于”空闲”状态
步骤2中提到,线程运行总时间=T1 + T2 +T3,其中T3是无用等待时间。若是T3的无用等待时间占比很大,则说明线程比较空闲。
Mongodb工做线程每次运行完一次task任务后,都会判断本线程的有效运行时间占比,有效运行时间占比=(T1+T2)/(T1+T2+T3),若是有效运行时间占比小于某个阀值,则该线程自动退出销毁,该阀值由adaptiveServiceExecutorIdlePctThreshold参数指定。该参数在线调整方式:
db.adminCommand( { setParameter: 1, adaptiveServiceExecutorIdlePctThreshold: 50} )
- 如何判断线程池中工做线程“太忙”
Mongodb服务层有个专门的控制线程用于判断线程池中工做线程的压力状况,以此来决定是否在线程池中建立新的工做线程来提高性能。
控制线程每过必定时间循环检查线程池中的线程压力状态,实现原理就是简单的实时记录线程池中的线程当前运行状况,为如下两类计数:总线程数_threadsRunning、
当前正在运行task任务的线程数_threadsInUse。若是_threadsRunning=_threadsRunning,说明全部工做线程当前都在处理task任务,这时候已经没有多余线程去asio库中的全局任务队列op_queue_中取任务执行了,这时候队列中的任务就不会获得及时的执行,就会成为响应客户端请求的瓶颈点。
- 如何判断线程池中全部线程比较“空闲”
control控制线程会在收集线程池中全部工做线程的有效运行时间占比,若是占比小于指定配置的阀值,则表明整个线程池空闲。
前面已经说明一个线程的有效时间占比为:(T1+T2)/(T1+T2+T3),那么全部线程池中的线程总的有效时间占比计算方式以下:
全部线程的总有效时间TT1 = (线程池中工做线程1的有效时间T1+T2) + (线程池中工做线程2的有效时间T1+T2) + ..... + (线程池中工做线程n的有效时间T1+T2)
全部线程总运行时间TT2 = (线程池中工做线程1的有效时间T1+T2+T3) + (线程池中工做线程2的有效时间T1+T2+T3) + ..... + (线程池中工做线程n的有效时间T1+T2+T3)
线程池中全部线程的总有效工做时间占比 = TT1/TT2
- control控制线程如何动态增长线程池中线程数
Mongodb在启动初始化的时候,会建立一个线程名为”worker-controller”的控制线程,该线程主要工做就是判断线程池中是否有充足的工做线程来处理asio库中全局队列op_queue_中的task任务,若是发现线程池比较忙,没有足够的线程来处理队列中的任务,则在线程池中动态增长线程来避免task任务在队列上排队等待。
control控制线程循环主体主要压力判断控制流程以下:
while { #等待工做线程唤醒条件变量,最长等待stuckThreadTimeout _scheduleCondition.wait_for(stuckThreadTimeout) ...... #获取线程池中全部线程最近一次运行任务的总有效时间TT1 Executing = _getThreadTimerTotal(ThreadTimer::Executing); #获取线程池中全部线程最近一次运行任务的总运行时间TT2 Running = _getThreadTimerTotal(ThreadTimer::Running); #线程池中全部线程的总有效工做时间占比 = TT1/TT2 utilizationPct = Executing / Running; ...... #表明control线程过久没有进行线程池压力检查了 if(本次循环到该行代码的时间 > stuckThreadTimeout阀值) { #说明过久没作压力检查,形成工做线程不够用了 if(_threadsInUse == _threadsRunning) { #批量建立一批工做线程 for(; i < reservedThreads; i++) #建立工做线程 _startWorkerThread(); } #control线程继续下一次循环压力检查 continue; } ...... #若是当前线程池中总线程数小于最小线程数配置 #则建立一批线程,保证最少工做线程数达到要求 if (threadsRunning < reservedThreads) { while (_threadsRunning < reservedThreads) { _startWorkerThread(); } } ...... #检查上一次循环到本次循环这段时间范围内线程池中线程的工做压力 #若是压力不大,则说明无需增长工做线程数,则继续下一次循环 if (utilizationPct < idlePctThreshold) { continue; } ...... #若是发现已经有线程建立起来了,可是这些线程尚未运行任务 #这说明当前可用线程数可能足够了,咱们休息sleep_for会儿在判断一下 #该循环最多持续stuckThreadTimeout时间 do { stdx::this_thread::sleep_for(); } while ((_threadsPending.load() > 0) && (sinceLastControlRound.sinceStart() < stuckThreadTimeout) #若是tasksQueued队列中的任务数大于工做线程数,说明任务在排队了 #该扩容线程池中线程了 if (_isStarved()) { _startWorkerThread(); } }
1. 实时serviceExecutorTaskStats线程模型统计信息
本文分析的mongodb版本为3.6.1,其network.serviceExecutorTaskStats网络线程模型相关统计经过db.serverStatus().network.serviceExecutorTaskStats能够查看,以下图所示:
上表中各个字段的都有各自的意义,咱们须要注意这些参数的如下状况:
- threadsRunning - threadsInUse的差值越大说明线程池中线程比较空闲,差值越小说明压力越大
- threadsPending越大,表示线程池越空闲
- tasksQueued - totalExecuted的差值越大说明任务队列上等待执行的任务越多,说明任务积压现象越明显
- deferredTasksQueued越大说明工做线程比较空闲,在等待客户端数据到来
- totalTimeRunningMicros - totalTimeExecutingMicros差值越大说明越空闲
上面三个大类中的整体反映趋势都是同样的,任何一个差值越大就说明越空闲。
在后续mongodb最新版本中,去掉了部分重复统计的字段,同时也增长了如下字段,以下图所示:
新版本增长的几个统计项实际上和3.6.1大同小异,只是把状态机任务按照不通类型进行了更加详细的统计。新版本中,更重要的一个功能就是control线程在发现线程池压力过大的时候建立新线程的触发状况也进行了统计,这样咱们就能够更加直观的查看动态建立的线程是由于什么缘由建立的。
- Mongodb-3.6早期版本control线程动态调整动态增长线程缺陷1例
从步骤6中能够看出,control控制线程建立工做线程的第一个条件为:若是该线程超过stuckThreadTimeout阀值都没有作线程压力控制检查,而且线程池中线程数所有在处理任务队列中的任务,这种状况control线程一次性会建立reservedThreads个线程。reservedThreads由adaptiveServiceExecutorReservedThreads配置,若是没有配置,则采用初始值CPU/2。
那么问题来了,若是我提早经过命令行配置了这个值,而且这个值配置的很是大,例如一百万,这里岂不是要建立一百万个线程,这样会形成操做系统负载升高,更容易引发耗尽系统pid信息,这会引发严重的系统级问题。
不过,不用担忧,最新版本的mongodb代码,内核代码已经作了限制,这种状况下建立的线程数变为了1,也就是这种状况只建立一个线程。
3.3 adaptive线程模型实时参数调优
动态线程模设计的时候,mongodb设计者考虑到了不通应用场景的状况,所以在核心关键点增长了实时在线参数调整设置,主要包含以下7种参数,以下表所示:
命令行实时参数调整方法以下,以adaptiveServiceExecutorReservedThreads为例,其余参数调整方法相似:db.adminCommand( { setParameter: 1, adaptiveServiceExecutorReservedThreads: xx} )
Mongodb服务层的adaptive动态线程模型设计代码实现很是优秀,有不少实现细节针对不一样应用场景作了极致优化。
3.4 不一样线程模型性能多场景PK
详见:<<Mongodb网络传输处理源码实现及性能调优-体验内核性能极致设计>>
3.5 Asio网络库全局队列锁优化,性能进一步提高
经过<<Mongodb网络传输处理源码实现及性能调优-体验内核性能极致设计>>一文中的ASIO库实现和adaptive动态线程模型实现,能够看出为了获取全局任务队列上的任务,须要进行全局锁竞争,这其实是整个线程池从队列获取任务运行最大的一个瓶颈。
优化思路:咱们能够经过优化队列和锁来提高总体性能,当前的队列只有一个,咱们能够把单个队列调整为多个队列,每一个队列一把锁,任务入队的时候经过把连接session散列到多个队列,经过该优化,锁竞争及排队将会获得极大的改善。
优化前队列架构:
优化后队列架构:
如上图,把一个全局队列拆分为多个队列,任务入队的时候把session按照hash散列到各自的队列,工做线程获取任务的时候,同理经过hash的方式去对应的队列获取任务,经过这种方式减小锁竞争,同时提高总体性能。
因为篇幅缘由,本文只分析了主要核心接口源码实现,更多接口的源码实现能够参考以下地址,详见:mongodb adaptive动态线程模型源码详细分析