在拨云见日---android异步消息机制源码分析(一)(http://my.oschina.net/u/1155515/blog/378460)中,咱们了解了Java层异步消息机制的基本流程,可能细心的同窗会发现java层中有不少native调用,其实java层仅仅是一个壳子,具体的实现全在native层,经过本篇文章,让咱们继续抽丝剥茧,一步步揭开Android异步消息的面纱,分析很差之处,还请熟悉的童鞋指教java
本文从实际使用角度出发,共分为2个部分android
一、从子线程添加消息到消息线程时,从Java层到native的处理流程异步
二、消息线程消费消息的处理流程函数
1、从子线程添加消息到消息线程时,从Java层到native的处理流程oop
在子线程中,咱们能够经过handler.sendMessage或Handler.postXXX(如post(Runable r)或postDelayed(Runnable r, long delayMillis))向消息线程发送消息源码分析
而最终会调用到Java层MessageQueue.enqueueMessage把消息放入消息线程的MessageQueue(源码路径:android.os.MessageQueue)post
boolean enqueueMessage(Message msg, long when) { if (msg.isInUse()) { throw new AndroidRuntimeException(msg + " This message is already in use."); } if (msg.target == null) { throw new AndroidRuntimeException("Message must have a target."); } synchronized (this) { if (mQuitting) { RuntimeException e = new RuntimeException( msg.target + " sending message to a Handler on a dead thread"); Log.w("MessageQueue", e.getMessage(), e); return false; } msg.when = when; Message p = mMessages; boolean needWake; //判断入队消息是否须要当即处理 if (p == null || when == 0 || when < p.when) { // New head, wake up the event queue if blocked. msg.next = p; mMessages = msg; needWake = mBlocked;//若是消息循环线程处于阻塞中,则须要唤醒消息线程 } else { //入队消息不须要当即处理,则根据消息处理时间插入到链表中合适位置 ......... Message prev; for (;;) { prev = p; p = p.next; if (p == null || when < p.when) { break; } ............. } msg.next = p; // invariant: p == prev.next prev.next = msg; } // We can assume mPtr != 0 because mQuitting is false. if (needWake) { //唤醒 nativeWake(mPtr); } } return true; }
经过上面代码,当入队消息须要当即处理而且消息线程处于阻塞时,会调用native函数nativeWake唤醒消息线程来处理消息ui
经过JNI定义,让咱们看看nativeWake对应的native函数是什么this
代码路径: frameworks\base\core\jni\android_os_MessageQueue.cppspa
NativeMessageQueue::NativeMessageQueue() : mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) { mLooper = Looper::getForThread(); if (mLooper == NULL) { mLooper = new Looper(false); Looper::setForThread(mLooper); } } void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) { ....... mLooper->pollOnce(timeoutMillis); ....... } void NativeMessageQueue::wake() { mLooper->wake(); } static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj, jlong ptr, jint timeoutMillis) { NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr); nativeMessageQueue->pollOnce(env, obj, timeoutMillis); } static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) { NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr); nativeMessageQueue->wake(); } static const JNINativeMethod gMessageQueueMethods[] = { /* name, signature, funcPtr */ { "nativeInit", "()J", (void*)android_os_MessageQueue_nativeInit }, { "nativeDestroy", "(J)V", (void*)android_os_MessageQueue_nativeDestroy }, { "nativePollOnce", "(JI)V", (void*)android_os_MessageQueue_nativePollOnce }, { "nativeWake", "(J)V", (void*)android_os_MessageQueue_nativeWake }, { "nativeIsPolling", "(J)Z", (void*)android_os_MessageQueue_nativeIsPolling }, { "nativeSetFileDescriptorEvents", "(JII)V", (void*)android_os_MessageQueue_nativeSetFileDescriptorEvents }, }; int register_android_os_MessageQueue(JNIEnv* env) { int res = RegisterMethodsOrDie(env, "android/os/MessageQueue", gMessageQueueMethods, NELEM(gMessageQueueMethods)); jclass clazz = FindClassOrDie(env, "android/os/MessageQueue"); gMessageQueueClassInfo.mPtr = GetFieldIDOrDie(env, clazz, "mPtr", "J"); gMessageQueueClassInfo.dispatchEvents = GetMethodIDOrDie(env, clazz, "dispatchEvents", "(II)I"); return res; }
调用关系链以下:
java层nativeWake->nativeMessageQueue.wake()->Looper.wake()
让咱们经过源码一探Looper.wake()
代码路径: frameworks\native\jb-dev\libs\utils\Looper.cpp
void Looper::wake() { #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ wake", this); #endif ssize_t nWrite; do { nWrite = write(mWakeWritePipeFd, "W", 1); } while (nWrite == -1 && errno == EINTR); if (nWrite != 1) { if (errno != EAGAIN) { ALOGW("Could not write wake signal, errno=%d", errno); } } }
mWakeWritePipeFd是个什么鬼?
为何会向mWakeWritePipeFd写入一个字符?
别急,带着这些疑问咱们继续日后面看,看了后面,前面的问题也就迎刃而解
如今咱们简单的总结一下消息发送的流程:
一、在Java层经过Handler对象发送消息后,消息被放入消息线程的MessageQueue
二、消息入队后,会判断是否须要当即处理消息
三、若是须要当即处理消息且消息线程处于阻塞中,则唤醒消息线程
2、消息线程消费消息的处理流程
上面咱们了解从子线程发送消息的流程,那么发送了消息后,消息线程是如何消费消息?
继续咱们的脚步,让咱们来一探究竟
经过前一篇文章(http://my.oschina.net/u/1155515/blog/378460)咱们了解到消息线程经过调用Java层Looper.loop()进入消息循环,在消息循环中,又经过调用MessageQueue.next()不断的获取消息或者没有消息时阻塞
Message next() { int pendingIdleHandlerCount = -1; // -1 only during first iteration int nextPollTimeoutMillis = 0; for (;;) { ............... //先调用native方法获阻塞到超时(超时时间由nextPollTimeoutMillis指定)或者被主动唤醒 nativePollOnce(mPtr, nextPollTimeoutMillis); synchronized (this) { // Try to retrieve the next message. Return if found. final long now = SystemClock.uptimeMillis(); Message prevMsg = null; Message msg = mMessages; ................... //判断是有新消息到达仍是超时 if (msg != null) { //判断是否须要当即处理消息 if (now < msg.when) { // Next message is not ready. Set a timeout to wake up when it is ready. nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE); } else { // Got a message. // //消息须要当即处理,则返回消息 mBlocked = false; if (prevMsg != null) { prevMsg.next = msg.next; } else { mMessages = msg.next; } msg.next = null; if (false) Log.v("MessageQueue", "Returning message: " + msg); msg.markInUse(); return msg; } } .................... // Process the quit message now that all pending messages have been handled. if (mQuitting) { dispose(); return null; } .................. nextPollTimeoutMillis = 0; } }
根据上面第一部分native层源码中JNI函数的定义,能够看到调用关系链以下:
java层nativePollOnce->nativeMessageQueue.pollOnce->Looper.pollOnce
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) { int result = 0; for (;;) { ........... if (result != 0) { #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - returning result %d", this, result); #endif if (outFd != NULL) *outFd = 0; if (outEvents != NULL) *outEvents = 0; if (outData != NULL) *outData = NULL; return result; } result = pollInner(timeoutMillis); } }
而Looper.pollOnce最终调用了Looper.pollInner
int Looper::pollInner(int timeoutMillis) { ............... // Poll. int result = ALOOPER_POLL_WAKE; struct epoll_event eventItems[EPOLL_MAX_EVENTS]; //调用epoll_wait系统调用监听fd上事件,或者直到超时返回 int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis); ............. // Check for poll error. if (eventCount < 0) { if (errno == EINTR) { goto Done; } ALOGW("Poll failed with an unexpected error, errno=%d", errno); result = ALOOPER_POLL_ERROR; goto Done; } // Check for poll timeout. if (eventCount == 0) { #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - timeout", this); #endif result = ALOOPER_POLL_TIMEOUT; goto Done; } // Handle all events. #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount); #endif for (int i = 0; i < eventCount; i++) { int fd = eventItems[i].data.fd; uint32_t epollEvents = eventItems[i].events; ////判断是不是主动唤醒, if (fd == mWakeReadPipeFd) { if (epollEvents & EPOLLIN) { awoken(); } else { ALOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents); } } ............. } Done: ; ................... return result; }
这里又出现了mWakeReadPipeFd,让咱们经过Looper构造函数看看mWakeReadPipeFd是个什么鬼
Looper::Looper(bool allowNonCallbacks) : mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) { int wakeFds[2]; //建立管道 int result = pipe(wakeFds); LOG_ALWAYS_FATAL_IF(result != 0, "Could not create wake pipe. errno=%d", errno); mWakeReadPipeFd = wakeFds[0]; mWakeWritePipeFd = wakeFds[1]; result = fcntl(mWakeReadPipeFd, F_SETFL, O_NONBLOCK); LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake read pipe non-blocking. errno=%d", errno); result = fcntl(mWakeWritePipeFd, F_SETFL, O_NONBLOCK); LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake write pipe non-blocking. errno=%d", errno); // Allocate the epoll instance and register the wake pipe. mEpollFd = epoll_create(EPOLL_SIZE_HINT); LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance. errno=%d", errno); //把读端管道添加到epoll监控列表并监听读端事件 struct epoll_event eventItem; memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union eventItem.events = EPOLLIN; eventItem.data.fd = mWakeReadPipeFd; result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem); LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake read pipe to epoll instance. errno=%d", errno); }
原来mWakeReadPipeFd只是管道的读端fd,可能童鞋们这时又有疑问
一、为何要建立一个管道并监听读端事件?
二、为何消息入队唤醒消息线程时,仅仅是向读端写一个字符?
经过上面的源码,咱们知道当消息线程没有消息,则会一直阻塞到超时结束;可是若阻塞过程当中,子线程发送一条消息,而这时消息线程还在阻塞中呢,那只能等消息线程阻塞结束才能处理消息,这样会形成消息处理延迟
可能聪明的童鞋会说,那我超时时间设置短一点行不行,这样看起来没问题,可是太短的超时时间基本上等于轮询,效率低不说还浪费CPU浪费电
因此经常使用的作法是:
一、建立一个pipe
二、pipe的读取端放入epoll监听队列
三、当须要当即唤醒消息线程时,子线程仅仅往读取端管道写一个字符就行
经过上面的分析,如今让咱们总结一下
消息线程消费消息:
一、消息线程建立MessageQueue时,会在native层建立一个NativeMessageQueue
二、消息线程经过native调用进入阻塞状态,直到当有新消息到达被主动唤醒或者阻塞超时
三、消息线程被唤醒后,会判断是否有消息须要处理,若是有则返回消息处理,不然会继续步骤2直到退出
子线程传递消息流程:
一、MessageQueue中,消息按照执行时间从早到晚排列,当子线程发送消息后,根据消息执行时间判断是否须要当即唤醒消息线程
二、若是须要当即唤醒消息线程,则经过native端唤醒消息线程
三、消息线程被唤醒后,判断是否有消息须要处理
四、若是有新消息须要处理则返回,不然继续进入阻塞状态
一点思考:
Java层消息传递与消费为何不用wait和Notify来实现,Native层仅仅是阻塞消息线程或者唤醒消息线程(Native层消息队列要强大得多,能够监听FD)有种杀鸡焉用牛刀的感受