OceanBase处理网络包的逻辑仍是蛮绕的,这里以UPS为例,做为给本身的备忘。网络
UPS代码的main.cpp中调用ObUpdateServerMain的start启动server。start函数会调用ObUpdateServerMain的do_work函数,此函数调用ObUpdateServer类的start启动UPS。数据结构
ObUpdateServer继承自以下几个类:函数
1)common::ObBaseServer,基础的server类,要求派生类实现handlePacket和handleBatchPacket方法(后一个UPS不会使用),这两个是纯虚函数,同时有个虚函数start_service,派生类能够实现此方法用于初始化时作些server相关的逻辑;this
2)ObPacketQueueHandler, 要求派生类实现handlePacketQueue接口;线程
3)common::IBatchPacketQueueHandler,要求派生类实现handleBatchPacketQueue接口;code
ObUpdateServer类的start函数实际上调用的ObBaseServer的start函数,此函数作以下几件事:server
1)初始化libeasy相关的一系列数据结构, 在特定的网络端口上监听等;对象
2)调用initialize函数(须要派生类实现);继承
3)调用start_service函数(派生类可本身实现);接口
ObUpdateServer的initialize函数,作一系列启动UPS内部所需的参数的初始化,除了必要的参数外,有几个须要特别注意下:
1)server_handler_,此变量的类型是easy_io_handler_pt,此处注册一系列回调,包括收包时的编解码(server_handler_.encode/server_handler_.decode),收包时的处理逻辑(server_handler_.process),断开链接时的处理等(server_handler_.on_disconnect)等;编解码相关等都是使用的全部OB server共用的ObTbnetCallback类的方法,版本升级时包的格式变化只须要统一改此common类的方法便可;包的处理逻辑调用的UPS本身的ObUpdateCallback::process方法;
2)client_manager_的初始化,后续server发包都须要用到client_manager_;
3)rpc相关的初始化;
4)一系列worker线程的初始化(如read_thread_queue_/write_thread_queue_/lease_thread_queue_等,具体看代码,指定不一样类型线程的线程数,将this传给各个线程用做handler回调等);
ObUpdateServer的start_service函数,调用start_threads(此函数调用上述一系列worker线程的start方法,等候收到任务处理),start_timer_schedule设定一系列ObTimer设定一些定时器相关的处理逻辑。
如上和网络包处理相关的一系列函数说清楚了,而后具体说说网络包收到后是怎么处理的。
libeasy收到包会调用注册的回调方法ObUpdateCallback::process,此函数会调用ObUpdateServer的handlePacket方法,此函数由各server本身实现,如UPS,会根据packet_code不一样,或者进行本地处理(由于libeasy有多个IO poll thread,因此本地能够处理一些耗时少的操做包),或是扔给不一样的worker类处理。这里分别举个例子:
1)packet_code为OB_SET_OBI_ROLE类型的包,调用read_thread_queue_的push方法,交由read_thread_queue_线程处理;
2)packet_code为OB_UPS_CLEAR_FATAL_STATUS类型的包,调用lease_thread_queue_的push方法,交由lease_thread_queue_线程处理;
3)OB_WRITE/OB_MS_MUTATE/OB_FAKE_WRITE_FOR_KEEP_ALIVE/OB_UPS_PHY_PLAN_EXECUTE/OB_START_TRANSACTION/OB_END_TRANSACTION/OB_WRITE_DUMMY_LOG类型的包,直接在IO线程中调用trans_executor_的handle_packet函数处理;这里注意下,trans_executor_的handle_packet也会进一步判断包的类型,肯定是直接在此函数中处理(transe_executor_类对不一样类型的包实现了一系列的handle函数),仍是经过调用push_task_交由trans_executor本身的一系列worker线程处理。
read_thread_queue_等worker线程是common::ObPacketQueueThread类型的(或者其余相似类型)实例,此类类型都是tbsys::CDefaultRunnable的派生类,其内部含有一个阻塞队列等消息到来,同时有一个用于回调的ObPacketQueueHandler类型的handler(注意ObUpdateServer就继承自ObPacketQueueHandler,实际上此处注册的handler就是ObUpdateServer类的对象)。此类worker线程start后真正调用的是ObPacketQueueThread::run函数,此函数即从内部队列中取消息,而后调用ObUpdateServer的handlePacketQueue函数,handlePacketQueue函数再根据包的类型调用ObUpdateServer类实现的各种方法对包进行处理。此处因为是worker线程,全部包都是线程本地处理的。
至此,整个处理包的逻辑应该被串起来了。其实主要是初看代码时handlePacket/handlePacketQueue/handleBatchPacket/handleBatchPacketQueue各由谁调用,以及在哪一个线程空间中被调用不太好理解。