mongodb内核源码实现、性能调优、最佳运维实践系列-mongodb网络传输层模块源码实现三

关于做者

       前滴滴出行技术专家,现任OPPO文档数据库mongodb负责人,负责oppo千万级峰值TPS/十万亿级数据量文档数据库mongodb研发和运维工做,一直专一于分布式缓存、高性能服务端、数据库、中间件等相关研发。Github帐号地址:https://github.com/y123456yzgit

1. 说明

         在以前的<<Mongodb网络传输处理源码实现及性能调优-体验内核性能极致设计>>和<<transport_layer网络传输层模块源码实现二>>一文中分析了如何阅读百万级大工程源码、Asio网络库实现、线程模型、transport_layer套接字处理及传输层管理子模块、session会话子模块、Ticket数据收发子模块、service_entry_point服务入口点子模块。github

       本文将继续分析网络传输层模块中service_state_machine状态机调度子模块内核源码实现。mongodb

2. service_state_machine状态机调度子模块

       service_state_machine状态机处理模块主要复杂一次完整请求的状态转换,确保请求能够按照指定的流程顺利进行,最终实现客户端的正常mongodb访问。该模块核心代码实现主要由如下三个源码文件实现(test为测试相关,能够忽略):数据库

2.1 核心代码实现

       在service_entry_point服务入口点子模块分析中,当接收到一个新的连接后,在ServiceEntryPointImpl::startSession(...)回调函数中会构造一个ServiceStateMachine  ssm类,从而实现了新连接、session、ssm的一一映射关系。其中,ServiceStateMachine 类实现对ThreadGuard(线程守护)有较多的依赖,所以本文从这两个类核心代码实现来分析整个状态机调度模块的内部设计细节。缓存

2.1.1 ThreadGuard线程守护类核心代码实现

       ThreadGuard也就是线程守护类,该类主要用于工做线程名的管理维护、ssm归属管理、该ssm对应session连接的回收处理等。该类核心成员及接口实现以下:网络

1.class ServiceStateMachine::ThreadGuard {  
2.    ......  
3.public:  
4.    // create a ThreadGuard which will take ownership of the SSM in this thread.  
5.    //ThreadGuard初始化,标记ssm全部权属于本线程  
6.    explicit ThreadGuard(ServiceStateMachine* ssm) : _ssm{ssm} {  
7.        //获取ssm状态机全部权  
8.        auto owned = _ssm->_owned.compareAndSwap(Ownership::kUnowned, Ownership::kOwned);  
9.        //若是是一个连接一个线程模型,则条件知足  
10.        if (owned == Ownership::kStatic) {   
11.        //一个连接一个线程模型,因为连接始终由同一个线程处理,所以该连接对应的ssm始终归属于同一个线程  
12.            dassert(haveClient());  
13.            dassert(Client::getCurrent() == _ssm->_dbClientPtr);  
14.            //标识归属权已肯定  
15.            _haveTakenOwnership = true;  
16.            return;  
17.        }  
18.         ......
19.        //adaptive 动态线程模式走下面的模式  
20.  
21.        //把当前线程的线程名零时保存起来  
22.        auto oldThreadName = getThreadName();   
23.        //ssm保存的线程名和当前线程名不一样  
24.        if (oldThreadName != _ssm->_threadName) {  
25.            //即将修改线程名了,把修改前的线程名保存到_oldThreadName  
26.            _ssm->_oldThreadName = getThreadName().toString();  
27.           //把运行本ssm状态机的线程名改成conn-x线程  
28.            setThreadName(_ssm->_threadName); //把当前线程更名为_threadName  
29.        }  
30.  
31.        //设置本线程对应client信息,一个连接对应一个client,标识本client当前归属于本线程处理  
32.        Client::setCurrent(std::move(_ssm->_dbClient));  
33.         //本状态机ssm全部权有了,归属于运行本ssm的线程  
34.        _haveTakenOwnership = true;  
35.    }  
36.    ......  
37.    //从新赋值  
38.    ThreadGuard& operator=(ThreadGuard&& other) {  
39.        if (this != &other) {  
40.            _ssm = other._ssm;  
41.            _haveTakenOwnership = other._haveTakenOwnership;  
42.            //原来的other全部权失效  
43.            other._haveTakenOwnership = false;  
44.        }  
45.    //返回  
46.        return *this;  
47.    };  
48.  
49.    //析构函数  
50.    ~ThreadGuard() {  
51.    //ssm全部权已肯定,则析构的时候,调用release处理,恢复线程原有线程名  
52.        if (_haveTakenOwnership)  
53.            release();  
54.    }  
55.  
56.    //一个连接一个线程模型,标记_owned为kStatic,也就是线程名永远不会改变  
57.    void markStaticOwnership() {  
58.        dassert(static_cast<bool>(*this));  
59.        _ssm->_owned.store(Ownership::kStatic);  
60.    }  
61.  
62.    //恢复原有线程名,同时把client信息从调度线程归还给状态机  
63.    void release() {  
64.         auto owned = _ssm->_owned.load();  
65.         //adaptive异步线程池模式知足if条件,表示SSM固定归属于某个线程  
66.        if (owned != Ownership::kStatic) {  
67.        //本线程拥有currentClient信息,因而把它归还给SSM状态机  
68.            if (haveClient()) {  
69.                _ssm->_dbClient = Client::releaseCurrent();  
70.            }  
71.             //恢复到之前的线程名  
72.            if (!_ssm->_oldThreadName.empty()) {  
73.                //恢复到老线程名  
74.                setThreadName(_ssm->_oldThreadName);   
75.            }  
76.        }  
77.        //状态机状态进入end,则调用对应回收hook处理  
78.        if (_ssm->state() == State::Ended) {  
79.            //连接关闭的回收处理 ServiceStateMachine::setCleanupHook  
80.            auto cleanupHook = std::move(_ssm->_cleanupHook);  
81.            if (cleanupHook)  
82.                cleanupHook();  
83.            return;  
84.        }  
85.  
86.        //归属权失效  
87.        _haveTakenOwnership = false;  
88.        //归属状态变为未知  
89.        if (owned == Ownership::kOwned) {  
90.            _ssm->_owned.store(Ownership::kUnowned);  
91.        }  
92.    }  
93.  
94.private:  
95.    //本线程守护当前对应的ssm  
96.    ServiceStateMachine* _ssm;  
97.    //默认false,标识该状态机ssm不归属于任何线程  
98.    bool _haveTakenOwnership = false;  
99.}  

       从上面的代码分析能够看出线程守护类做用比较明确,就是守护当前线程的归属状态,并记录状态机ssm不一样状态变化先后的线程名。此外,状态机ssm对应的session连接若是进入end状态,则该连接的资源回收释放也由该类完成。session

       查看mongod或者mongos实例,若是启动实例的时候配置了serviceExecutor: adaptive会发现这些进程下面有不少线程名为conn-xworker-x线程,同时同一个线程线程名可能发生改变,这个过程就是由ThreadGuard线程守护类来实现。synchronous一个连接一个线程模型只有conn-x线程,adaptive线程模型将同时存在有线程名为conn-xworker-x的线程,具体原理讲在后面继续分析,以下图:多线程

说明:synchronous线程模式对应worker初始线程名为conn-x,adaptive线程模型对应worker初始线程名为worker-x运维

      ThreadGuard线程守护类和状态机ssm(service_state_machine)关联紧密,客户端请求处理的内部ssm状态转换也和该类密切关联,请看后续分析。异步

2.1.2 ServiceStateMachine 类核心代码实现

       service_state_machine状态机处理模块核心代码实现经过ServiceStateMachine类完成,该类的核心结构成员和函数接口以下:

1.//一个新连接对应一个ServiceStateMachine保存到ServiceEntryPointImpl._sessions中  
2.class ServiceStateMachine : public std::enable_shared_from_this<ServiceStateMachine> {  
3.   ......  
4.public:  
5.   ......  
6.   static std::shared_ptr<ServiceStateMachine> create(...);  
7.   ServiceStateMachine(...);  
8.   //状态机所属状态
9.   enum class State {  
10.       //ServiceStateMachine::ServiceStateMachine构造函数初始状态  
11.       Created,        
12.       //ServiceStateMachine::_runNextInGuard开始进入接收网络数据状态  
13.       //标记本次客户端请求已完成(已经发送DB获取的数据给客户端),等待调度进行该连接的  
14.       //下一轮请求,该状态对应处理流程为_sourceMessage  
15.       Source,         
16.       //等待获取客户端的数据  
17.       SourceWait,     
18.       //接收到一个完整mongodb报文后进入该状态  
19.       Process,        
20.       //等待数据发送成功  
21.       SinkWait,       
22.       //接收或者发送数据异常、连接关闭,则进入该状态  _cleanupSession  
23.       EndSession,     
24.       //session回收处理进入该状态  
25.       Ended           
26.   };  
27.   //全部权状态,主要用来判断是否须要在状态转换中跟新线程名,只对动态线程模型生效  
28.   enum class Ownership {   
29.       //该状态表示本状态机SSM处于非活跃状态  
30.       kUnowned,    
31.       //该状态标识本状态机SSM归属于某个工做worker线程,处于活跃调度运行状态  
32.       kOwned,   
33.       //表示SSM固定归属于某个线程  
34.       kStatic   
35.   };  
36.     
37.   ......  
38.private:  
39.   //ThreadGuard能够理解为线程守护,后面在ThreadGuard类中单独说明  
40.   class ThreadGuard;  
41.   friend class ThreadGuard;  
42.     
43.   ......  
44.   //获取session信息  
45.   const transport::SessionHandle& _session()  
46.   //如下两个接口为任务task调度相关接口  
47.   void _scheduleNextWithGuard(...);  
48.   void _runNextInGuard(ThreadGuard guard);  
49.   //接收到一个完整mongodb报文后的处理  
50.   inline void _processMessage(ThreadGuard guard);  
51.   //如下四个接口完成底层数据读写及其对应回调处理  
52.   void _sourceCallback(Status status);  
53.   void _sinkCallback(Status status);  
54.   void _sourceMessage(ThreadGuard guard);  
55.   void _sinkMessage(ThreadGuard guard, Message toSink);  
56.     
57.   //一次客户端请求,当前mongodb服务端所处的状态  
58.   AtomicWord<State> _state{State::Created};  
59.   //服务入口,ServiceEntryPointMongod ServiceEntryPointMongos mongod及mongos入口点  
60.   ServiceEntryPoint* _sep;  
61.   //synchronous及adaptive模式,也就是线程模型是一个连接一个线程仍是动态线程池  
62.   transport::Mode _transportMode;  
63.   //ServiceContextMongoD(mongod)或者ServiceContextNoop(mongos)服务上下文  
64.   ServiceContext* const _serviceContext;  
65.   //也就是本ssm对应的session信息,默认对应ASIOSession   
66.   transport::SessionHandle _sessionHandle;   
67.   //根据session构造对应client信息,ServiceStateMachine::ServiceStateMachine赋值  
68.   //也就是本次请求对应的客户端信息  
69.   ServiceContext::UniqueClient _dbClient;  
70.   //指向上面的_dbClient  
71.   const Client* _dbClientPtr;  
72.    //该SSM当前处理线程的线程名,由于adaptive线程模型一次请求中的不一样状态会修改线程名
73.   const std::string _threadName;  
74.   //修改线程名以前的线程名称  
75.   std::string _oldThreadName;  
76.   //ServiceEntryPointImpl::startSession->ServiceStateMachine::setCleanupHook中设置赋值  
77.   //session连接回收处理  
78.   stdx::function<void()> _cleanupHook;  
79.   //接收处理的message信息  一个完整的报文就记录在该msg中  
80.   Message _inMessage;   
81.   //默认初始化kUnowned,标识本SSM状态机处于非活跃状态,  
82.   //主要用来判断是否须要在状态转换中跟新线程名,只对动态线程模型生效  
83.   AtomicWord<Ownership> _owned{Ownership::kUnowned};  
84.}

       该类核心成员功能说明以下表:

成员名

功能说明

_state

SSM状态机当前所处状态

_sep

服务入口,mongod对应ServiceEntryPointMongod;mongos对应 ServiceEntryPointMongos

_transportMode

synchronous及adaptive模式,也就是线程模型是一个连接一个线程仍是动态线程池

_serviceContext

服务上下文,mongod和mongos分别对应:ServiceContextMongoD、ServiceContextNoop

_sessionHandle

本ssm对应的session信息,默认对应ASIOSession

_dbClient

也就是本次请求对应的客户端信息  

_dbClientPtr

指向上面的_dbClient  

_threadName

SSM所属线程的线程名,初始化为”conn-n”,adaptive线程模型一次请求中的不一样状态会修改线程名  

_oldThreadName

修改线程名以前的线程名称

_cleanupHook

session连接回收处理

_inMessage

接收处理的message信息  一个完整的报文就记录在该msg中

_owned

默认初始化kUnowned,标识本SSM状态机处于非活跃状态,主要用来判断是否须要在状态转换中跟新线程名,只对动态线程模型生效

        咱们知道,连接、session、SSM状态机一一对应,他们也拥有对应的归属权,这里的归属权指的就是当前SSM归属于那个线程,也就是当前SSM状态机调度模块由那个线程实现。归属权经过Ownership类标记,该类保护以下几种状态,每种状态功能说明以下:

归属码

功能说明

kUnowned

未知状态,也就是SSM当前不归属与具体的线程

kOwned

针对adaptive线程模型,表示当前SSM状态转换处理由具体的线程负责,也就是归属权明确

kStatic

针对synchronous线程模型,也就是一个连接一个线程模型。因为一个连接始终由同一个线程处理,所以该连接对应SSM也就始终归属于同一个线程,整个运行状态不会改变。

        Mongodb服务端接收到客户端请求后的数据接收、协议解析、从db层获取数据、发送数据给客户端都是经过SSM状态机进行有序的状态转换处理,SSM调度处理过程当中保护多个状态,每一个状态对应一个状态码,具体状态码及其功能说明以下表所示:

状态码

功能说明

Created

ServiceStateMachine::ServiceStateMachine()构造函数初始状态

Source

下面两种状况会进入该状态:1. 新链接到来第一次进入SSM处理;2. 客户端请求已完成(已经发送DB获取的数据给客户端),等待调度进行该连接的下一轮请求

SourceWait

等待获取客户端的数据

Process

接收到一个完整mongodb报文后进入该状态,开始进行协议解析、db层数据访问

SinkWait

等待数据发送成功,发送数据给客户端过程当中

EndSession

接收或者发送数据异常、连接关闭,session对应客户端,同时进入Ended状态

Ended

session回收处理,进入EndSession后立马经过_cleanupSession进入该状态

       以上是SSM处理请求过程的状态码信息,状态转换的具体实现过程请参考后面的核心代码分析。listerner线程接收到新的客户端连接后会调用经过service_entry_point服务入口点子模块的ssm->start()接口进入SSM状态机调度模块,该接口相关的源码实现以下:

1.//ServiceEntryPointImpl::startSession中执行  启动SSM状态机  
2.void ServiceStateMachine::start(Ownership ownershipModel) {  
3.    //直接调用_scheduleNextWithGuard接口  
4.    _scheduleNextWithGuard(   
5.    //listener线程暂时性的变为conn线程名,在_scheduleNextWithGuard中任  
6.    //务入队完成后,在下面的_scheduleNextWithGuard调用guard.release()恢复listener线程名  
7.        ThreadGuard(this), transport::ServiceExecutor::kEmptyFlags, ownershipModel);  
8.}  
9.  
10.void ServiceStateMachine::_scheduleNextWithGuard(...) {  
11.    //该任务func实际上由worker线程运行,worker线程从asio库的全局队列获取任务调度执行  
12.    auto func = [ ssm = shared_from_this(), ownershipModel ] {  
13.        //构造ThreadGuard  
14.        ThreadGuard guard(ssm.get());    
15.        //说明是sync mode,即一个连接一个线程模式, 归属权明确,属于指定线程  
16.        if (ownershipModel == Ownership::kStatic)   
17.            guard.markStaticOwnership();  
18.        //对应:ServiceStateMachine::_runNextInGuard,在这里面完成状态调度转换  
19.        ssm->_runNextInGuard(std::move(guard));    
20.    };  
21.    //恢复以前的线程名,若是该SSM进入Ended状态,则开始资源回收处理  
22.    guard.release();  
23.    //ServiceExecutorAdaptive::schedule(adaptive)   ServiceExecutorSynchronous::schedule(synchronous)  
24.    //第一次进入该函数的时候在这里面建立新线程,不是第一次则把task任务入队调度  
25.    Status status = _serviceContext->getServiceExecutor()->schedule(std::move(func), flags);  
26.    if (status.isOK()) {  
27.        return;  
28.    }  
29.    //异常处理  
30.    ......  
31.}  

       ServiceStateMachine::start()接口调用ServiceStateMachine::_scheduleNextWithGuard()来启动状态机运行。_scheduleNextWithGuard()接口最核心的做用就是调用service_executor服务运行子模块(线程模型子模块)的schedule()接口来把状态机调度任务入队到ASIO网络库的一个全局队列(adaptive动态线程模型),若是是一个连接一个线程模型,则任务入队到线程级私有队列。

       adaptive线程模型,任务入队以及工做线程调度任务执行的流程将在后续的线程模型子模块中分析,也能够参考:<<Mongodb网络传输处理源码实现及性能调优-体验内核性能极致设计>>

        此外,_scheduleNextWithGuard()入队到全局队列的任务就是本模块后面须要分析的SSM状态机任务,这些task任务经过本函数接口的func (...)进行封装,而后经过线程模型子模块入队到一个全局队列。Func(...)这个task任务中会直接调用_runNextInGuard()接口来进行状态转换处理,该接口也就是入队到ASIO全局队列的任务,核心代码功能以下:

1.void ServiceStateMachine::_runNextInGuard(ThreadGuard guard) {  
2.    //获取当前SSM状态  
3.    auto curState = state();  
4.    // If this is the first run of the SSM, then update its state to Source  
5.    //若是是第一次运行该SSM,则状态为Created,到这里标记能够准备接收数据了  
6.    if (curState == State::Created) {   
7.        //进入Source等待接收数据  
8.        curState = State::Source;  
9.        _state.store(curState);  
10.    }  
11.    //各状态对应处理  
12.    try {  
13.        switch (curState) {   
14.            //接收数据  
15.            case State::Source:    
16.                _sourceMessage(std::move(guard));  
17.                break;  
18.            //以及接收到完整的一个mongodb报文,能够内部处理(解析+命令处理+应答客户端)  
19.            case State::Process:  
20.                _processMessage(std::move(guard));  
21.                break;  
22.            //连接异常或者已经关闭,则开始回收处理  
23.            case State::EndSession:  
24.                _cleanupSession(std::move(guard));  
25.                break;  
26.            default:  
27.                MONGO_UNREACHABLE;  
28.        }  
29.         return;
30.    } catch (...) {  
31.        //异常打印  
32.    }  
33.    //异常处理  
34.    ......  
35.    //进入EndSession状态  
36.    _state.store(State::EndSession);  
37.    _cleanupSession(std::move(guard));  
38.} 

       从上面的代码实现能够看出,真正入队到全局队列中的任务类型只有三个,分别是:

  1. 接收mongodb数据的task任务,简称为readTask。
  2. 接收到一个完整mongodb数据后的后续处理(包括协议解析、命令处理、DB获取数据、发送数据给客户端等),简称为dealTask。
  3. 接收或者发送数据异常、连接关闭等引发的后续资源释放,简称为cleanTask。

       下面针对这三种task任务核心代码实现进行分析:

  1. readTask任务核心代码实现

       readTask任务核心代码实现由_sourceMessage()接口实现,具体代码以下:

1.//接收客户端数据  
2.void ServiceStateMachine::_sourceMessage(ThreadGuard guard) {  
3.    ......  
4.    //获取本session接收数据的ticket,也就是ASIOSourceTicket  
5.    auto ticket = _session()->sourceMessage(&_inMessage);   
6.    //进入等等接收数据状态  
7.    _state.store(State::SourceWait);    
8.    //release恢复worker线程原有的线程名,synchronous线程模型为"conn-xx",adaptive对应worker线程名为"conn-xx"  
9.    guard.release();  
10.    //线程模型默认同步方式,也就是一个连接一个线程  
11.    if (_transportMode == transport::Mode::kSynchronous) {  
12.         //同步方式,读取到一个完整mongodb报文后执行_sourceCallback回调  
13.         _sourceCallback([this](auto ticket) {  
14.            MONGO_IDLE_THREAD_BLOCK;  
15.            return _session()->getTransportLayer()->wait(std::move(ticket));  
16.        }(std::move(ticket)));   
17.    } else if (_transportMode == transport::Mode::kAsynchronous) {  
18.        //adaptive线程模型,异步读取一个mongodb报文后执行_sourceCallback回调  
19.        _session()->getTransportLayer()->asyncWait(   
20.            ////TransportLayerASIO::ASIOSourceTicket::_bodyCallback读取到一个完整报文后执行该回调  
21.            std::move(ticket), [this](Status status) { _sourceCallback(status); });  
22.    }  
23.}  
24.  
25.//接收到一个完整mongodb报文后的回调处理  
26.void ServiceStateMachine::_sourceCallback(Status status) {  
27.    //构造ThreadGuard,修改执行本SSM接口的线程名为conn-xx  
28.    ThreadGuard guard(this);   
29.  
30.    //状态检查  
31.    dassert(state() == State::SourceWait);  
32.    //获取连接session远端信息  
33.    auto remote = _session()->remote();   
34.    if (status.isOK()) {  
35.    //等待调度,进入处理消息阶段  _processMessage  
36.        _state.store(State::Process);  
37.        //注意kMayRecurse标识State::Process阶段的处理仍是由本线程执行,这是一个递归标记  
38.        return _scheduleNextWithGuard(std::move(guard), ServiceExecutor::kMayRecurse);  
39.    }  
40.    ......  
41.    //异常流程调用  
42.    _runNextInGuard(std::move(guard));  
43.}  

       SSM调度的第一个任务就是readTask任务,从上面的源码分析能够看出,该任务就是经过ticket数据分发模块从ASIO网络库读取一个完整长度的mongodb报文,而后执行_sourceCallback回调。进入该回调函数后,即刻设置SSM状态为State::Process状态,而后调用_scheduleNextWithGuard(...)把dealTask任务入队到ASIO的全局队列(adaptive线程模型),或者入队到线程级私有队列(synchronous线程模型)等待worker线程调度执行。

       这里有个细节,在把dealTask入队的时候,携带了kMayRecurse标记,该标记标识该任务能够递归调用,也就是该任务能够由当前线程继续执行,这样也就能够保证同一个请求的taskRead任务和dealTask任务由同一个线程处理。任务递归调度,能够参考后面的线程模型子模块源码实现。

2. dealTask任务核心代码实现

       当读取到一个完整长度的mongodb报文后,就会把dealTask任务入队到全局队列,而后由worker线程调度执行该task任务。dealTask任务的核心代码实现以下:

1.//dealTask处理  
2.void ServiceStateMachine::_processMessage(ThreadGuard guard) {  
3.    ......  
4.    //入口流量计数  
5.    networkCounter.hitLogicalIn(_inMessage.size());  
6.    //获取一个惟一的UniqueOperationContext,一个客户端对应一个UniqueOperationContext  
7.    auto opCtx = Client::getCurrent()->makeOperationContext();  
8.    //ServiceEntryPointMongod::handleRequest  ServiceEntryPointMongos::handleRequest请求处理  
9.    //command处理、DB访问后的数据经过dbresponse返回  
10.    DbResponse dbresponse = _sep->handleRequest(opCtx.get(), _inMessage);  
11.    //释放opCtx,这样currentop就看不到了  
12.    opCtx.reset();  
13.    //须要发送给客户端的应答数据  
14.    Message& toSink = dbresponse.response;  
15.    //应答数据存在  
16.    if (!toSink.empty()) {    
17.        ......  
18.        //发送数据 ServiceStateMachine::_sinkMessage()  
19.        _sinkMessage(std::move(guard), std::move(toSink));  
20.  
21.    } else {  
22.       //若是不须要应答客户端的处理  
23.       ......  
24.    }  
25.}  
26.  
27.//调用Sinkticket发送数据  
28.void ServiceStateMachine::_sinkMessage(ThreadGuard guard, Message toSink) {  
29.    //获取发送数据的ASIOSinkTicket  
30.    auto ticket = _session()->sinkMessage(toSink);  
31.    //进入sink发送等待状态  
32.    _state.store(State::SinkWait);  
33.    //恢复原有的worker线程名  
34.    guard.release();  
35.    //synchronous线程模型,同步发送  
36.    if (_transportMode == transport::Mode::kSynchronous) {  
37.        //最终在ASIOSinkTicket同步发送数据成功后执行_sinkCallback  
38.        _sinkCallback(_session()->getTransportLayer()->wait(std::move(ticket)));  
39.    } else if (_transportMode == transport::Mode::kAsynchronous) {  
40.        //最终在ASIOSinkTicket异步发送数据成功后执行_sinkCallback  
41.        _session()->getTransportLayer()->asyncWait(  
42.            std::move(ticket), [this](Status status) { _sinkCallback(status); });  
43.    }  
44.}  
45.  
46.//sink数据发送  
47.void ServiceStateMachine::_sinkCallback(Status status) {  
48.    //SSM归属于新的guard,同时修改当前线程名为conn-xx  
49.    ThreadGuard guard(this);  
50.    //状态检查  
51.    dassert(state() == State::SinkWait);  
52.    if (!status.isOK()) {  
53.        //进入EndSession状态  
54.        _state.store(State::EndSession);  
55.        //异常状况调用  
56.        return _runNextInGuard(std::move(guard));  
57.    } else if (_inExhaust) { //_inExhaust方式   
58.        //注意这里的状态是process   _processMessage   还须要继续进行Process处理  
59.        _state.store(State::Process);   
60.    } else {   
61.        //正常流程始终进入该分支 _sourceMessage    这里继续进行递归接收数据处理  
62.        //注意这里的状态是Source,继续接收客户端请求  
63.        _state.store(State::Source);  
64.    }  
65.    //本连接对应的一次mongo访问已经应答完成,须要继续要一次调度了  
66.    return _scheduleNextWithGuard(std::move(guard),  
67.                                  ServiceExecutor::kDeferredTask |  
68.                                      ServiceExecutor::kMayYieldBeforeSchedule);  
69.}  

       readTask经过ticket数据分发子模块读取一个完整长度的mongodb报文后,开始dealTask任务逻辑,该任务也就是_processMessage(...)。该接口中核心实现就是调用mongod和mongos实例对应的服务入口类的handleRequest(...)接口来完成后续的command命令处理、DB层数据访问等,访问到的数据存储到DbResponse中,最终在经过_sinkMessage(...)把数据发送出去。

       真正的mongodb内部处理流程实际上就是经过该dealTask任务完成,该任务也是处理整个请求中资源耗费最重的一个环节。在该task任务中,当数据成功发送给客户端后,该session连接对应的SSM状态机进入State::Source状态,继续等待worker线程调度来完成后续该连接的新请求。

3. cleanTask任务

       在数据读写过程、客户端连接关闭、访问DB数据层等任何一个环节异常,则会进入State::EndSession状态。该状态对应得任务实现相对比较简单,具体代码实现以下:

1.//session对应连接回收处理  
2.void ServiceStateMachine::_cleanupSession(ThreadGuard guard) {  
3.    //进入这个状态后在~ThreadGuard::release中进行ssm _cleanupHook处理,该hook在ServiceEntryPointImpl::startSession  
4.    _state.store(State::Ended);  
5.    //清空message buffer  
6.    _inMessage.reset();  
7.    //释放连接对应client资源  
8.    Client::releaseCurrent();  
9.}  

       进入该状态后直接由本线程进行session资源回收和client资源释放处理,而无需状态机调度worker线程来回收。

2.2 关于worker线程名和guardthread线程守护类

       前面得分析咱们知道,当线程模型为adaptive动态线程模型的时候,mongod和mongos实例对应的子线程中有不少名为“conn-xx”和worker-xx的线程,并且同一个线程可能一下子线程名为“conn-xx”,下一次又变为了worker-xx。这个线程名的初始命名和线程名更改与ServiceStateMachine状态机调度类、guardthread线程守护类、worker线程模型等都有关系。

       Worker线程由ServiceExecutor线程模型子模块建立,请参考后续线程模型子模块相关章节。默认初始化线程名为conn-x,初始化代码实现以下:

1.//ServiceStateMachine::create调用,ServiceStateMachine类初始化构造  
2.ServiceStateMachine::ServiceStateMachine(...)  
3.    ......  
4.    //线程名初始化:conn-xx,xx代码session id  
5.    _threadName{str::stream() << "conn-" << _session()->id()} {}   
6.}  
7.  
8.class Session {  
9.    ......  
10.    //sessionID,自增生成  
11.    const Id _id;  
12.}  
13.  
14.//全局sessionIdCounter计数器,初始化为0  
15.AtomicUInt64 sessionIdCounter(0);  
16.  
17.//session id自增  
18.Session::Session() : _id(sessionIdCounter.addAndFetch(1)) {} 

       SSM状态处理过程当中,会把一个完整的请求过程 = readTask任务 + dealTask任务,这两个任务都是经过SSM状态机和ServiceExecutor线程模型子模块的worker线程配合调度完成,在任务处理过程当中处理同一个任务的线程可能会有屡次线程名更改,这个就是结合guardthread线程守护类来完成,以一个线程名切换更改伪代码实现为例:

1.worker_thread_run_task(...)  
2.{  
3.    //若是是adaptive线程模型,当前worker线程名为"worker-xx"  
4.    print(threadName)  
5.    //业务逻辑处理1  
6.    ......  
7.      
8.    //初始化构造ThreadGuard,这里面修改线程名为_ssm->_threadName,也就是"conn-xx",  
9.    //同时保存原来的线程名"worker-xx"到_ssm->_oldThreadName中  
10.    ThreadGuard guard(this);  
11.    //若是是adaptive线程模型,线程名打印内容为"conn-xx"  
12.    print(threadName)  
13.    //业务逻辑处理2  
14.    ......  
15.    //恢复_ssm->_oldThreadName保存的线程名"worker-xx"  
16.    guard.release();  
17.      
18.    //若是是adaptive线程模型,线程名恢复为"worker-xx"  
19.    print(threadName)  
20.}  

       从上面的伪代码能够看出,adaptive线程模型对应worker线程名为worker,在进入ThreadGuard guard(this)流程后,线程名更改成conn-xx线程,当guard.release()释放后恢复原有worker-xx线程名。

       结合前面的SSM状态处理流程,adaptive线程模型能够获得以下总结:底层网络IO数据读写过程,worker线程名会改成worker-xx,其余非网络IO的mongodb内部逻辑处理线程名为conn-xx。因此,若是查看mongod或者mongos进程全部线程名的时候,若是发现线程名为worker-xx,说明当前线程在处理网络IO;若是发现线程名为conn-xx,则说明当前线程在处理内部逻辑处理,对于mongod实例能够理解为主要处理磁盘IO。

       因为synchronous同步线程模型,同一连接对应的全部客户端请求至始至终都有同一线程处理,因此整个处理线程名不会改变,也不必修改线程名,整个过程都是conn-xx线程名。

2.3 该模块函数接口总结大全

     前面分析了主要核心接口源码实现,不少其余接口没有一一列举详细分析,该模块u全部接口功能总结以下,更多接口代码实现详见Mongodb内核源码详细注释分析

类名

函数接口

功能说明

ThreadGuard

ThreadGuard(...)

线程守护初始化,把worker线程名改成conn-xx

ThreadGuard

operator=(...)

类赋值

ThreadGuard

~ThreadGuard()

类析构,析构函数中调用release()接口

ThreadGuard

operator bool()

获取全部权标识

ThreadGuard

markStaticOwnership()

synchronous线程模型默认归属权为kStatic

ThreadGuard

release()

  1. 若是ssm状态为Ended,则连接回收处理
  2. 恢复worker原来的线程名(adaptive:worker-xx)

ServiceStateMachine

ServiceStateMachine(...)

ServiceStateMachine类初始化构造赋值

ServiceStateMachine

_session()

获取当前ssm对应的session信息

ServiceStateMachine

_sourceMessage(...)

等待经过ASIO库接收网络IO数据

ServiceStateMachine

_sinkMessage(...)

等待经过ASIO库发送网络IO数据

ServiceStateMachine

_sourceCallback(...)

接收到一个完整长度mongodb报文的回调处理

ServiceStateMachine

_sinkCallback(...)

发送一个完整应答报文后的回调处理

ServiceStateMachine

_processMessage(...)

开始内部处理,如协议解析、命令处理、DB层数据访问等

ServiceStateMachine

_runNextInGuard(...)

SSM状态机调度运行

ServiceStateMachine

start(...)

接收到一个新连接经过start()接口启用SSM状态机

ServiceStateMachine

_scheduleNextWithGuard(...)

readTask和dealTask交由worker线程处理

ServiceStateMachine

terminate(...)

调用TransportLayerASIO::end()进行套接字回收处理

ServiceStateMachine

setCleanupHook(...)

设置clean hook,也就是回收处理

ServiceStateMachine

state()

获取SSM当前状态

ServiceStateMachine

_terminateAndLogIfError(...)

打印回收日志并进行terminate回收处理

ServiceStateMachine

_cleanupSession(...)

session会话回收处理

3. 总结

       本文主要分析了service_state_machine状态机子模块,该模块把session对应的客户端请求转换为readTask任务、dealTask任务和cleanTask任务,前两个任务经过worker线程完成调度处理,cleanTask任务在内部处理异常或者连接关闭的时候由本线程直接执行,而不是经过worker线程调度执行。

      这三个任务处理过程会分别对应到Created、Source、SourceWait、Process、SinkWait、EndSession、Ended七种状态的一种或者多种,具体详见前面的状态码分析。一个正常的客户端请求状态转换过程以下:

  1. 连接刚创建的第一次请求状态转换过程:

Created->Source -> SourceWait -> Process -> SinkWait -> Source

 2. 该连接后续的请求状态转换过程:

 Source -> SourceWait -> Process -> SinkWait -> Source

      此外,SSM状态机调度模块经过ServiceStateMachine::_scheduleNextWithGuard(...)接口和线程模型子模块关联起来。SSM经过该接口完成worker线程初始建立、task任务入队处理,下期将分析<<网络线程模型子模块>>详细源码实现。

说明:

     该模块更多接口实现细节详见Mongodb内核源码注释:Mongodb内核源码详细注释分析

相关文章
相关标签/搜索