Picking up from the upper-layer analysis in the previous post, this time we dig down into the internals of mars itself. It lives in a separate project, mars. Open it up and start with initialization: the upper layer's MarsServiceNative calls, in its onCreate, Mars.init(getApplicationContext(), new Handler(Looper.getMainLooper()));
/mars-master/mars/libraries/mars_android_sdk/src/main/java/com/tencent/mars/Mars.java
```java
/**
 * Initializes the platform callbacks at app creation.
 * Must be called before the onCreate method.
 * @param _context
 * @param _handler
 */
public static void init(Context _context, Handler _handler) {
    PlatformComm.init(_context, _handler);
    hasInitialized = true;
}
```
It calls PlatformComm.init and flips its own initialization flag to true. Moving on:
/mars-master/mars/libraries/mars_android_sdk/src/main/java/com/tencent/mars/comm/PlatformComm.java
```java
public static void init(Context ncontext, Handler nhandler) {
    context = ncontext;
    handler = nhandler;
    NetworkSignalUtil.InitNetworkSignalUtil(ncontext);
}
```
It stores the context and handler, then initializes the network-signal-strength unit. Note that this handler is the one passed down from the upper-layer service, built on the main Looper.
Now let's look at the network unit's initialization:
/mars-master/mars/libraries/mars_android_sdk/src/main/java/com/tencent/mars/comm/NetworkSignalUtil.java
```java
public static void InitNetworkSignalUtil(Context ncontext) {
    context = ncontext;
    TelephonyManager mgr = (TelephonyManager) context.getSystemService(Context.TELEPHONY_SERVICE);
    mgr.listen(new PhoneStateListener() {
        @Override
        public void onSignalStrengthsChanged(SignalStrength signalStrength) {
            super.onSignalStrengthsChanged(signalStrength);
            calSignalStrength(signalStrength);
        }
    }, PhoneStateListener.LISTEN_SIGNAL_STRENGTHS);
}
```
It grabs the TelephonyManager and registers a listener for state changes. Whenever the signal changes, calSignalStrength records the new strength:
```java
private static void calSignalStrength(SignalStrength sig) {
    int nSig = 0;
    if (sig.isGsm())
        nSig = sig.getGsmSignalStrength();
    else
        nSig = (sig.getCdmaDbm() + 113) / 2;
    if (sig.isGsm() && nSig == 99)
        strength = 0;
    else {
        strength = (long) (nSig * ((float) 100 / (float) 31));
        strength = (strength > 100 ? 100 : strength);
        strength = (strength < 0 ? 0 : strength);
    }
}
```
So this is simply signal-strength monitoring: the raw GSM value (0 to 31, with 99 meaning unknown) is normalized into a 0 to 100 percentage.
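To make the mapping concrete, here is a tiny standalone sketch (my own, not mars code) of the same scaling: a GSM reading in [0, 31] becomes a 0 to 100 percentage, clamped at both ends.

```cpp
#include <algorithm>
#include <cstdio>

// Same arithmetic as calSignalStrength above: scale 0..31 to 0..100 and clamp.
long CalSignalPercent(int nSig) {
    long strength = static_cast<long>(nSig * (100.0f / 31.0f));
    return std::min(100L, std::max(0L, strength));
}

int main() {
    for (int asu : {0, 10, 20, 31})
        std::printf("asu=%d -> %ld%%\n", asu, CalSignalPercent(asu));
    // The GSM "unknown" reading 99 is special-cased to 0 before this scaling.
    return 0;
}
```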
Next up is Mars.onCreate, which is called right after Mars.init:
```java
public static void onCreate(boolean isFirstStartup) {
    if (isFirstStartup && hasInitialized) {
        BaseEvent.onCreate();
    } else if (!isFirstStartup) {
        BaseEvent.onCreate();
    } else {
        /**
         * First startup, but init has not been called:
         * the BaseEvent create cannot proceed.
         */
        throw new IllegalStateException("function MarsCore.init must be executed before Mars.onCreate when application firststartup.");
    }
}
```
This ends up in BaseEvent.onCreate:
```java
public static native void onCreate();
```
which is a native-layer function.
Back in Mars there is also loadDefaultMarsLibrary, which loads several .so files. It is actually invoked right at the start of StnLogic, in its static initializer:
/mars-master/mars/libraries/mars_android_sdk/src/main/java/com/tencent/mars/stn/StnLogic.java
```java
public class StnLogic {
    public static final String TAG = "mars.StnLogic";

    static {
        Mars.loadDefaultMarsLibrary();
    }
```
The rest of the work all goes through StnLogic; even sending ultimately deals with it. But let's not jump into StnLogic yet; instead, keep digging into what BaseEvent does:
/mars-master/mars/libraries/mars_android_sdk/src/main/java/com/tencent/mars/BaseEvent.java
```java
public class BaseEvent {
    public static native void onCreate();
    public static native void onDestroy();
    public static native void onNetworkChange();
    public static native void onForeground(final boolean forground);
    public static native void onSingalCrash(int sig);
    public static native void onExceptionCrash();

    /**
     * Network-switch listener: the client registers this broadcast receiver
     * to tell mars STN that the network has changed.
     */
    public static class ConnectionReceiver extends BroadcastReceiver {

        public static NetworkInfo lastActiveNetworkInfo = null;
        public static WifiInfo lastWifiInfo = null;
        public static boolean lastConnected = true;
        public static String TAG = "mars.ConnectionReceiver";

        @Override
        public void onReceive(Context context, Intent intent) {
            if (context == null || intent == null) {
                return;
            }
            ConnectivityManager mgr = (ConnectivityManager) context.getSystemService(Context.CONNECTIVITY_SERVICE);
            NetworkInfo netInfo = null;
            try {
                netInfo = mgr.getActiveNetworkInfo();
            } catch (Exception e) {
                Log.i(TAG, "getActiveNetworkInfo failed.");
            }
            checkConnInfo(context, netInfo);
        }

        public void checkConnInfo(final Context context, final NetworkInfo activeNetInfo) {
            if (activeNetInfo == null) {
                lastActiveNetworkInfo = null;
                lastWifiInfo = null;
                BaseEvent.onNetworkChange();
            } else if (activeNetInfo.getDetailedState() != NetworkInfo.DetailedState.CONNECTED) {
                if (lastConnected) {
                    lastActiveNetworkInfo = null;
                    lastWifiInfo = null;
                    BaseEvent.onNetworkChange();
                }
                lastConnected = false;
            } else {
                if (isNetworkChange(context, activeNetInfo)) {
                    BaseEvent.onNetworkChange();
                }
                lastConnected = true;
            }
        }

        public boolean isNetworkChange(final Context context, final NetworkInfo activeNetInfo) {
            boolean isWifi = (activeNetInfo.getType() == ConnectivityManager.TYPE_WIFI);
            if (isWifi) {
                WifiManager wifiManager = (WifiManager) context.getSystemService(Context.WIFI_SERVICE);
                WifiInfo wi = wifiManager.getConnectionInfo();
                if (wi != null && lastWifiInfo != null
                        && lastWifiInfo.getBSSID().equals(wi.getBSSID())
                        && lastWifiInfo.getSSID().equals(wi.getSSID())
                        && lastWifiInfo.getNetworkId() == wi.getNetworkId()) {
                    Log.w(TAG, "Same Wifi, do not NetworkChanged");
                    return false;
                }
                lastWifiInfo = wi;
            } else if (lastActiveNetworkInfo != null
                    && lastActiveNetworkInfo.getExtraInfo() != null && activeNetInfo.getExtraInfo() != null
                    && lastActiveNetworkInfo.getExtraInfo().equals(activeNetInfo.getExtraInfo())
                    && lastActiveNetworkInfo.getSubtype() == activeNetInfo.getSubtype()
                    && lastActiveNetworkInfo.getType() == activeNetInfo.getType()) {
                return false;
            } else if (lastActiveNetworkInfo != null
                    && lastActiveNetworkInfo.getExtraInfo() == null && activeNetInfo.getExtraInfo() == null
                    && lastActiveNetworkInfo.getSubtype() == activeNetInfo.getSubtype()
                    && lastActiveNetworkInfo.getType() == activeNetInfo.getType()) {
                Log.w(TAG, "Same Network, do not NetworkChanged");
                return false;
            }

            if (!isWifi) {
            }

            lastActiveNetworkInfo = activeNetInfo;
            return true;
        }
    }
}
```
As you can see, it defines a static broadcast receiver. This ConnectionReceiver is declared directly in the manifest of the upper-layer wrapper, and the comment spells out its job: listening for network switches. The upper-layer MessageHandler is what sends the broadcast that lands here.
Notice that it doesn't care about the intent's payload at all; it pulls the active network info from ConnectivityManager and inspects it. If the network state has changed, it notifies the native layer via BaseEvent.onNetworkChange.
Using that as our entry point, let's step into mars' native layer:
/mars-master/mars/baseevent/jni/com_tencent_mars_BaseEvent.cc
```cpp
JNIEXPORT void JNICALL Java_com_tencent_mars_BaseEvent_onNetworkChange(JNIEnv *, jclass) {
    mars::baseevent::OnNetworkChange();
}
```
which eventually arrives here:
/mars-master/mars/baseevent/src/baseprjevent.cc
```cpp
boost::signals2::signal<void ()>& GetSignalOnNetworkChange() {
    static boost::signals2::signal<void ()> SignalOnNetworkChange;
    return SignalOnNetworkChange;
}
```
This uses the signals2 signal/slot mechanism from the boost C++ library, much like Qt's signals and slots. A signal is emitted here; so where does it get handled?
In stn_logic.cc:
/mars-master/mars/stn/stn_logic.cc
```cpp
static void __initbind_baseprjevent() {
#ifdef ANDROID
    mars::baseevent::addLoadModule(kLibName);
#endif
    GetSignalOnCreate().connect(&onCreate);
    GetSignalOnDestroy().connect(&onDestroy);
    GetSignalOnSingalCrash().connect(&onSingalCrash);
    GetSignalOnExceptionCrash().connect(&onExceptionCrash);
    GetSignalOnNetworkChange().connect(&onNetworkChange);

#ifndef XLOGGER_TAG
#error "not define XLOGGER_TAG"
#endif

    GetSignalOnNetworkDataChange().connect(&OnNetworkDataChange);
}

BOOT_RUN_STARTUP(__initbind_baseprjevent);
```
See? connect hooks the onNetworkChange function up here. First, though, where does BOOT_RUN_STARTUP get run?
/mars-master/mars/comm/bootrun.h
```cpp
#define BOOT_RUN_STARTUP(func) VARIABLE_IS_NOT_USED static int __anonymous_run_variable_startup_##func = boot_run_atstartup(func)
```
There it is: a static variable. This is a classic C++ trick: globals with static storage are initialized at program startup, so __initbind_baseprjevent effectively runs before main and wires up the signal/slot connections right from the start.
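As a quick illustration, here is a minimal compilable sketch (assuming only boost and the standard library, none of mars' macros) showing both idioms together: a signal exposed through a function-local static, and a connect performed from a global static initializer that runs before main.

```cpp
#include <boost/signals2.hpp>
#include <cstdio>

// Signal exposed via a function-local static, as in baseprjevent.cc.
static boost::signals2::signal<void ()>& GetSignalOnNetworkChange() {
    static boost::signals2::signal<void ()> signal_on_network_change;
    return signal_on_network_change;
}

static void onNetworkChange() { std::printf("slot: network changed\n"); }

// Stand-in for boot_run_atstartup: run the function, return a dummy int.
static int boot_run_atstartup(void (*func)()) { func(); return 0; }

static void initbind() { GetSignalOnNetworkChange().connect(&onNetworkChange); }

// Plays the role of BOOT_RUN_STARTUP(__initbind_baseprjevent): the static
// initializer fires before main, so the slot is wired up from the start.
static int g_run_initbind = boot_run_atstartup(&initbind);

int main() {
    GetSignalOnNetworkChange()();  // what the JNI entry point effectively triggers
    return 0;
}
```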
Good. With both the initialization and the trigger path covered, all that's left is to see what the handler actually does:
```cpp
static void onNetworkChange() {
#ifdef __APPLE__
    FlushReachability();
#endif
#ifdef ANDROID
    g_NetInfo = 0;

    ScopedLock lock(g_net_mutex);
    g_wifi_info.ssid.clear();
    g_wifi_info.bssid.clear();
    g_sim_info.isp_code.clear();
    g_sim_info.isp_name.clear();
    g_apn_info.nettype = kNoNet - 1;
    g_apn_info.sub_nettype = 0;
    g_apn_info.extra_info.clear();
    lock.unlock();
#endif

    STN_WEAK_CALL(OnNetworkChange());
}
```
It takes the lock and clears the cached network-related globals; since the network state just changed, all of that is now stale. Then, through the STN_WEAK_CALL macro (whose details we'll skip for now), it invokes OnNetworkChange:
/mars-master/mars/stn/src/timing_sync.cc
```cpp
void TimingSync::OnNetworkChange() {
    if (alarm_.IsWaiting()) {
        alarm_.Cancel();
        alarm_.Start(GetAlarmTime(active_logic_.IsActive()));
    }
}
```
This brings in alarm_, which looks like a timer: it is restarted here with a new deadline. Going deeper than this gets murky, so I'll leave it as an open question. Still, the overall flow so far can be summarized:
1. The upper layer's static broadcast receiver receives the broadcast event;
2. Once triggered, it notifies the native layer through BaseEvent; BaseEvent is effectively the bridge between the Java layer and the C layer;
3. The C side has a baseevent as well, but there the handlers are dispatched via the signal/slot mechanism.
Let's keep going through the functions BaseEvent triggers:
```cpp
static void onCreate() {
#if !UWP && !defined(WIN32)
    signal(SIGPIPE, SIG_IGN);
#endif
    xinfo2(TSF"stn oncreate");
    SINGLETON_STRONG(ActiveLogic);
    NetCore::Singleton::Instance();
}
```
In essence this news up the NetCore singleton. It leans on a pile of macros combined with smart pointers; I won't walk through the details here.
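For readers unfamiliar with the pattern, a stripped-down sketch (my reading of the idea, not mars' actual SINGLETON_STRONG macro) might look like this: a lazily created instance held by a shared_ptr, so callers share ownership of the singleton.

```cpp
#include <memory>
#include <mutex>

// Sketch of a shared_ptr-backed "strong" singleton in the spirit of
// SINGLETON_STRONG(NetCore) / NetCore::Singleton::Instance().
class NetCoreSketch {
  public:
    static std::shared_ptr<NetCoreSketch> Instance() {
        static std::once_flag flag;
        static std::shared_ptr<NetCoreSketch> instance;
        std::call_once(flag, [] { instance.reset(new NetCoreSketch); });
        return instance;  // callers share ownership via the returned shared_ptr
    }

  private:
    NetCoreSketch() = default;
};
```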
Now let's combine this with the upper-layer handling and trace how a task executes. Recall that the upper layer's final send ultimately calls StnLogic.startTask(_task), which takes us straight into StnLogic:
/mars-master/mars/libraries/mars_android_sdk/src/main/java/com/tencent/mars/stn/StnLogic.java
```java
//async call
public static native void startTask(final Task task);
```
It is explicitly marked as an async call, and it's a native function. So, into the C layer; here's an excerpt:
/mars-master/mars/stn/jni/com_tencent_mars_stn_StnLogic_Java2C.cc
```cpp
DEFINE_FIND_STATIC_METHOD(KJava2C_startTask, KNetJava2C, "startTask", "(Lcom/tencent/mars/stn/StnLogic$Task;)V")
JNIEXPORT void JNICALL Java_com_tencent_mars_stn_StnLogic_startTask(JNIEnv *_env, jclass, jobject _task) {
    xverbose_function();

    //get the field value of the netcmd
    jint taskid = JNU_GetField(_env, _task, "taskID", "I").i;
    jint cmdid = JNU_GetField(_env, _task, "cmdID", "I").i;
    jint channel_select = JNU_GetField(_env, _task, "channelSelect", "I").i;
    jobject hostlist = JNU_GetField(_env, _task, "shortLinkHostList", "Ljava/util/ArrayList;").l;
    jstring cgi = (jstring)JNU_GetField(_env, _task, "cgi", "Ljava/lang/String;").l;
    jboolean send_only = JNU_GetField(_env, _task, "sendOnly", "Z").z;
    jboolean need_authed = JNU_GetField(_env, _task, "needAuthed", "Z").z;
    jboolean limit_flow = JNU_GetField(_env, _task, "limitFlow", "Z").z;
    jboolean limit_frequency = JNU_GetField(_env, _task, "limitFrequency", "Z").z;
    jint channel_strategy = JNU_GetField(_env, _task, "channelStrategy", "I").i;
    jboolean network_status_sensitive = JNU_GetField(_env, _task, "networkStatusSensitive", "Z").z;
    jint priority = JNU_GetField(_env, _task, "priority", "I").i;
    jint retrycount = JNU_GetField(_env, _task, "retryCount", "I").i;
    jint server_process_cost = JNU_GetField(_env, _task, "serverProcessCost", "I").i;
    jint total_timetout = JNU_GetField(_env, _task, "totalTimeout", "I").i;
    jstring report_arg = (jstring)JNU_GetField(_env, _task, "reportArg", "Ljava/lang/String;").l;

    //init struct Task
    struct Task task;
    task.taskid = taskid;
    task.cmdid = cmdid;
    task.channel_select = channel_select;
    task.send_only = send_only;
    task.need_authed = need_authed;
    task.limit_flow = limit_flow;
    task.limit_frequency = limit_frequency;
    task.channel_strategy = channel_strategy;
    task.network_status_sensitive = network_status_sensitive;
    task.priority = priority;
    task.retry_count = retrycount;
    task.server_process_cost = server_process_cost;
    task.total_timetout = total_timetout;

    if (NULL != report_arg) {
        task.report_arg = ScopedJstring(_env, report_arg).GetChar();
    }

    if (NULL != hostlist) {
        jclass cls_arraylist = _env->GetObjectClass(hostlist);
        //method in class ArrayList
        jmethodID arraylist_get = _env->GetMethodID(cls_arraylist, "get", "(I)Ljava/lang/Object;");
        jmethodID arraylist_size = _env->GetMethodID(cls_arraylist, "size", "()I");
        jint len = _env->CallIntMethod(hostlist, arraylist_size);
        for (int i = 0; i < len; i++) {
            jstring host = (jstring)_env->CallObjectMethod(hostlist, arraylist_get, i);
            if (NULL != host) {
                task.shortlink_host_list.push_back(ScopedJstring(_env, host).GetChar());
                _env->DeleteLocalRef(host);
            }
        }
        _env->DeleteLocalRef(hostlist);
    }

    if (NULL != cgi) {
        task.cgi = ScopedJstring(_env, cgi).GetChar();
        _env->DeleteLocalRef(cgi);
    }

    StartTask(task);
}
```
All of this is Task conversion: it turns StnLogic's Java Task into the native-layer Task, fetching each member field in turn and copying it into the new struct.
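JNU_GetField is a mars JNI helper; in plain JNI, pulling one of those fields boils down to something like this sketch (my illustration, error handling omitted):

```cpp
#include <jni.h>

// Read Task.taskID (an int field) from the Java object, roughly what
// JNU_GetField does under the hood for the "I" signature.
static jint GetTaskId(JNIEnv* env, jobject task) {
    jclass cls = env->GetObjectClass(task);
    jfieldID fid = env->GetFieldID(cls, "taskID", "I");  // field name + JNI type signature
    return env->GetIntField(task, fid);
}
```

With the native struct filled in, the final StartTask(task) call lands directly in NetCore: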
/mars-master/mars/stn/src/net_core.cc
```cpp
void NetCore::StartTask(const Task& _task) {
    ASYNC_BLOCK_START

    xgroup2_define(group);
    xinfo2(TSF"task start long short taskid:%0, cmdid:%1, need_authed:%2, cgi:%3, channel_select:%4, limit_flow:%5, ",
           _task.taskid, _task.cmdid, _task.need_authed, _task.cgi.c_str(), _task.channel_select, _task.limit_flow) >> group;
    xinfo2(TSF"host:%_, send_only:%_, cmdid:%_, server_process_cost:%_, retrycount:%_, channel_strategy:%_, ",
           _task.shortlink_host_list.empty() ? "" : _task.shortlink_host_list.front(), _task.send_only, _task.cmdid,
           _task.server_process_cost, _task.retry_count, _task.channel_strategy) >> group;
    xinfo2(TSF" total_timetout:%_, network_status_sensitive:%_, priority:%_, report_arg:%_",
           _task.total_timetout, _task.network_status_sensitive, _task.priority, _task.report_arg) >> group;

    Task task = _task;
    if (!__ValidAndInitDefault(task, group)) {
        OnTaskEnd(task.taskid, task.user_context, kEctLocal, kEctLocalTaskParam);
        return;
    }

    if (task_process_hook_) {
        task_process_hook_(task);
    }

    if (0 == task.channel_select) {
        xerror2(TSF"error channelType (%_, %_), ", kEctLocal, kEctLocalChannelSelect) >> group;
        OnTaskEnd(task.taskid, task.user_context, kEctLocal, kEctLocalChannelSelect);
        return;
    }

    if (task.network_status_sensitive && kNoNet == ::getNetInfo()
#ifdef USE_LONG_LINK
        && LongLink::kConnected != longlink_task_manager_->LongLinkChannel().ConnectStatus()
#endif
       ) {
        xerror2(TSF"error no net (%_, %_), ", kEctLocal, kEctLocalNoNet) >> group;
        OnTaskEnd(task.taskid, task.user_context, kEctLocal, kEctLocalNoNet);
        return;
    }

    bool start_ok = false;

#ifdef USE_LONG_LINK
    if (LongLink::kConnected != longlink_task_manager_->LongLinkChannel().ConnectStatus()
        && (Task::kChannelLong & task.channel_select)
        && SINGLETON_STRONG(ActiveLogic)->IsForeground()
        && (15 * 60 * 1000 >= gettickcount() - SINGLETON_STRONG(ActiveLogic)->LastForegroundChangeTime()))
        longlink_task_manager_->getLongLinkConnectMonitor().MakeSureConnected();
#endif

    xgroup2() << group;

    switch (task.channel_select) {
        case Task::kChannelBoth: {
#ifdef USE_LONG_LINK
            bool bUseLongLink = LongLink::kConnected == longlink_task_manager_->LongLinkChannel().ConnectStatus();

            if (bUseLongLink && task.channel_strategy == Task::kChannelFastStrategy) {
                xinfo2(TSF"long link task count:%0, ", longlink_task_manager_->GetTaskCount());
                bUseLongLink = bUseLongLink && (longlink_task_manager_->GetTaskCount() <= kFastSendUseLonglinkTaskCntLimit);
            }

            if (bUseLongLink)
                start_ok = longlink_task_manager_->StartTask(task);
            else
#endif
                start_ok = shortlink_task_manager_->StartTask(task);
        } break;

#ifdef USE_LONG_LINK
        case Task::kChannelLong:
            start_ok = longlink_task_manager_->StartTask(task);
            break;
#endif

        case Task::kChannelShort:
            start_ok = shortlink_task_manager_->StartTask(task);
            break;

        default:
            xassert2(false);
            break;
    }

    if (!start_ok) {
        xerror2(TSF"taskid:%_, error starttask (%_, %_)", task.taskid, kEctLocal, kEctLocalStartTaskFail);
        OnTaskEnd(task.taskid, task.user_context, kEctLocal, kEctLocalStartTaskFail);
    } else {
#ifdef USE_LONG_LINK
        zombie_task_manager_->OnNetCoreStartTask();
#endif
    }

    ASYNC_BLOCK_END
}
```
1. A pile of error checks; on any failure it calls OnTaskEnd and returns immediately;
2. A switch on the task's channel type: long link + short link, long link only, or short link only;
3. Finally, if everything is fine and a long link is in use, it also calls zombie_task_manager_->OnNetCoreStartTask(), which internally just records a start time. "Zombie task management" is, as I read it, tracking tasks whose actual execution time runs way over, so that once they are detected as detached and timed out they can be wrapped up and reclaimed.
I looked into this specifically: in net_core.cc, a long-link error, a disconnect/reconnect, or a change in network status all trigger ZombieTaskManager.__StartTask. Internally it checks the age of each task (the tasks were saved earlier via SaveTask): if a task has exceeded its time budget it goes through fun_callback_, otherwise it is retried via fun_start_task_. These are bound with boost::bind, and the corresponding function bodies are NetCore::__CallBack and NetCore::StartTask. So in __StartTask, a timed-out task goes to that callback, which keeps the task around; only otherwise does it go back through NetCore::StartTask. My impression is that timed-out tasks are not fully released and reclaimed, but rather wait for the next network change to be retried. This is a rough reading and I can't be certain, but so far I haven't seen a step that frees the tasks (see the sketch after this list). This is drifting off the main line, though, so let's set it aside.
4. Each case dispatches to a different task manager's StartTask according to the transport channel type.
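Here is a hypothetical sketch of that reading of ZombieTaskManager (names and structure are illustrative, not mars' actual code): parked tasks carry a timestamp, and on each retry trigger they are either restarted or handed to the give-up callback.

```cpp
#include <cstdint>
#include <functional>
#include <list>

struct SavedTask {
    int      taskid;
    uint64_t save_time;   // when the task was parked
    uint64_t timeout_ms;  // total time budget for the task
};

class ZombieTaskSketch {
  public:
    // Both must be bound (e.g. to NetCore methods) before __StartTask is called.
    std::function<void(const SavedTask&)> fun_start_task_;  // retry path
    std::function<void(const SavedTask&)> fun_callback_;    // give-up path

    void SaveTask(const SavedTask& task) { tasks_.push_back(task); }

    // Fired on long-link errors, reconnects, or network changes.
    void __StartTask(uint64_t now_ms) {
        for (auto it = tasks_.begin(); it != tasks_.end();) {
            if (now_ms - it->save_time >= it->timeout_ms)
                fun_callback_(*it);    // timed out: report failure upward
            else
                fun_start_task_(*it);  // still in budget: try again
            it = tasks_.erase(it);
        }
    }

  private:
    std::list<SavedTask> tasks_;
};
```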
Let's take the long link as our example and walk through the actual execution:
/mars-master/mars/stn/src/longlink_task_manager.cc
```cpp
bool LongLinkTaskManager::StartTask(const Task& _task) {
    xverbose_function();
    xdebug2(TSF"taskid=%0", _task.taskid);

    TaskProfile task(_task);
    task.link_type = Task::kChannelLong;

    lst_cmd_.push_back(task);
    lst_cmd_.sort(__CompareTask);

    __RunLoop();
    return true;
}
```
1. Set the transport type;
2. Append the task to the queue and re-sort it;
3. Run __RunLoop.
Here's __RunLoop:
```cpp
void LongLinkTaskManager::__RunLoop() {
    if (lst_cmd_.empty()) {
#ifdef ANDROID
        /*cancel the last wakeuplock*/
        wakeup_lock_->Lock(500);
#endif
        return;
    }

    __RunOnTimeout();
    __RunOnStartTask();

    if (!lst_cmd_.empty()) {
#ifdef ANDROID
        wakeup_lock_->Lock(30 * 1000);
#endif
        MessageQueue::FasterMessage(asyncreg_.Get(),
                                    MessageQueue::Message((MessageQueue::MessageTitle_t)this, boost::bind(&LongLinkTaskManager::__RunLoop, this)),
                                    MessageQueue::MessageTiming(1000));
    } else {
#ifdef ANDROID
        /*cancel the last wakeuplock*/
        wakeup_lock_->Lock(500);
#endif
    }
}
```
1. __RunOnTimeout handles timeouts;
2. __RunOnStartTask executes tasks;
3. MessageQueue::FasterMessage re-posts __RunLoop onto the message queue, so the loop keeps pumping itself as long as tasks remain (see the sketch below).
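Conceptually the loop behaves like this sketch (MessageQueue is mars' own API; here a plain sleep stands in for the delayed self-post, purely for illustration):

```cpp
#include <chrono>
#include <list>
#include <thread>

struct RunLoopSketch {
    std::list<int> lst_cmd_;  // pending task ids

    void RunLoop() {
        while (!lst_cmd_.empty()) {
            RunOnTimeout();    // fail tasks that exceeded their deadlines
            RunOnStartTask();  // push sendable tasks into the long link
            // mars re-posts __RunLoop via MessageQueue::FasterMessage with
            // MessageTiming(1000); the sleep imitates that ~1s cadence here.
            std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        }
    }

    void RunOnTimeout() { /* see __RunOnTimeout below */ }
    void RunOnStartTask() { if (!lst_cmd_.empty()) lst_cmd_.pop_front(); }
};
```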
Let's take these in turn:
```cpp
void LongLinkTaskManager::__RunOnTimeout() {
    std::list<TaskProfile>::iterator first = lst_cmd_.begin();
    std::list<TaskProfile>::iterator last = lst_cmd_.end();

    uint64_t cur_time = ::gettickcount();
    int socket_timeout_code = 0;
    bool istasktimeout = false;

    while (first != last) {
        std::list<TaskProfile>::iterator next = first;
        ++next;

        if (first->running_id && 0 < first->transfer_profile.start_send_time) {
            if (0 == first->transfer_profile.last_receive_pkg_time &&
                cur_time - first->transfer_profile.start_send_time >= first->transfer_profile.first_pkg_timeout) {
                xerror2(TSF"task first-pkg timeout taskid:%_, nStartSendTime=%_, nfirstpkgtimeout=%_",
                        first->task.taskid, first->transfer_profile.start_send_time / 1000, first->transfer_profile.first_pkg_timeout / 1000);
                socket_timeout_code = kEctLongFirstPkgTimeout;
                __SetLastFailedStatus(first);
            }

            if (0 < first->transfer_profile.last_receive_pkg_time &&
                cur_time - first->transfer_profile.last_receive_pkg_time >= ((kMobile != getNetInfo()) ? kWifiPackageInterval : kGPRSPackageInterval)) {
                xerror2(TSF"task pkg-pkg timeout, taskid:%_, nLastRecvTime=%_, pkg-pkg timeout=%_",
                        first->task.taskid, first->transfer_profile.last_receive_pkg_time / 1000,
                        ((kMobile != getNetInfo()) ? kWifiPackageInterval : kGPRSPackageInterval) / 1000);
                socket_timeout_code = kEctLongPkgPkgTimeout;
            }
        }

        if (first->running_id && 0 < first->transfer_profile.start_send_time &&
            cur_time - first->transfer_profile.start_send_time >= first->transfer_profile.read_write_timeout) {
            xerror2(TSF"task read-write timeout, taskid:%_, , nStartSendTime=%_, nReadWriteTimeOut=%_",
                    first->task.taskid, first->transfer_profile.start_send_time / 1000, first->transfer_profile.read_write_timeout / 1000);
            socket_timeout_code = kEctLongReadWriteTimeout;
        }

        if (cur_time - first->start_task_time >= first->task_timeout) {
            __SingleRespHandle(first, kEctLocal, kEctLocalTaskTimeout, kTaskFailHandleTaskTimeout, longlink_->Profile());
            istasktimeout = true;
        }

        first = next;
    }

    if (0 != socket_timeout_code) {
        dynamic_timeout_.CgiTaskStatistic("", kDynTimeTaskFailedPkgLen, 0);
        __BatchErrorRespHandle(kEctNetMsgXP, socket_timeout_code, kTaskFailHandleDefault, 0, longlink_->Profile());
        xassert2(fun_notify_network_err_);
        fun_notify_network_err_(__LINE__, kEctNetMsgXP, socket_timeout_code, longlink_->Profile().ip, longlink_->Profile().port);
    } else if (istasktimeout) {
        __BatchErrorRespHandle(kEctNetMsgXP, kEctLongTaskTimeout, kTaskFailHandleDefault, 0, longlink_->Profile());
        // xassert2(funNotifyNetworkError);
        // funNotifyNetworkError(__LINE__, ectNetMsgXP, ectNetMsgXP_TaskTimeout, longlink_->IP(), longlink_->Port());
    }
}
```
It loops over every entry in the cmd container, checking whether each hit a socket-level timeout or a task-level timeout. We won't dwell on this here.
Now __RunOnStartTask:
```cpp
void LongLinkTaskManager::__RunOnStartTask() {
    std::list<TaskProfile>::iterator first = lst_cmd_.begin();
    std::list<TaskProfile>::iterator last = lst_cmd_.end();

    bool ismakesureauthruned = false;
    bool ismakesureauthsuccess = false;
    uint64_t curtime = ::gettickcount();

    bool canretry = curtime - lastbatcherrortime_ >= retry_interval_;
    bool canprint = true;
    int sent_count = 0;

    while (first != last) {
        std::list<TaskProfile>::iterator next = first;
        ++next;

        // skip tasks that are already running
        if (first->running_id) {
            ++sent_count;
            first = next;
            continue;
        }

        // retry interval; does not affect a task's first send
        if (first->task.retry_count > first->remain_retry_count && !canretry) {
            xdebug2_if(canprint, TSF"retry interval:%0, curtime:%1, lastbatcherrortime_:%2, curtime-m_lastbatcherrortime:%3",
                       retry_interval_, curtime, lastbatcherrortime_, curtime - lastbatcherrortime_);
            canprint = false;
            first = next;
            continue;
        }

        // make sure login
        if (first->task.need_authed) {
            if (!ismakesureauthruned) {
                ismakesureauthruned = true;
                ismakesureauthsuccess = MakesureAuthed();
            }

            if (!ismakesureauthsuccess) {
                xinfo2_if(curtime % 3 == 0, TSF"makeSureAuth retsult=%0", ismakesureauthsuccess);
                first = next;
                continue;
            }
        }

        AutoBuffer bufreq;
        int error_code = 0;

        if (!first->antiavalanche_checked) {
            if (!Req2Buf(first->task.taskid, first->task.user_context, bufreq, error_code, Task::kChannelLong)) {
                __SingleRespHandle(first, kEctEnDecode, error_code, kTaskFailHandleTaskEnd, longlink_->Profile());
                first = next;
                continue;
            }

            // avalanche check
            xassert2(fun_anti_avalanche_check_);
            if (!fun_anti_avalanche_check_(first->task, bufreq.Ptr(), (int)bufreq.Length())) {
                __SingleRespHandle(first, kEctLocal, kEctLocalAntiAvalanche, kTaskFailHandleTaskEnd, longlink_->Profile());
                first = next;
                continue;
            }

            first->antiavalanche_checked = true;
        }

        if (!longlinkconnectmon_->MakeSureConnected()) {
            break;
        }

        if (0 == bufreq.Length()) {
            if (!Req2Buf(first->task.taskid, first->task.user_context, bufreq, error_code, Task::kChannelLong)) {
                __SingleRespHandle(first, kEctEnDecode, error_code, kTaskFailHandleTaskEnd, longlink_->Profile());
                first = next;
                continue;
            }

            // avalanche check
            xassert2(fun_anti_avalanche_check_);
            if (!first->antiavalanche_checked && !fun_anti_avalanche_check_(first->task, bufreq.Ptr(), (int)bufreq.Length())) {
                __SingleRespHandle(first, kEctLocal, kEctLocalAntiAvalanche, kTaskFailHandleTaskEnd, longlink_->Profile());
                first = next;
                continue;
            }
        }

        first->transfer_profile.loop_start_task_time = ::gettickcount();
        first->transfer_profile.first_pkg_timeout = __FirstPkgTimeout(first->task.server_process_cost, bufreq.Length(), sent_count, dynamic_timeout_.GetStatus());
        first->current_dyntime_status = (first->task.server_process_cost <= 0) ? dynamic_timeout_.GetStatus() : kEValuating;
        first->transfer_profile.read_write_timeout = __ReadWriteTimeout(first->transfer_profile.first_pkg_timeout);
        first->transfer_profile.send_data_size = bufreq.Length();
        first->running_id = longlink_->Send((const unsigned char*)bufreq.Ptr(), (unsigned int)bufreq.Length(),
                                            first->task.cmdid, first->task.taskid,
                                            first->task.send_only ? "" : first->task.cgi);

        if (!first->running_id) {
            xwarn2(TSF"task add into longlink readwrite fail cgi:%_, cmdid:%_, taskid:%_", first->task.cgi, first->task.cmdid, first->task.taskid);
            first = next;
            continue;
        }

        xinfo2(TSF"task add into longlink readwrite suc cgi:%_, cmdid:%_, taskid:%_, size:%_, timeout(firstpkg:%_, rw:%_, task:%_), retry:%_",
               first->task.cgi, first->task.cmdid, first->task.taskid, first->transfer_profile.send_data_size,
               first->transfer_profile.first_pkg_timeout / 1000, first->transfer_profile.read_write_timeout / 1000,
               first->task_timeout / 1000, first->remain_retry_count);

        if (first->task.send_only) {
            __SingleRespHandle(first, kEctOK, 0, kTaskFailHandleNoError, longlink_->Profile());
        }

        ++sent_count;
        first = next;
    }
}
```
Again it loops over the cmd list, doing the following:
1. Skip tasks that are already running;
2. If the task is already into its retries and the retry interval hasn't elapsed yet, skip it and move on to the next;
3. Make sure we're authenticated: if the task requires auth and it hasn't been done yet, call MakesureAuthed. Internally this ends up calling back into the Java layer, StnLogic's ICallBack.makesureAuthed, so the upper layer can plug in its own auth mechanism here; note that this is a synchronous step;
4. The avalanche check;
5. longlink_->Send, the actual send.
Let's dig into the avalanche check. "Avalanche" here means the overload caused by clients retrying too aggressively: large volumes of data re-sent within very short intervals. It typically shows up when the network is unstable or the server is struggling to process requests; in bad cases it drags down client performance and crushes the server under load, somewhat like a DoS attack.
The check goes through fun_anti_avalanche_check_, which is really AntiAvalanche::Check.
/mars-master/mars/stn/src/anti_avalanche.cc
```cpp
bool AntiAvalanche::Check(const Task& _task, const void* _buffer, int _len) {
    xverbose_function();

    unsigned int span = 0;

    if (!frequency_limit_->Check(_task, _buffer, _len, span)) {
        ReportTaskLimited(kFrequencyLimit, _task, span);
        return false;
    }

    if (kMobile == getNetInfo() && !flow_limit_->Check(_task, _buffer, _len)) {
        ReportTaskLimited(kFlowLimit, _task, (unsigned int&)_len);
        return false;
    }

    return true;
}
```
Two situations return false, and false is exactly what the caller uses to decide whether to continue (continuing means shelving this task for now and moving on to the next loop iteration). They correspond to FrequencyLimit::Check and FlowLimit::Check. The former first:
/mars-master/mars/stn/src/frequency_limit.cc
```cpp
bool FrequencyLimit::Check(const mars::stn::Task& _task, const void* _buffer, int _len, unsigned int& _span) {
    xverbose_function();

    if (!_task.limit_frequency) return true;

    // interval between now and the last time the records were cleared
    unsigned long time_cur = ::gettickcount();
    xassert2(time_cur >= itime_record_clear_);
    unsigned long interval = time_cur - itime_record_clear_;

    // if the interval reaches 60*60*1000 ms (one hour), clear all records
    // and note the current time as the last clear time
    if (RUN_CLEAR_RECORDS_INTERVAL_MINUTE <= interval) {
        xdebug2(TSF"__ClearRecord interval=%0, timeCur=%1, itimeRecordClear=%2", interval, time_cur, itime_record_clear_);
        itime_record_clear_ = time_cur;
        __ClearRecord();
    }

    // hash the current buffer and look for an identical record in the record set
    unsigned long hash = ::adler32(0, (const unsigned char*)_buffer, _len);
    int find_index = __LocateIndex(hash);

    if (0 <= find_index) {
        // an identical record exists
        _span = __GetLastUpdateTillNow(find_index);
        __UpdateRecord(find_index);

        // check whether the record's count exceeds the bound of 105; if so, return false
        if (!__CheckRecord(find_index)) {
            xerror2(TSF"Anti-Avalanche had Catch Task, Task Info: ptr=%0, cmdid=%1, need_authed=%2, cgi:%3, channel_select=%4, limit_flow=%5",
                    &_task, _task.cmdid, _task.need_authed, _task.cgi, _task.channel_select, _task.limit_flow);
            xerror2(TSF"apBuffer Len=%0, Hash=%1, Count=%2, timeLastUpdate=%3",
                    _len, iarr_record_[find_index].hash_, iarr_record_[find_index].count_, iarr_record_[find_index].time_last_update_);
            xassert2(false);
            return false;
        }
    } else {
        // no identical record exists; insert this one
        xdebug2(TSF"InsertRecord Task Info: ptr=%0, cmdid=%1, need_authed=%2, cgi:%3, channel_select=%4, limit_flow=%5",
                &_task, _task.cmdid, _task.need_authed, _task.cgi, _task.channel_select, _task.limit_flow);
        __InsertRecord(hash);
    }

    return true;
}
```
1. Compute the interval between now and the last time all records were cleared; if it exceeds the one-hour bound (60*60*1000 ms), clear the whole record set and mark the current time as the last clear time;
2. Hash the incoming buffer and call __LocateIndex to see whether a record with the same hash already exists in the record set;
3. Branch on __LocateIndex's return value (-1: no match exists; >= 0: a match exists);
4. If a match exists, run __CheckRecord, which verifies the record's count_ is <= 105; past that cap it returns false;
5. If no matching record exists, insert this record into the set.
To sum up briefly: whether we're "avalanching" is determined purely by time and count. The time bound keeps the checks within a reasonable window, neither too frequent nor too long; after an hour everything is cleared and we start over. The count is bumped in __UpdateRecord, so the more often an identical request passes through the check within the window, the higher it climbs, until it crosses the cap, the avalanche is considered triggered, and false is returned. Clear, self-contained, and loosely coupled; nicely written.
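Condensed into a sketch (assumed constants, simplified types; mars hashes with adler32, here the hash is taken as input), the frequency check looks roughly like this:

```cpp
#include <cstdint>
#include <unordered_map>

class FrequencyLimitSketch {
    static const uint64_t kClearIntervalMs = 60 * 60 * 1000;  // one hour
    static const int      kMaxRepeatCount  = 105;             // per-record cap

    struct Record {
        int count = 0;
        uint64_t last_update = 0;
    };
    std::unordered_map<uint32_t, Record> records_;  // payload hash -> record
    uint64_t last_clear_ms_ = 0;

  public:
    // Returns false when the same payload has been sent too many times
    // inside the current window.
    bool Check(uint32_t payload_hash, uint64_t now_ms) {
        if (now_ms - last_clear_ms_ >= kClearIntervalMs) {
            records_.clear();          // window elapsed: start over
            last_clear_ms_ = now_ms;
        }
        Record& r = records_[payload_hash];
        ++r.count;
        r.last_update = now_ms;
        return r.count <= kMaxRepeatCount;
    }
};
```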
Incidentally, __InsertRecord does more than a straight insert: it first checks whether the record set has reached its maximum rated size (30), and if so evicts the record with the smallest timestamp, i.e. the oldest one, to make room for the new record.
```cpp
void FrequencyLimit::__InsertRecord(unsigned long _hash) {
    if (MAX_RECORD_COUNT < iarr_record_.size()) {
        xassert2(false);
        return;
    }

    STAvalancheRecord temp;
    temp.count_ = 1;
    temp.hash_ = _hash;
    temp.time_last_update_ = ::gettickcount();

    // if the set is full, evict the record with the earliest timestamp
    if (MAX_RECORD_COUNT == iarr_record_.size()) {
        unsigned int del_index = 0;

        for (unsigned int i = 1; i < iarr_record_.size(); i++) {
            if (iarr_record_[del_index].time_last_update_ > iarr_record_[i].time_last_update_) {
                del_index = i;
            }
        }

        std::vector<STAvalancheRecord>::iterator it = iarr_record_.begin();
        it += del_index;
        iarr_record_.erase(it);
    }

    iarr_record_.push_back(temp);
}
```
OK, that's only one half of the avalanche check; the other half lives in FlowLimit::Check:
/mars-master/mars/stn/src/flow_limit.cc
```cpp
bool FlowLimit::Check(const mars::stn::Task& _task, const void* _buffer, int _len) {
    xverbose_function();

    if (!_task.limit_flow) {
        return true;
    }

    __FlashCurVol();

    if (cur_funnel_vol_ + _len > kMaxVol) {
        xerror2(TSF"Task Info: ptr=%_, cmdid=%_, need_authed=%_, cgi:%_, channel_select=%_, limit_flow=%_, cur_funnel_vol_(%_)+_len(%_)=%_,MAX_VOL:%_ ",
                &_task, _task.cmdid, _task.need_authed, _task.cgi, _task.channel_select, _task.limit_flow,
                cur_funnel_vol_ + _len, cur_funnel_vol_, _len, cur_funnel_vol_ + _len, kMaxVol);
        return false;
    }

    cur_funnel_vol_ += _len;
    return true;
}
```
Recall from AntiAvalanche::Check above that FlowLimit::Check only runs under the extra condition kMobile == getNetInfo(), i.e. on mobile data rather than wifi. Looking at the line cur_funnel_vol_ + _len > kMaxVol in isolation, this reads like a check on data volume: if the size would exceed the 2 * 1024 bound, it counts as triggering the avalanche. The key is __FlashCurVol:
```cpp
void FlowLimit::__FlashCurVol() {
    uint64_t timeCur = ::gettickcount();
    xassert2(timeCur >= time_lastflow_computer_, TSF"%_, %_", timeCur, time_lastflow_computer_);
    uint64_t interval = (timeCur - time_lastflow_computer_) / 1000;

    xdebug2(TSF"iCurFunnelVol=%0, iFunnelSpeed=%1, interval=%2", cur_funnel_vol_, funnel_speed_, interval);
    cur_funnel_vol_ -= interval * funnel_speed_;
    cur_funnel_vol_ = std::max(0, cur_funnel_vol_);
    xdebug2(TSF"iCurFunnelVol=%0", cur_funnel_vol_);

    time_lastflow_computer_ = timeCur;
}
```
Again it starts with a time delta: the difference, in seconds, between now and the last flow computation. It multiplies that delta by a drain-rate factor, funnel_speed_, and subtracts the product from the current volume (whose initial budget can be thought of as an estimate around 6 * 1024 * 1024); if the result is still above zero it is kept, otherwise it is clamped to 0. Back in Check, the test cur_funnel_vol_ + _len > kMaxVol is then applied. How to read this? My understanding: on mobile networks it keeps a hypothetical running total of sent bytes, drained over time at an assumed rate, which tells you how much headroom remains. If adding the current payload's len would blow past the cap, the load is considered excessive, the avalanche trips, and the task is shelved for now: continue. The effect of that continue is that a data-heavy task is deferred, while a later, smaller task may still go out.
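This is essentially a leaky bucket. A sketch of the same logic (the constants are the ones quoted in the text; treat the drain rate in particular as an illustrative assumption, not a verified value):

```cpp
#include <algorithm>
#include <cstdint>

class FlowLimitSketch {
    int64_t cur_funnel_vol_ = 0;     // bytes currently "in the funnel"
    uint64_t last_compute_ms_ = 0;   // last time the funnel was drained
    static const int64_t kMaxVol = 2 * 1024;  // funnel capacity
    static const int64_t kFunnelSpeed = 100;  // drain rate, bytes/sec (illustrative)

    void FlashCurVol(uint64_t now_ms) {
        // Drain the funnel by elapsed-seconds * speed, never below zero.
        int64_t drained = int64_t((now_ms - last_compute_ms_) / 1000) * kFunnelSpeed;
        cur_funnel_vol_ = std::max<int64_t>(0, cur_funnel_vol_ - drained);
        last_compute_ms_ = now_ms;
    }

  public:
    // Returns false (defer this task) when sending len bytes would overflow.
    bool Check(int64_t len, uint64_t now_ms) {
        FlashCurVol(now_ms);
        if (cur_funnel_vol_ + len > kMaxVol) return false;
        cur_funnel_vol_ += len;  // payload accepted: it occupies funnel volume
        return true;
    }
};
```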
To wrap up: the avalanche check here has real technical substance. It painstakingly performs two checks, a send-count limit on identical payloads and a flow limit; tripping either one counts as an avalanche, and the current task is skipped in favor of the next. This part takes more effort to read, and you have to infer the true intent of some pieces, but the coverage is thorough. You can see that WeChat has put serious work into taming transfer avalanches on mobile networks. Nicely done!
This post has run long, so let's push the work that follows the avalanche check into the next one.