接着上篇的雪崩检测,回顾下LongLinkTaskManager::__RunOnStartTask:
/mars-master/mars/stn/src/longlink_task_manager.cc
// LongLinkTaskManager::__RunOnStartTask — walks the pending-task list (lst_cmd_) and
// hands each runnable task to the long link. Per task: serialize via Req2Buf, run the
// anti-avalanche check (fun_anti_avalanche_check_, asserted non-null), compute the
// first-package / read-write timeouts, then enqueue the payload with longlink_->Send.
// Tasks that fail encoding or the avalanche check are terminated via __SingleRespHandle
// with kTaskFailHandleTaskEnd; send-only tasks are completed immediately after a
// successful enqueue. The loop bails out entirely if the link cannot be connected.
// NOTE(review): the "......" markers are elisions made by the article's author — this is
// an excerpt, not the complete function (e.g. bufreq/error_code declarations are cut).
void LongLinkTaskManager::__RunOnStartTask() { std::list<TaskProfile>::iterator first = lst_cmd_.begin(); std::list<TaskProfile>::iterator last = lst_cmd_.end(); bool ismakesureauthruned = false; bool ismakesureauthsuccess = false; uint64_t curtime = ::gettickcount(); bool canretry = curtime - lastbatcherrortime_ >= retry_interval_; bool canprint = true; int sent_count = 0; while (first != last) { std::list<TaskProfile>::iterator next = first; ++next; ...... if (!first->antiavalanche_checked) { if (!Req2Buf(first->task.taskid, first->task.user_context, bufreq, error_code, Task::kChannelLong)) { __SingleRespHandle(first, kEctEnDecode, error_code, kTaskFailHandleTaskEnd, longlink_->Profile()); first = next; continue; } // anti-avalanche check xassert2(fun_anti_avalanche_check_); if (!fun_anti_avalanche_check_(first->task, bufreq.Ptr(), (int)bufreq.Length())) { __SingleRespHandle(first, kEctLocal, kEctLocalAntiAvalanche, kTaskFailHandleTaskEnd, longlink_->Profile()); first = next; continue; } first->antiavalanche_checked = true; } if (!longlinkconnectmon_->MakeSureConnected()) { break; } if (0 == bufreq.Length()) { if (!Req2Buf(first->task.taskid, first->task.user_context, bufreq, error_code, Task::kChannelLong)) { __SingleRespHandle(first, kEctEnDecode, error_code, kTaskFailHandleTaskEnd, longlink_->Profile()); first = next; continue; } // anti-avalanche check xassert2(fun_anti_avalanche_check_); if (!first->antiavalanche_checked && !fun_anti_avalanche_check_(first->task, bufreq.Ptr(), (int)bufreq.Length())) { __SingleRespHandle(first, kEctLocal, kEctLocalAntiAvalanche, kTaskFailHandleTaskEnd, longlink_->Profile()); first = next; continue; } } first->transfer_profile.loop_start_task_time = ::gettickcount(); first->transfer_profile.first_pkg_timeout = __FirstPkgTimeout(first->task.server_process_cost, bufreq.Length(), sent_count, dynamic_timeout_.GetStatus()); first->current_dyntime_status = (first->task.server_process_cost <= 0) ?
dynamic_timeout_.GetStatus() : kEValuating; first->transfer_profile.read_write_timeout = __ReadWriteTimeout(first->transfer_profile.first_pkg_timeout); first->transfer_profile.send_data_size = bufreq.Length(); first->running_id = longlink_->Send((const unsigned char*) bufreq.Ptr(), (unsigned int)bufreq.Length(), first->task.cmdid, first->task.taskid, first->task.send_only ? "":first->task.cgi); if (!first->running_id) { xwarn2(TSF"task add into longlink readwrite fail cgi:%_, cmdid:%_, taskid:%_", first->task.cgi, first->task.cmdid, first->task.taskid); first = next; continue; } xinfo2(TSF"task add into longlink readwrite suc cgi:%_, cmdid:%_, taskid:%_, size:%_, timeout(firstpkg:%_, rw:%_, task:%_), retry:%_", first->task.cgi, first->task.cmdid, first->task.taskid, first->transfer_profile.send_data_size, first->transfer_profile.first_pkg_timeout / 1000, first->transfer_profile.read_write_timeout / 1000, first->task_timeout / 1000, first->remain_retry_count); if (first->task.send_only) { __SingleRespHandle(first, kEctOK, 0, kTaskFailHandleNoError, longlink_->Profile()); } ++sent_count; first = next; } }
其实后面就剩下一个longlink_->Send,这个才是真正的发送函数,前面的是一堆参数的设定。好吧,咱们来看看:
/mars-master/mars/stn/src/longlink.cc
bool LongLink::Send(const unsigned char* _pbuf, size_t _len, uint32_t _cmdid, uint32_t _taskid, const std::string& _task_info) { ScopedLock lock(mutex_); if (kConnected != connectstatus_) return false; return __Send(_pbuf, _len, _cmdid, _taskid, _task_info); } bool LongLink::__Send(const unsigned char* _pbuf, size_t _len, uint32_t _cmdid, uint32_t _taskid, const std::string& _task_info) { lstsenddata_.push_back(LongLinkSendData()); lstsenddata_.back().cmdid = _cmdid; lstsenddata_.back().taskid = _taskid; longlink_pack(_cmdid, _taskid, _pbuf, _len, lstsenddata_.back().data); lstsenddata_.back().data.Seek(0, AutoBuffer::ESeekStart); lstsenddata_.back().task_info = _task_info; readwritebreak_.Break(); return true; }
能够直接看__Send方法了,就是将须要传输的数据以LongLinkSendData为载体压入队列中,而后执行了SocketSelectBreaker::Break:
/mars-master/mars/comm/windows/SocketSelect/SocketSelect.cpp
bool SocketSelectBreaker::Break() { ScopedLock lock(m_mutex); if (m_broken) return true; char dummy[] = "1"; int ret = sendto(m_socket_w, &dummy, strlen(dummy), 0, (sockaddr*)&m_sendin, m_sendinlen); m_broken = true; if (ret < 0 || ret != strlen(dummy)) { xerror2(TSF"sendto Ret:%_, errno:(%_, %_)", ret, errno, WSAGetLastError()); m_broken = false; ReCreate(); } // Ret = WSAGetLastError(); return m_broken; }
这里能够看到,实际上只发送了一个字符1。不过这并非心跳,也不是在检测通道是否可用:m_socket_w 是一个专门用来唤醒 select 的本地 socket(典型的 self-pipe 技巧),向它写入一个字节,就能让阻塞在 select 上的读写线程当即返回,从而及时处理刚刚入队的待发送数据。
也就是说,每次入队一个待发送数据时,都会经过 Break 唤醒一次读写循环。那么后面确定有队列的自我运行机制来进行真实的数据发送。那么咱们来找找线索吧。
在LongLink的构造时候,已经将LongLink::__Run经过boost::bind赋值给了thread_。那么LongLink::MakeSureConnected里面又执行了thread_.start(&newone);能够看到是个线程在运转着__Run函数。那么在哪里调用的LongLink::MakeSureConnected,找到的一个线索链:StnLogic.java::makesureLongLinkConnected->stn_logic.cc::MakesureLonglinkConnected->NetCore::MakeSureLongLinkConnect->LongLink::MakeSureConnected。咱们把这个调用线索代码贴到下面:
// Call chain from the Java API down to the native long link:
//   StnLogic.makesureLongLinkConnected() (Java, JNI entry)
//     -> MakesureLonglinkConnected()        (stn_logic.cc, weak-dispatched)
//     -> NetCore::MakeSureLongLinkConnect() (guarded by USE_LONG_LINK)
//     -> LongLink::MakeSureConnected()      (starts thread_, which runs LongLink::__Run)
// MakeSureConnected returns true immediately when already connected; otherwise it
// starts the connect thread, resets the per-connection state when a brand-new run
// was started (reported through *_newone), and returns false because the actual
// connect happens asynchronously on that thread.
public class StnLogic { /** Check the long-link status; if not connected, attempt to reconnect. */ public static native void makesureLongLinkConnected(); } // stn_logic.cc void MakesureLonglinkConnected() { xinfo2(TSF "make sure longlink connect"); STN_WEAK_CALL(MakeSureLongLinkConnect()); } void NetCore::MakeSureLongLinkConnect() { #ifdef USE_LONG_LINK longlink_task_manager_->LongLinkChannel().MakeSureConnected(); #endif } bool LongLink::MakeSureConnected(bool* _newone) { if (_newone) *_newone = false; ScopedLock lock(mutex_); if (kConnected == ConnectStatus()) return true; bool newone = false; thread_.start(&newone); if (newone) { connectstatus_ = kConnectIdle; conn_profile_.Reset(); identifychecker_.Reset(); disconnectinternalcode_ = kNone; readwritebreak_.Clear(); connectbreak_.Clear(); } if (_newone) *_newone = newone; return false; }
最后会被上层samples的MarsServiceNative调用:socket
// MarsServiceNative.onCreate — service entry point: builds the service stub,
// registers it as the callback for AppLogic/StnLogic/SdtLogic, initializes the
// Mars PlatformComm on the main-looper handler, configures the long/short link
// server addresses and client version, then kicks off the first long-link
// connection via StnLogic.makesureLongLinkConnected().
@Override public void onCreate() { super.onCreate(); final MarsServiceProfile profile = gFactory.createMarsServiceProfile(); stub = new MarsServiceStub(this, profile); // set callback AppLogic.setCallBack(stub); StnLogic.setCallBack(stub); SdtLogic.setCallBack(stub); // Initialize the Mars PlatformComm Mars.init(getApplicationContext(), new Handler(Looper.getMainLooper())); // Initialize the Mars StnLogic.setLonglinkSvrAddr(profile.longLinkHost(), profile.longLinkPorts()); StnLogic.setShortlinkSvrAddr(profile.shortLinkPort()); StnLogic.setClientVersion(profile.productID()); Mars.onCreate(true); // !!! called here !!! StnLogic.makesureLongLinkConnected(); // Log.d(TAG, "mars service native created"); }
总之就是最后启动一个线程来执行,线程函数是LongLink::__Run:
/mars-master/mars/stn/src/longlink.cc
// LongLink::__Run — thread body for the long link; one invocation == one connection
// lifecycle:
//   1) briefly takes mutex_ to synchronize with MakeSureConnected's state reset;
//   2) fills a ConnectProfile (start time, reconnect reason, net type, thread id);
//   3) __RunConnect() establishes the socket (Android wakelock held around it);
//   4) on INVALID_SOCKET, records disconnect time/signal and returns;
//   5) otherwise __RunReadWrite() blocks, pumping reads/writes until the link dies;
//   6) closes the socket, records the disconnect profile, flips the status to
//      kDisConnected, and reports via __RunResponseError when errtype != kEctOK.
void LongLink::__Run() { // sync to MakeSureConnected data reset { ScopedLock lock(mutex_); } uint64_t cur_time = gettickcount(); xinfo_function(TSF"LongLink Rebuild span:%_, net:%_", conn_profile_.disconn_time != 0 ? cur_time - conn_profile_.disconn_time : 0, getNetInfo()); ConnectProfile conn_profile; conn_profile.start_time = cur_time; conn_profile.conn_reason = conn_profile_.disconn_errcode; getCurrNetLabel(conn_profile.net_type); conn_profile.tid = xlogger_tid(); __UpdateProfile(conn_profile); #ifdef ANDROID wakelock_.Lock(30 * 1000); #endif SOCKET sock = __RunConnect(conn_profile); #ifdef ANDROID wakelock_.Lock(1000); #endif if (INVALID_SOCKET == sock) { conn_profile.disconn_time = ::gettickcount(); conn_profile.disconn_signal = ::getSignal(::getNetInfo() == kWifi); __UpdateProfile(conn_profile); return; } ErrCmdType errtype = kEctOK; int errcode = 0; __RunReadWrite(sock, errtype, errcode, conn_profile); socket_close(sock); conn_profile.disconn_time = ::gettickcount(); conn_profile.disconn_errtype = errtype; conn_profile.disconn_errcode = errcode; conn_profile.disconn_signal = ::getSignal(::getNetInfo() == kWifi); __ConnectStatus(kDisConnected); __UpdateProfile(conn_profile); if (kEctOK != errtype) __RunResponseError(errtype, errcode, conn_profile); #ifdef ANDROID wakelock_.Lock(1000); #endif }
咱们只看重点吧:
1.__RunConnect,链接;
2.__RunReadWrite,执行读写(阻塞不断执行);
__RunConnect的代码就不贴了,核心的就是com_connect.ConnectImpatient。
/mars-master/mars/comm/socket/complexconnect.cc
// ComplexConnect::ConnectImpatient — races connect attempts across an address pool.
// Builds one ConnectCheckFSM per candidate address, then loops: PreSelect every live
// FSM (collecting the smallest per-FSM timeout), run select() with that timeout,
// AfterSelect every FSM, and prune by state:
//   - EEnd (closed) or EReadWrite + ECheckFail: notify the observer, close, delete,
//     NULL the slot, and continue racing the remaining candidates;
//   - EReadWrite + ECheckOK: the first fully verified connection wins — its socket
//     is detached (Socket(INVALID_SOCKET)) so the later cleanup won't close it,
//     and the loop breaks to return it.
// NOTE(review): the "......" markers are the article author's elisions; locals such
// as index, timeout, ret, retsocket, lasterror and starttime are declared in the
// omitted parts, as is the final cleanup/return code.
SOCKET ComplexConnect::ConnectImpatient(const std::vector<socket_address>& _vecaddr, SocketSelectBreaker& _breaker, MComplexConnect* _observer) { ...... // build the ConnectCheckFSM array for (unsigned int i = 0; i < _vecaddr.size(); ++i) { xinfo2(TSF"complex.conn %_", _vecaddr[i].url()); ConnectCheckFSM* ic = new ConnectCheckFSM(_vecaddr[i], timeout_, i, _observer); vecsocketfsm.push_back(ic); } ...... do { ...... // pre-select preparation SocketSelect sel(_breaker); sel.PreSelect(); ...... // drive each FSM's connect step for (unsigned int i = 0; i < index; ++i) { if (NULL == vecsocketfsm[i]) continue; xgroup2_define(group); vecsocketfsm[i]->PreSelect(sel, group); xgroup2_if(!group.Empty(), TSF"index:%_, @%_, ", i, this) << group; timeout = std::min(timeout, vecsocketfsm[i]->Timeout()); } ...... // run select if (INT_MAX == timeout) { ret = sel.Select(); } else { timeout = std::max(0, timeout); ret = sel.Select(timeout); } ...... for (unsigned int i = 0; i < index; ++i) { if (NULL == vecsocketfsm[i]) continue; xgroup2_define(group); vecsocketfsm[i]->AfterSelect(sel, group); xgroup2_if(!group.Empty(), TSF"index:%_, @%_, ", i, this) << group; if (TcpClientFSM::EEnd == vecsocketfsm[i]->Status()) { if (_observer) _observer->OnFinished(i, socket_address(&vecsocketfsm[i]->Address()), vecsocketfsm[i]->Socket(), vecsocketfsm[i]->Error(), vecsocketfsm[i]->Rtt(), vecsocketfsm[i]->TotalRtt(), (int)(gettickcount() - starttime)); vecsocketfsm[i]->Close(); delete vecsocketfsm[i]; vecsocketfsm[i] = NULL; lasterror = -1; continue; } if (TcpClientFSM::EReadWrite == vecsocketfsm[i]->Status() && ConnectCheckFSM::ECheckFail == vecsocketfsm[i]->CheckStatus()) { if (_observer) _observer->OnFinished(i, socket_address(&vecsocketfsm[i]->Address()), vecsocketfsm[i]->Socket(), vecsocketfsm[i]->Error(), vecsocketfsm[i]->Rtt(), vecsocketfsm[i]->TotalRtt(), (int)(gettickcount() - starttime)); vecsocketfsm[i]->Close(); delete vecsocketfsm[i]; vecsocketfsm[i] = NULL; lasterror = -1; continue; } if (TcpClientFSM::EReadWrite ==
vecsocketfsm[i]->Status() && ConnectCheckFSM::ECheckOK == vecsocketfsm[i]->CheckStatus()) { if (_observer) _observer->OnFinished(i, socket_address(&vecsocketfsm[i]->Address()), vecsocketfsm[i]->Socket(), vecsocketfsm[i]->Error(), vecsocketfsm[i]->Rtt(), vecsocketfsm[i]->TotalRtt(), (int)(gettickcount() - starttime)); xinfo2(TSF"index:%_, sock:%_, suc ConnectImpatient:%_:%_, RTT:(%_, %_), @%_", i, vecsocketfsm[i]->Socket(), vecsocketfsm[i]->IP(), vecsocketfsm[i]->Port(), vecsocketfsm[i]->Rtt(), vecsocketfsm[i]->TotalRtt(), this); retsocket = vecsocketfsm[i]->Socket(); index_ = i; index_conn_rtt_ = vecsocketfsm[i]->Rtt(); index_conn_totalcost_ = vecsocketfsm[i]->TotalRtt(); vecsocketfsm[i]->Socket(INVALID_SOCKET); delete vecsocketfsm[i]; vecsocketfsm[i] = NULL; break; } } ...... } while (true); }
1.根据传递进来的一个地址数组,来生成ConnectCheckFSM的一个数组;
2.进入一个do while的死循环;
3.根据入口的SocketSelectBreaker建立SocketSelect,并执行PreSelect方法,执行一个前期准备工做;
4.对地址池中的每一个ConnectCheckFSM进行链接,若是状态不是要进行链接,则执行别的前置操做。,在这个过程当中,会将链接的socket保存在SocketSelect中(这里有必要在后面看下深刻的代码);
5.执行链接的select操做,异步检测是否有数据可从通道上读取;
6.以后的for循环,作select后的数据读取等事情,将地址集对应的ConnectCheckFSM全部对象都执行一下AfterSelect,并根据返回的状态,调用回调通知观察者;
下面咱们来看一下TcpClientFSM::AfterSelect:
/mars-master/mars/comm/socket/tcpclient_fsm.cc
// Dispatch one post-select step according to the FSM's current state:
// connecting sockets are handled by AfterConnectSelect, established ones by
// AfterReadWriteSelect. Either handler may drive the FSM into its terminal
// state; if so, and the socket is still open, fire the close notification.
void TcpClientFSM::AfterSelect(SocketSelect& _sel, XLogger& _log) {
    switch (status_) {
        case EConnecting:
            AfterConnectSelect(_sel, _log);
            break;
        case EReadWrite:
            AfterReadWriteSelect(_sel, _log);
            break;
        default:
            break;
    }

    if (EEnd == status_ && INVALID_SOCKET != sock_) {
        _OnClose(last_status_, error_, false);
    }
}
根据状态的不一样调用不一样的函数执行,若是是链接,调用AfterConnectSelect,若是是读写,调用AfterReadWriteSelect。
下面看下AfterConnectSelect:
// TcpClientFSM::AfterConnectSelect — evaluates the outcome of one select() round
// while in the EConnecting state. Four exits, checked in order:
//   1) exception fd set  -> fetch SO_ERROR into error_, transition to EEnd (failed);
//   2) socket error      -> transition to EEnd (failed);
//   3) writable & no err -> connected: transition to EReadWrite, notify _OnConnected(Rtt());
//   4) timeout expired   -> error_ = ETIMEDOUT, transition to EEnd.
// If none of the above fires, the state remains EConnecting for the next round.
// last_status_ is always saved before each transition so _OnClose can report it.
void TcpClientFSM::AfterConnectSelect(const SocketSelect& _sel, XLogger& _log) { xassert2(EConnecting == status_, "%d", status_); int timeout = ConnectTimeout(); xinfo2(TSF"sock:%_, (%_:%_), ", sock_, addr_.ip(), addr_.port()) >> _log; if (_sel.Exception_FD_ISSET(sock_)) { socklen_t len = sizeof(error_); if (0 != getsockopt(sock_, SOL_SOCKET, SO_ERROR, &error_, &len)) { error_ = socket_errno; } xwarn2(TSF"close connect exception: (%_, %_)", sock_, error_, socket_strerror(error_)) >> _log; end_connecttime_ = gettickcount(); last_status_ = status_; status_ = EEnd; return; } error_ = socket_error(sock_); if (0 != error_) { xwarn2(TSF"close connect error:(%_, %_), ", error_, socket_strerror(error_)) >> _log; end_connecttime_ = gettickcount(); last_status_ = status_; status_ = EEnd; return; } if (0 == error_ && _sel.Write_FD_ISSET(sock_)){ end_connecttime_ = gettickcount(); last_status_ = status_; status_ = EReadWrite; xinfo2(TSF"connected Rtt:%_, ", Rtt()) >> _log; _OnConnected(Rtt()); return; } if (0 >= timeout) { end_connecttime_ = gettickcount(); xwarn2(TSF"close connect timeout:(%_, %_), (%_, %_)", ConnectAbsTimeout(), -timeout, SOCKET_ERRNO(ETIMEDOUT), socket_strerror(SOCKET_ERRNO(ETIMEDOUT))) >> _log; error_ = SOCKET_ERRNO(ETIMEDOUT); last_status_ = status_; status_ = EEnd; return; } }
若是成功,调用_OnConnected。而后经过他会调回到继承者的同名虚函数中,在这里就是ConnectCheckFSM:
// ConnectCheckFSM::_OnConnected — override invoked once the TCP connect succeeds.
// Records the check-finish timestamp, notifies the observer (the article identifies
// it as LongLinkConnectObserver) via OnConnected, then — unless the verification
// already passed — asks the observer to emit the verification request through
// OnVerifySend; a refusal marks the check as ECheckFail.
virtual void _OnConnected(int _rtt) { m_checkfintime = ::gettickcount(); if (!m_observer) return; m_observer->OnConnected(m_index, addr_, sock_, 0, _rtt); if (ECheckOK == CheckStatus()) { return; } if (!m_observer->OnVerifySend(m_index, addr_, sock_, send_buf_)) { m_check_status = ECheckFail; } }
这里首先调用了观察者的OnConnected,这个观察者就是LongLinkConnectObserver。
咱们回来看ConnectImpatient,在循环里执行了AfterSelect,以后根据每一个ConnectCheckFSM的状态更新vecsocketfsm数组。在for循环的下面会有3段代码来作这个根据状态更新数组的操做,前两段是若是链接已经关闭的处理和错误的状况处理,都须要从数组中将该项目置为null。第三段是成功完成的状况处理。注意,前两段是continue,而第三段是break。怎么理解这里呢?个人解释是,自己是有个地址池的链接方式,若是其中一个可以成功链接上而且可以正常收发,那么其他的就不须要再尝试了,所以这里作了break处理。能够看到这里的3种状况处理了TcpClientFSM::EEnd、TcpClientFSM::EReadWrite,那么若是是ESstart和EConnecting的状况下,是不会清除这个数组元素的。再接着看,是这个for循环以后的处理,循环判断全部的链接是否都是无效的,若是都是无效的,继续执行这个while死循环,不然若是有一个是有效的,那么跳出来。也就是说,再次执行的时候index也会进行上面的自增++运算,那么继续日后尝试下一个链接。再日后看,是跳出了while死循环的状况,又把这些链接依次close,而后清除了数组。再而后是返回了retsocket。这玩意儿的惟一赋值是在上面的for循环中的第三段断定中,这段断定才是一个关键,就是说一个可用的链接出现了。那么直接带来的就是返回一个可用的socket,不然返回的将是个INVALID_SOCKET。稍微总结下这里,仍是挺巧妙的,能够理解为从地址池中找到可用的链接,不是漫无目的的尝试,而是递进式,而且将无效的随时置为null。不过说实话,应当能够写的更简洁,这里实在是有些晦涩。至此,链接部分分析完毕。