最近回顾以前的文章,发现最后一篇有些着急了,不少地方没有叙述清楚。这里先作个衔接吧。
咱们仍是以长链接为例,从longlink.cc看起。首先是那个线程函数__Run:
/mars-master/mars/stn/src/longlink.ccjava
void LongLink::__Run() { ...... // 执行链接 SOCKET sock = __RunConnect(conn_profile); // 无效的socket,更新描述文件,记录失败的时间节点,返回 if (INVALID_SOCKET == sock) { conn_profile.disconn_time = ::gettickcount(); conn_profile.disconn_signal = ::getSignal(::getNetInfo() == kWifi); __UpdateProfile(conn_profile); return; } ...... // 执行读写 __RunReadWrite(sock, errtype, errcode, conn_profile); }
实际上核心的就2个,链接和读写,咱们分别看下。
/mars-master/mars/stn/src/longlink.ccc++
SOCKET LongLink::__RunConnect(ConnectProfile& _conn_profile) { std::vector<IPPortItem> ip_items; std::vector<socket_address> vecaddr; ...... // 赋值填充ip_items地址端口数组 netsource_.GetLongLinkItems(ip_items, dns_util_); ...... // 根据ip_items建立socket_address并加入vecaddr中 for (unsigned int i = 0; i < ip_items.size(); ++i) { vecaddr.push_back(socket_address(ip_items[i].str_ip.c_str(), ip_items[i].port).v4tov6_address(isnat64)); } ...... // 建立观察者和ComplexConnect链接核心,而后开始执行链接 LongLinkConnectObserver connect_observer(*this, ip_items); ComplexConnect com_connect(kLonglinkConnTimeout, kLonglinkConnInteral, kLonglinkConnInteral, kLonglinkConnMax); SOCKET sock = com_connect.ConnectImpatient(vecaddr, connectbreak_, &connect_observer); // 返回socket return sock; }
1.建立2个数组,地址端口item和socket_address;
2.调用netsource_.GetLongLinkItems(ip_items, dns_util_);填充IPPortItem数组;
3.根据填充好的前者数组生成socket_address填充后者数组;
4.建立链接观察者;
5.开始执行链接;
首先看看netsource_.GetLongLinkItems是如何填充的:
/mars-master/mars/stn/src/net_source.cc数组
bool NetSource::GetLongLinkItems(std::vector<IPPortItem>& _ipport_items, DnsUtil& _dns_util) { ScopedLock lock(sg_ip_mutex); if (__GetLonglinkDebugIPPort(_ipport_items)) { return true; } lock.unlock(); std::vector<std::string> longlink_hosts = NetSource::GetLongLinkHosts(); if (longlink_hosts.empty()) { xerror2("longlink host empty."); return false; } __GetIPPortItems(_ipport_items, longlink_hosts, _dns_util, true); return !_ipport_items.empty(); }
能够看到debug的优先,这里增长了调试的ip。再往下就先不贴代码了,基本上就是以前经过SetLongLink设置进去的sg_longlink_hosts(长链接主机列表),再往上倒腾就是在MarsServiceNative.java的onCreate中经过描述文件profile设置进去的主机列表。也就是说以前早就设置好的主机列表已经存在了。
下面咱们仍然要进入到上一篇提到的ComplexConnect::ConnectImpatient这个核心函数中看看。
/mars-master/mars/comm/socket/complexconnect.ccapp
SOCKET ComplexConnect::ConnectImpatient(const std::vector<socket_address>& _vecaddr, SocketSelectBreaker& _breaker, MComplexConnect* _observer) { ...... // 根据地址列表,生成ConnectCheckFSM链接列表 std::vector<ConnectCheckFSM*> vecsocketfsm; for (unsigned int i = 0; i < _vecaddr.size(); ++i) { xinfo2(TSF"complex.conn %_", _vecaddr[i].url()); ConnectCheckFSM* ic = new ConnectCheckFSM(_vecaddr[i], timeout_, i, _observer); vecsocketfsm.push_back(ic); } // 下面就是对这个链接列表的各类操做了 do { ...... // 生成socketselect的封装对象,并执行PreSelect前期准备工做 SocketSelect sel(_breaker); sel.PreSelect(); ...... // 执行链接 for (unsigned int i = 0; i < index; ++i) { if (NULL == vecsocketfsm[i]) continue; xgroup2_define(group); vecsocketfsm[i]->PreSelect(sel, group); xgroup2_if(!group.Empty(), TSF"index:%_, @%_, ", i, this) << group; timeout = std::min(timeout, vecsocketfsm[i]->Timeout()); } ...... for (unsigned int i = 0; i < index; ++i) { if (NULL == vecsocketfsm[i]) continue; xgroup2_define(group); vecsocketfsm[i]->AfterSelect(sel, group); xgroup2_if(!group.Empty(), TSF"index:%_, @%_, ", i, this) << group; if (TcpClientFSM::EEnd == vecsocketfsm[i]->Status()) { if (_observer) _observer->OnFinished(i, socket_address(&vecsocketfsm[i]->Address()), vecsocketfsm[i]->Socket(), vecsocketfsm[i]->Error(), vecsocketfsm[i]->Rtt(), vecsocketfsm[i]->TotalRtt(), (int)(gettickcount() - starttime)); vecsocketfsm[i]->Close(); delete vecsocketfsm[i]; vecsocketfsm[i] = NULL; lasterror = -1; continue; } if (TcpClientFSM::EReadWrite == vecsocketfsm[i]->Status() && ConnectCheckFSM::ECheckFail == vecsocketfsm[i]->CheckStatus()) { if (_observer) _observer->OnFinished(i, socket_address(&vecsocketfsm[i]->Address()), vecsocketfsm[i]->Socket(), vecsocketfsm[i]->Error(), vecsocketfsm[i]->Rtt(), vecsocketfsm[i]->TotalRtt(), (int)(gettickcount() - starttime)); vecsocketfsm[i]->Close(); delete vecsocketfsm[i]; vecsocketfsm[i] = NULL; lasterror = -1; continue; } if (TcpClientFSM::EReadWrite == vecsocketfsm[i]->Status() && ConnectCheckFSM::ECheckOK == vecsocketfsm[i]->CheckStatus()) { if (_observer) _observer->OnFinished(i, socket_address(&vecsocketfsm[i]->Address()), vecsocketfsm[i]->Socket(), vecsocketfsm[i]->Error(), vecsocketfsm[i]->Rtt(), vecsocketfsm[i]->TotalRtt(), (int)(gettickcount() - starttime)); xinfo2(TSF"index:%_, sock:%_, suc ConnectImpatient:%_:%_, RTT:(%_, %_), @%_", i, vecsocketfsm[i]->Socket(), vecsocketfsm[i]->IP(), vecsocketfsm[i]->Port(), vecsocketfsm[i]->Rtt(), vecsocketfsm[i]->TotalRtt(), this); retsocket = vecsocketfsm[i]->Socket(); index_ = i; index_conn_rtt_ = vecsocketfsm[i]->Rtt(); index_conn_totalcost_ = vecsocketfsm[i]->TotalRtt(); vecsocketfsm[i]->Socket(INVALID_SOCKET); delete vecsocketfsm[i]; vecsocketfsm[i] = NULL; break; } } } while (true); }
1.数组中的每一个长链接地址依次执行链接;
2.数组中的每一个链接分别作后续处理(一个for循环中的三段处理);socket
咱们首先看看vecsocketfsm[i]->PreSelect(sel, group);这句话,是由ConnectCheckFSM的父类TcpClientFSM实现的:
/mars-master/mars/comm/socket/tcpclient_fsm.ccasync
void TcpClientFSM::PreSelect(SocketSelect& _sel, XLogger& _log) { switch(status_) { case EStart: { PreConnectSelect(_sel, _log); break; } case EConnecting: { _sel.Write_FD_SET(sock_); _sel.Exception_FD_SET(sock_); break; } case EReadWrite: { PreReadWriteSelect(_sel, _log); break; } default: xassert2(false, "preselect status error"); } }
这里是根据这个链接的当前状态决定前置操做的行为(开始链接、读写、链接中)。再往下看就是进行socket的connect。以PreConnectSelect为例,这里生产了socket,并执行了connect,最后将成功链接的socket执行_sel.Write_FD_SET(sock_);保存在了SocketSelect中。
咱们来看下代码:
/mars-master/mars/comm/socket/tcpclient_fsm.cctcp
void TcpClientFSM::PreConnectSelect(SocketSelect& _sel, XLogger& _log) { xassert2(EStart == status_, "%d", status_); // 执行虚函数,由子类继承实现 _OnCreate(); xinfo2(TSF"addr:(%_:%_), ", addr_.ip(), addr_.port()) >> _log; // 生成socket sock_ = socket(addr_.address().sa_family, SOCK_STREAM, IPPROTO_TCP); if (sock_ == INVALID_SOCKET) { error_ = socket_errno; last_status_ = status_; status_ = EEnd; _OnClose(last_status_, error_, false); xerror2(TSF"close socket err:(%_, %_)", error_, socket_strerror(error_)) >> _log; return; } if (::getNetInfo() == kWifi && socket_fix_tcp_mss(sock_) < 0) { #ifdef ANDROID xinfo2(TSF"wifi set tcp mss error:%0", strerror(socket_errno)); #endif } if (0 != socket_ipv6only(sock_, 0)){ xwarn2(TSF"set ipv6only failed. error %_",strerror(socket_errno)); } if (0 != socket_set_nobio(sock_)) { error_ = socket_errno; xerror2(TSF"close socket_set_nobio:(%_, %_)", error_, socket_strerror(error_)) >> _log; } else { xinfo2(TSF"socket:%_, ", sock_) >> _log; } if (0 != error_) { last_status_ = status_; status_ = EEnd; return; } start_connecttime_ = gettickcount(); // 执行链接 int ret = connect(sock_, &(addr_.address()), addr_.address_length()); if (0 != ret && !IS_NOBLOCK_CONNECT_ERRNO(socket_errno)) { end_connecttime_ = ::gettickcount(); error_ = socket_errno; xwarn2(TSF"close connect err:(%_, %_), localip:%_", error_, socket_strerror(error_), socket_address::getsockname(sock_).ip()) >> _log; } else { xinfo2("connect") >> _log; // 记录socket到SocketSelect中 _sel.Write_FD_SET(sock_); _sel.Exception_FD_SET(sock_); } last_status_ = status_; if (0 != error_) status_ = EEnd; else status_ = EConnecting; if (0 == error_) _OnConnect(); }
须要注意的是_OnCreate的调用,其实是子类实现的,这里也就是ConnectCheckFSM实现的:ide
virtual void _OnCreate() { if (m_observer) m_observer->OnCreated(m_index, addr_, sock_);}
这里将观察者与链接对象的生命周期绑在了一块儿,执行了观察者的OnCreated。那么观察者是谁呢?往上看,在LongLink::__RunConnect中生成的LongLinkConnectObserver。固然生命周期的回调并不止OnCreated一个。函数
回到__RunConnect中,看后续处理(for循环的三段处理)。执行AfterSelect并根据每一个链接的状态决定后续处理,上篇已经讲过,再也不累述。oop
那么什么时候终止这个do while循环呢?当for循环的三段处理完毕后,全部的链接过程都已经处理完毕了:
// end of loop bool all_invalid = true; for (unsigned int i = 0; i < vecsocketfsm.size(); ++i) { if (NULL != vecsocketfsm[i]) { all_invalid = false; break; } } if (all_invalid || INVALID_SOCKET != retsocket) break;
最后枚举一遍链接数组,每一个元素检查是否非空,若是还有非空的,就将all_invalid置为false,那么会继续走一次do while。上面的三段处理完毕后,应该是数组中再也不有链接才对,这里的保险处理是对数组再进行检查。至此跳出do while,算是整个链接过程完毕了。
能够看到,通过了三段处理后,链接数组中只会命中一个检测成功的链接,其余的失败和完成的都会置为null。这里从全局看就是一个地址池的淘汰筛选机制。在三段处理的for循环中清除不合格的链接,挑出第一个找到的合格的链接,而后跳出三段后,马上检查整个数组是否已经就剩这一个可用了,若是不是继续执行do while,又会去执行数组中的每一个item的链接过程,再回到三段处理。也就是说全部的数组中的item都会链接一次,而后根据返回的状态决定是否命中最终的一个socket。这是干吗呢这么绕?我以前的理解恐怕还不透彻,如今感受是在找一个稳定的能够读写状态的链接。
第一次进入do while已经链接全部池中的item了,那么在通过了三段处理后淘汰掉不合适的和失败的,而后再进入do while再次执行vecsocketfsm[i]->PreSelect(sel, group);的时候,已经更新了状态并执行了不一样的调用了,再通过三段处理在新的状态下再淘汰一批,最后通过整个运转,留下来的只能是最持久的(稳定的)惟一的一个链接,返回这个。
不得不说,这里确实巧妙,若是我写并不会比这要好。
咱们回来到longlink.cc的线程函数__Run中,当链接处理完毕后,下面继续执行的是__RunReadWrite。咱们来看看:
void LongLink::__RunReadWrite(SOCKET _sock, ErrCmdType& _errtype, int& _errcode, ConnectProfile& _profile) { // Alarm消息触发处理绑定在__OnAlarm上 Alarm alarmnoopinterval(boost::bind(&LongLink::__OnAlarm, this), false); Alarm alarmnooptimeout(boost::bind(&LongLink::__OnAlarm, this), false); }
首先是2个Alarm,这里要理解就须要看看这个Alarm是个什么东西:
/mars-master/mars/comm/alarm.h
template<class T> explicit Alarm(const T& _op, bool _inthread = true) : target_(detail::transform(_op)) , reg_async_(MessageQueue::InstallAsyncHandler(MessageQueue::GetDefMessageQueue())) , runthread_(boost::bind(&Alarm::__Run, this), "alarm") , inthread_(_inthread) , seq_(0), status_(kInit) , after_(0) , starttime_(0) , endtime_(0) , reg_(MessageQueue::InstallMessageHandler(boost::bind(&Alarm::OnAlarm, this, _1, _2), true)) #ifdef ANDROID , wakelock_(NULL) #endif {}
构造函数。这里须要逐句分析,首先是target_(detail::transform(_op))。简单看是个赋值语句,后面的参数须要看这个:
/mars-master/mars/comm/thread/runnable.h
// base template for no argument functor template <class T> struct TransformImplement { static Runnable* transform(const T& t) { return new RunnableFunctor<T>(t); } }; template <class T> inline Runnable* transform(const T& t) { return TransformImplement<T>::transform(t); }
1.这里使用的是c++魔板,直接new了一个RunnableFunctor对象,这个对象是个runnable,其实就是将这个传递进来的参数t包装成了一个runnable,在适当的时候调用他的run方法的时候就会调用这个t了。那么带入到具体的内容中,这个t是_op,就是boost::bind(&LongLink::__OnAlarm, this)。这里又使用了c++的boost库,作了bind操做,绑定了参数this也就是LongLink与函数体LongLink::__OnAlarm。好了,如今target_是个包装好的runnable了,在适当的时候能够执行LongLink::__OnAlarm。
2.reg_async_(MessageQueue::InstallAsyncHandler(MessageQueue::GetDefMessageQueue()))。首先看MessageQueue::InstallAsyncHandler:
/mars-master/mars/comm/messagequeue/message_queue.cc
MessageHandler_t InstallAsyncHandler(const MessageQueue_t& id) { ASSERT(0 != id); return InstallMessageHandler(__AsyncInvokeHandler, false, id); } MessageHandler_t InstallMessageHandler(const MessageHandler& _handler, bool _recvbroadcast, const MessageQueue_t& _messagequeueid) { ASSERT(bool(_handler)); ScopedLock lock(sg_messagequeue_map_mutex); const MessageQueue_t& id = _messagequeueid; if (sg_messagequeue_map.end() == sg_messagequeue_map.find(id)) { ASSERT2(false, "%" PRIu64, id); return KNullHandler; } HandlerWrapper* handler = new HandlerWrapper(_handler, _recvbroadcast, _messagequeueid, __MakeSeq()); sg_messagequeue_map[id].lst_handler.push_back(handler); return handler->reg; } struct HandlerWrapper { HandlerWrapper(const MessageHandler& _handler, bool _recvbroadcast, const MessageQueue_t& _messagequeueid, unsigned int _seq) : handler(_handler), recvbroadcast(_recvbroadcast) { reg.seq = _seq; reg.queue = _messagequeueid; } MessageHandler_t reg; MessageHandler handler; bool recvbroadcast; };
生成了一个HandlerWrapper,并将其保留在了一个map中,随后返回了MessageHandler_t,其中保存了_seq与_messagequeueid。这里个人感受是这个handler就是个相似句柄的东西,保存MessageHandler的一个关联关系,即消息队列与seq码(这里是个自增的静态变量)。实际上调用者只要有这个MessageHandler_t就能够了。最后将这个MessageHandler_t赋值给了reg_async_。这里又有一个对象ScopeRegister是个MessageHandler_t的包装对象,里面统一封装了方法来操做MessageHandler_t。
3.runthread_(boost::bind(&Alarm::__Run, this), "alarm")。一个线程对象,线程函数是Alarm::__Run。没事什么好解释的。
4.inthread_(_inthread), seq_(0), status_(kInit), after_(0) , starttime_(0) , endtime_(0)。都是简单赋值,暂时不去管它。
5.reg_(MessageQueue::InstallMessageHandler(boost::bind(&Alarm::OnAlarm, this, _1, _2), true))。相似2。
好了,这个Alarm能够看作是个消息处理,在有消息触发的状况下会调用到具体的函数中,通常是__OnAlarm。
回到__RunReadWrite,往下看。首先是个while的死循环,咱们单独摘录以下:
while (true) { ...... if (!alarmnoopinterval.IsWaiting()) { ...... if (__NoopReq(noop_xlog, alarmnooptimeout, has_late_toomuch)) { is_noop = true; __NotifySmartHeartbeatHeartReq(_profile, last_noop_interval, last_noop_actual_interval); } ...... } ...... // socket处理 SocketSelect sel(readwritebreak_, true); sel.PreSelect(); sel.Read_FD_SET(_sock); sel.Exception_FD_SET(_sock); ScopedLock lock(mutex_); if (!lstsenddata_.empty()) sel.Write_FD_SET(_sock); lock.unlock(); int retsel = sel.Select(10 * 60 * 1000); ...... // 处理发送(写入) if (sel.Write_FD_ISSET(_sock) && !lstsenddata_.empty()) { ...... ssize_t writelen = ::send(_sock, lstsenddata_.begin()->data.PosPtr(), lstsenddata_.begin()->data.PosLength(), 0); ...... while (it != lstsenddata_.end() && 0 < writelen) { if (0 == it->data.Pos()) OnSend(it->taskid); if ((size_t)writelen >= it->data.PosLength()) { xinfo2(TSF"sub send taskid:%_, cmdid:%_, %_, len(S:%_, %_/%_), ", it->taskid, it->cmdid, it->task_info, it->data.PosLength(), it->data.PosLength(), it->data.Length()) >> xlog_group; writelen -= it->data.PosLength(); if (!it->task_info.empty()) sent_taskids[it->taskid] = it->task_info; LongLinkNWriteData nwrite(it->taskid, it->data.PosLength(), it->cmdid, it->task_info); nsent_datas.push_back(nwrite); it = lstsenddata_.erase(it); } else { xinfo2(TSF"sub send taskid:%_, cmdid:%_, %_, len(S:%_, %_/%_), ", it->taskid, it->cmdid, it->task_info, writelen, it->data.PosLength(), it->data.Length()) >> xlog_group; it->data.Seek(writelen, AutoBuffer::ESeekCur); writelen = 0; } } } ...... // 处理接收(读取) if (sel.Read_FD_ISSET(_sock)) { bufrecv.AllocWrite(64 * 1024, false); ssize_t recvlen = recv(_sock, bufrecv.PosPtr(), 64 * 1024, 0); ...... while (0 < bufrecv.Length()) { uint32_t cmdid = 0; uint32_t taskid = Task::kInvalidTaskID; size_t packlen = 0; AutoBuffer body; int unpackret = longlink_unpack(bufrecv, cmdid, taskid, packlen, body); if (LONGLINK_UNPACK_FALSE == unpackret) { xerror2(TSF"task socket recv sock:%0, unpack error dump:%1", _sock, xdump(bufrecv.Ptr(), bufrecv.Length())); _errtype = kEctNetMsgXP; _errcode = kEctNetMsgXPHandleBufferErr; goto End; } xinfo2(TSF"task socket recv sock:%_, pack recv %_ taskid:%_, cmdid:%_, %_, packlen:(%_/%_)", _sock, LONGLINK_UNPACK_CONTINUE == unpackret ? "continue" : "finish", taskid, cmdid, sent_taskids[taskid], LONGLINK_UNPACK_CONTINUE == unpackret ? bufrecv.Length() : packlen, packlen); lastrecvtime_.gettickcount(); if (LONGLINK_UNPACK_CONTINUE == unpackret) { OnRecv(taskid, bufrecv.Length(), packlen); break; } else { sent_taskids.erase(taskid); bufrecv.Move(-(int)(packlen)); if (__NoopResp(cmdid, taskid, body, alarmnooptimeout, _profile)) { xdebug2(TSF"noopresp span:%0", alarmnooptimeout.ElapseTime()); is_noop = false; } else { OnResponse(kEctOK, 0, cmdid, taskid, body, _profile); } } } } } // 收尾,整个looper退出 End:
从while中的代码可以看出,基本上就是上面摘录的几块,以下所示:
1.__NoopReq调用,无数据状态处理;
2.socket的select处理;
3.处理发送写入部分;
4.处理接收读取部分;
这里须要逐个分析了:
1.__NoopReq:
先看代码,并不长:
bool LongLink::__NoopReq(XLogger& _log, Alarm& _alarm, bool need_active_timeout) { AutoBuffer buffer; uint32_t req_cmdid = 0; bool suc = false; if (identifychecker_.GetIdentifyBuffer(buffer, req_cmdid)) { suc = Send((const unsigned char*)buffer.Ptr(), (int)buffer.Length(), req_cmdid, Task::kLongLinkIdentifyCheckerTaskID); identifychecker_.SetSeq(Task::kLongLinkIdentifyCheckerTaskID); xinfo2(TSF"start noop synccheck taskid:%0, cmdid:%1, ", Task::kLongLinkIdentifyCheckerTaskID, req_cmdid) >> _log; } else { AutoBuffer body; longlink_noop_req_body(body); suc = SendWhenNoData((const unsigned char*) body.Ptr(), body.Length(), longlink_noop_cmdid(), Task::kNoopTaskID); xinfo2(TSF"start noop taskid:%0, cmdid:%1, ", Task::kNoopTaskID, longlink_noop_cmdid()) >> _log; } if (suc) { _alarm.Cancel(); _alarm.Start(need_active_timeout ? (2* 1000) : (10 * 1000)); } else { xerror2("send noop fail"); } return suc; }
说实话,这里看的不是很清晰 ,由于以前确定有忽略的部分,个人猜想是在走了一个发送信令的校验后,根据返回的值的不一样决定是执行send发送数据(使用校验填充好的buffer),仍是走SendWhenNoData发送(自行填充请求体)没有数据的状况。暂时先往下看一步,看看Send:
bool LongLink::__Send(const unsigned char* _pbuf, size_t _len, uint32_t _cmdid, uint32_t _taskid, const std::string& _task_info) { lstsenddata_.push_back(LongLinkSendData()); lstsenddata_.back().cmdid = _cmdid; lstsenddata_.back().taskid = _taskid; longlink_pack(_cmdid, _taskid, _pbuf, _len, lstsenddata_.back().data); lstsenddata_.back().data.Seek(0, AutoBuffer::ESeekStart); lstsenddata_.back().task_info = _task_info; readwritebreak_.Break(); return true; }
这里可以清晰的看到,在使用lstsenddata_这个队列,来进行发送的请求,实际上就是向队列中增长一项。那么如今的问题就在于这个发送的数据时怎么来的了。这就须要咱们弄懂LongLinkIdentifyChecker这个玩意儿。
/mars-master/mars/stn/src/longlink_identify_checker.cc
bool LongLinkIdentifyChecker::GetIdentifyBuffer(AutoBuffer &_buffer, uint32_t &_cmdid) { if (has_checked_) return false; hash_code_buffer_.Reset(); _buffer.Reset(); IdentifyMode mode = (IdentifyMode)GetLonglinkIdentifyCheckBuffer(_buffer, hash_code_buffer_, (int&)_cmdid); switch (mode) { case kCheckNever: { has_checked_ = true; } break; case kCheckNext: { has_checked_ = false; } break; case kCheckNow: { cmd_id_ = _cmdid; return true; } break; default: xassert2(false); } return false; }
调用了GetLonglinkIdentifyCheckBuffer,咱们追溯到stn_logic.cc中:
int GetLonglinkIdentifyCheckBuffer(AutoBuffer& identify_buffer, AutoBuffer& buffer_hash, int32_t& cmdid) { xassert2(sg_callback != NULL); return sg_callback->GetLonglinkIdentifyCheckBuffer(identify_buffer, buffer_hash, cmdid); }
其实是对sg_callback这个回调的调用。最终我找到的线索是在MarsServiceNative.java上层的onCreate中设置了回调:
// set callback AppLogic.setCallBack(stub); StnLogic.setCallBack(stub); SdtLogic.setCallBack(stub);
再接着找到了MarsServiceStub.java中的getLongLinkIdentifyCheckBuffer:
@Override public int getLongLinkIdentifyCheckBuffer(ByteArrayOutputStream identifyReqBuf, ByteArrayOutputStream hashCodeBuffer, int[] reqRespCmdID) { // Send identify request buf to server // identifyReqBuf.write(); return ECHECK_NEVER; }
返回的是ECHECK_NEVER,没有填充buffer。也便是说has_checked_ = true,而后返回false。其实看到这一刻我是崩溃的,真心不知道是想干吗。咱们只能这么理解,只要进入__NoopReq其实都是在走SendWhenNoData发送无数据状态。好吧,咱们从新回到__RunReadWrite中看一下。每次在while循环中一上来只要不是alarmnoopinterval正在等待的状态,那么就走一个发送无数据状态。看看发送无数据的代码:
bool LongLink::SendWhenNoData(const unsigned char* _pbuf, size_t _len, uint32_t _cmdid, uint32_t _taskid) { ScopedLock lock(mutex_); if (kConnected != connectstatus_) return false; if (!lstsenddata_.empty()) return false; return __Send(_pbuf, _len, _cmdid, _taskid, ""); }
实际上是检查lstsenddata_是否有内容,若是没有才发送。那么好吧,这里总体理解就是每次whie循环开始都会检查若是发送队列中没有数据的时候,发送一个特定的无数据状态来确认链接。可是这里写的比较复杂,还须要回调回上层java的代码中,让其来控制状态,从而根据状态控制流程,只能说考虑的很周全,任何状况在任何节点均可以有处理。吐槽下若是咱们本身写来规划这部分的时候大多数时候都是最对无数据检测放在下层,而后直接就发送了,不会让上层这里进行什么干涉吧。其实这里还有些点没有详细的分析很清楚,原谅文章有限,毕竟不能偏离主线太多。
2.socket的select操做。
这里倒没什么可说的,前面的设置,为后面的sel.Select(10 60 1000)作准备,内部采用poll来运做。
3.发送过程。
先是判断若是发送队列里面有内容,执行下面的::send(_sock, lstsenddata_.begin()->data.PosPtr(), lstsenddata_.begin()->data.PosLength(), 0)。这里注意,参数给定的是队列的第一个的data,也就是说这里是取出第一个执行发送。
下面就是一个while循环,将发送队列过了一遍。若是刚才发送的数据大小与待发送的实际数据长度相等,那么认为是发送完了这一个,从队列中移除这一个,而后下一次while会自动取下一个了。若是没有;认为是没发完,位移数据,下次while仍然获取到这个item,可是数据位移已经变了,所以继续发送下面的数据。通过这个while以后,全部的发送队列中的数据都应当发送完毕了。
4.接收过程。
前面没什么好说的,无非是开辟buffer空间,而后执行recv调用。以后进入一个while循环,条件是读取的buffer有数据。
首先走一个解包调用,内部走的是__unpack_test,具体内容就不贴了,我简单看了下,基本上就是解开头部,头部的信息标识了本次传递的基本信息,包括了版本号等内容,一个结构体,仍是比较标准的。这里是尝试解包,若是本次接收到的大小连头部都不够,那确定返回错误,须要继续接收了。那么从这个可以看出,每次传递的数据都是带有一个头部的__STNetMsgXpHeader。这东西里面塞入的内容能够和客户端的版本,当前这个信令的id等关联起来。
再下去看到的就是对解包返回值的判断了,若是一切顺利,就走到sent_taskids.erase(taskid);这里须要注意,这个sent_taskids是个发送的taskid的map,这里能够推测发送和接受实际上是关联的,这里接收完毕移除这个保留项。而后走的__NoopResp这个调用。若是返回false表示不是空的信令返回,那么就走OnResponse。这个函数其实是在LongLinkTaskManager中绑定了longlink_->OnResponse = boost::bind(&LongLinkTaskManager::__OnResponse, this, _1, _2, _3, _4, _5, _6);绑定在了LongLinkTaskManager::__OnResponse这里。
void LongLinkTaskManager::__OnResponse(ErrCmdType _error_type, int _error_code, uint32_t _cmdid, uint32_t _taskid, AutoBuffer& _body, const ConnectProfile& _connect_profile) { copy_wrapper<AutoBuffer> body(_body); RETURN_LONKLINK_SYNC2ASYNC_FUNC(boost::bind(&LongLinkTaskManager::__OnResponse, this, _error_type, _error_code, _cmdid, _taskid, body, _connect_profile)); ...... int err_code = 0; int handle_type = Buf2Resp(it->task.taskid, it->task.user_context, body, err_code, Task::kChannelLong); switch(handle_type){ case kTaskFailHandleNoError: { dynamic_timeout_.CgiTaskStatistic(it->task.cgi, (unsigned int)it->transfer_profile.send_data_size + (unsigned int)body->Length(), ::gettickcount() - it->transfer_profile.start_send_time); __SingleRespHandle(it, kEctOK, err_code, handle_type, _connect_profile); xassert2(fun_notify_network_err_); fun_notify_network_err_(__LINE__, kEctOK, err_code, _connect_profile.ip, _connect_profile.port); } break; ...... } }
其实就2件事,经过Buf2Resp底层回包返回给上层解析。若是没有错误kTaskFailHandleNoError,会执行__SingleRespHandle:
bool LongLinkTaskManager::__SingleRespHandle(std::list<TaskProfile>::iterator _it, ErrCmdType _err_type, int _err_code, int _fail_handle, const ConnectProfile& _connect_profile) { ...... int cgi_retcode = fun_callback_(_err_type, _err_code, _fail_handle, _it->task, (unsigned int)(curtime - _it->start_task_time)); ...... }
这里的关键点就这一个,调用回调,回调的绑定在net_core.cc中的NetCore构造里,longlink_task_manager_->fun_callback_ = boost::bind(&NetCore::__CallBack, this, (int)kCallFromLong, _1, _2, _3, _4, _5);,最终执行的是NetCore::__CallBack:
int NetCore::__CallBack(int _from, ErrCmdType _err_type, int _err_code, int _fail_handle, const Task& _task, unsigned int _taskcosttime) { if (task_callback_hook_ && 0 == task_callback_hook_(_from, _err_type, _err_code, _fail_handle, _task)) { xwarn2(TSF"task_callback_hook let task return. taskid:%_, cgi%_.", _task.taskid, _task.cgi); return 0; } if (kEctOK == _err_type || kTaskFailHandleTaskEnd == _fail_handle) return OnTaskEnd(_task.taskid, _task.user_context, _err_type, _err_code); if (kCallFromZombie == _from) return OnTaskEnd(_task.taskid, _task.user_context, _err_type, _err_code); #ifdef USE_LONG_LINK if (!zombie_task_manager_->SaveTask(_task, _taskcosttime)) #endif return OnTaskEnd(_task.taskid, _task.user_context, _err_type, _err_code); return 0; }
看到了吧,走了OnTaskEnd,任务结束。
此文从中间部分开始粗糙了,前面铺垫的东西后面没有讲到,心不静的时候分析东西效果确实不大好。总而言之既然坚持写完了,这里仍是留个记录吧,往后有机会的时候会回顾把这部分完善好。