ActivityThread 是 android 应用程序的“主线程”,它是由 ActivityManagerService 调用 Process 进程类交由 Zygote 孵化出一个新进程的时候作为 entryPoint 参数指定的。java
/* framework/base/services/core/com/android/server/am/ActivityManagerService.java */
private final boolean startProcessLocked(ProcessRecord app, String hostingType,
String hostingNameStr, boolean disableHiddenApiChecks, String abiOverride) {
...
final String entryPoint = "android.app.ActivityThread";
return startProcessLocked(hostingType, hostingNameStr, entryPoint, app, uid, gids,
runtimeFlags, mountExternal, seInfo, requiredAbi, instructionSet, invokeWith,
startTime);
}
// 普通 app 进程的 startProcessLocked 会执行到下面的函数上去
private ProcessStartResult startProcess(String hostingType, String entryPoint,
ProcessRecord app, int uid, int[] gids, int runtimeFlags, int mountExternal,
String seInfo, String requiredAbi, String instructionSet, String invokeWith,
long startTime) {
...
// Process 进程类会间接调用 Zygote 孵化出一个新的进程
startResult = Process.start(entryPoint,
app.processName, uid, uid, gids, runtimeFlags, mountExternal,
app.info.targetSdkVersion, seInfo, requiredAbi, instructionSet,
app.info.dataDir, invokeWith,
new String[] {PROC_START_SEQ_IDENT + app.startSeq});
eturn startResult;
}
复制代码
目标进程孵化成功后,会触发 ActivityThread 的 main() 方法的执行,首先她会进行一些环境配置的准备,接着会调用 prepareMainLooper() 初始化 Looper 对象,并将这个 Looper 对象与当前线程一一绑定;在建立 Looper 对象的时候,也伴随着 MessageQueue 对象的建立,Looper 与 MessageQueue 也是一一对应的。linux
/* frameworks/base/core/java/android/app/ActivityThread.java*/
static volatile Handler sMainThreadHandler; // set once in main()
final H mH = new H();
public static void main(String[] args) {
...
Looper.prepareMainLooper();
ActivityThread thread = new ActivityThread();
thread.attach(false, startSeq);
if (sMainThreadHandler == null) {
sMainThreadHandler = thread.getHandler();
}
Looper.loop();
throw new RuntimeException("Main thread loop unexpectedly exited");
}
final Handler getHandler() {
return mH;
}
// H 用于分发或处理 android 封装后的 events 事件
class H extends Handler {
...
}
复制代码
sThreadLocal 是一个静态类型变量,意味着一旦 import 了 Looper 后,sThreadLocal 就已经存在并构建完毕。ThreadLocal 是一个本地线程副本变量工具类,这个类能使线程中的某个值与保存值的对象关联起来。ThreadLocal 提供了 get 与 set 等访问接口或方法,这些方法为每一个使用该变量的线程都存有一份独立的副本,所以 get 老是返回由当前执行线程在调用 set 时设置的值。android
Looper 正式利用 ThreadLocal 的这种特性来将 thread、Looper、MessageQueue 一一关联起来的。segmentfault
/* frameworks/base/core/java/android/os/Looper.java*/
@UnsupportedAppUsage
static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();
public static void prepareMainLooper() {
prepare(false);
synchronized (Looper.class) {
if (sMainLooper != null) {
throw new IllegalStateException("The main Looper has already been prepared.");
}
sMainLooper = myLooper();
}
}
private static void prepare(boolean quitAllowed) {
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
sThreadLocal.set(new Looper(quitAllowed));
}
private Looper(boolean quitAllowed) {
mQueue = new MessageQueue(quitAllowed);
mThread = Thread.currentThread();
}
复制代码
这里咱们能够看出数组
MessageQueue 看名字就能猜到,是用来保存 Message 的队列。准确来讲,Runnable (做为 message 的 callback 成员) 和 Message 均可以被压入 MessageQueue 中,造成一个集合。MessageQueue 的构造函数中会调用 nativeInit() 经过 jni 调用进入到 C++ framework 层bash
/*frameworks/base/core/java/android/os/MessageQueue.java */
private final boolean mQuitAllowed;
private long mPtr; // 能够理解为对 Native 层 MessageQueue 的引用
MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
mPtr = nativeInit();
}
private native static long nativeInit();
复制代码
nativeInit 在 native 层一样建立了 MessageQueue 、Looper 对象,这与 java 层很是类似。由此咱们能够猜测,在 native 层也应该实现了一套消息传递机制,在 java 层,MessageQueue 中的一条条 message 最终会交由 handler 去处理,在 native 层是否也是如此?并发
/* frameworks/base/core/jni/android_os_MessageQueue.cpp */
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
if (!nativeMessageQueue) {
jniThrowRuntimeException(env, "Unable to allocate native queue");
return 0;
}
nativeMessageQueue->incStrong(env); // 对 env 的强弱引用执行加1
return reinterpret_cast<jlong>(nativeMessageQueue); // 将 nativeMessageQueue 转换成 long 类型来表示
}
class NativeMessageQueue : public MessageQueue, public LooperCallback {
...
NativeMessageQueue::NativeMessageQueue() :
mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
mLooper = Looper::getForThread();
if (mLooper == NULL) {
mLooper = new Looper(false); // 这里的 Looper 是 native 的Looper
Looper::setForThread(mLooper);
}
....
}
}
复制代码
接着回到 ActivityThread,当执行完上面的步骤后,分别在 java 层和 native 层都各自初始化了她们的 Looper 与 MessageQueue 对象,至此,prepareMainLooper() 执行完毕。接着便会 new 一个 ActivityThread 对象,随即经过 thread.attach(false, startSeq)
方法将 ActivityThread 与当前线程绑定,并执行 application、activity、services 等的启动流程。这部分流程与本篇内容关系不大,故不作展开。attach()
执行以后紧接着会经过这个 ActivityThread 对象的 getHandler()
方法获取到一个 Handler 对象,并将其标赋值为sMainThreadHandler(主线程 Handler),这个 sMainThreadHandler 仅会在 ActivityThread 初始化的时候执行一次。app
/* frameworks/base/core/java/android/app/ActivityThread.java */
public static void main(String[] args) {
...
ActivityThread thread = new ActivityThread();
thread.attach(false, startSeq);
if (sMainThreadHandler == null) {
sMainThreadHandler = thread.getHandler();
}
...
}
复制代码
至此,Handler 对象出如今了咱们视野。那么 Handler 与线程 Thread 又有什么关系呢?异步
表面上看确实没什么关联,可是因为每一个 Message 最多指定一个 Handler 来处理事件,而每一个 MessageQueue 中能够有 N 个 Message,结合前面 MessageQueue 、Looper、与 Thread 的一一对应关系,不难推导出,Thread 与 Handler 是一对多的关系。socket
sMainThreadHandler 初始化后,主线程便经过 Looper.loop()
方法开始了永不停歇的循环。若是遇到某些异常致使 loop 退出,主线程也将抛出异常,程序崩溃。
/* frameworks/base/core/java/android/os/Looper.java */
public static void loop() {
final Looper me = myLooper();
if (me == null) {
throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
}
final MessageQueue queue = me.mQueue;
for (;;) {
...
Message msg = queue.next(); // might block
if (msg == null) {
// No message indicates that the message queue is quitting.
return;
}
...
try {
msg.target.dispatchMessage(msg);
} finally {
if (traceTag != 0) {
Trace.traceEnd(traceTag);
}
}
...
msg.recycleUnchecked();
}
}
public static @Nullable Looper myLooper() {
return sThreadLocal.get();
}
复制代码
首先会经过 myLooper() 拿到保存在 ThreadLocal 中的 Looper 对象,若是该线程没有对 Looper 初始化,则会抛出异常。拿到 Looper 对象后再经过这个 Looper 就能够拿到 MessageQueue ,而后开始执行死循环,经过 queue.next()
不断从 MessageQueue 中取出 Message 来处理。
任一线程,只要初始化过 Looper ,就能够调用loop()
来处理事件,这是由于 Looper 是保存在 sThreadLocal 中的,主线程只是经过Looper.prepareMainLooper()
已经帮咱们处理了这一过程。对于子线程来讲,也能够经过Looper.prepare()
来达到这一目的,而后就能够经过Looper.loop()
开启循环,处理事件了。
既然是一个死循环在不断经过queue.next()
读取 message 来处理,为何主线程不至于卡死?或者是 ANR 呢?从源码注释来看queue.next()
是可能形成线程阻塞的。因此玄机也应该在这个方法里面。
/* frameworks/base/core/java/android/os/MessageQueue.java */
Message next() {
// 根据前面介绍,mPtr 能够理解为对 native MessageQueue 的引用
// ptr == 0 ,说明 native MessageQueue 不可用,程序处于异常状态,loop 也将结束
final long ptr = mPtr;
if (ptr == 0) {
return null;
}
//正常状况都会执行到这里来
int pendingIdleHandlerCount = -1; // -1 only during first iteration
int nextPollTimeoutMillis = 0;
for (;;) {
if (nextPollTimeoutMillis != 0) {
// 该方法用于将当前线程中任何待处理的 Binder 命令刷新到驱动中
// 在执行可能阻塞很长时间的操做调用前调用颇有用
Binder.flushPendingCommands();
}
nativePollOnce(ptr, nextPollTimeoutMillis); // 这个方法很重要
synchronized (this) {
// Try to retrieve the next message. Return if found.
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
if (msg != null && msg.target == null) {
// 经过 barrier,找到要处理的异步消息
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
if (msg != null) {
if (now < msg.when) {
/* 到这里说明 message 还未准备执行,须要等待一段时间,nextPollTimeoutMillis 将被传入 nativePollOnce 让线程休眠,待 nextPollTimeoutMillis 时间知足后再唤醒线程 */
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
// 获取到 message 的处理,正常流程会执行到这个分支
mBlocked = false;
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
msg.next = null;
msg.markInUse(); // 标记已处理
return msg;
}
} else {
nextPollTimeoutMillis = -1;
}
if (mQuitting) {
/* 若是 ptr!=0,调用 nativeDestroy() disposes message queue*/
dispose();
return null;
}
/* 处理注册的IdleHandler,当MessageQueue中没有Message时,
Looper会调用IdleHandler作一些工做 */
...
nextPollTimeoutMillis = 0;
}
}
复制代码
next()
方法会一直等待到下一个消息的执行时间到来而后取出并返回。等待的期间线程会休眠,不占用 cpu 资源,这部分逻辑正是在 nativePollOnce
来处理的,nativePollOnce 最终是经过 native 层的 Looper 来执行 pollOnce
方法的
/* frameworks/base/core/jni/android_os_MessageQueue.cpp */
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
jlong ptr, jint timeoutMillis) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}
class NativeMessageQueue : public MessageQueue, public LooperCallback {
...
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
mPollEnv = env;
mPollObj = pollObj;
mLooper->pollOnce(timeoutMillis);
mPollObj = NULL;
mPollEnv = NULL;
if (mExceptionObj) {
env->Throw(mExceptionObj);
env->DeleteLocalRef(mExceptionObj);
mExceptionObj = NULL;
}
...
}
}
复制代码
注意,这里的 Looper.cpp
L 是大写的,源码中也有 looper.cpp
,路径 frameworks/base/native/android/looper.cpp,里面有 ALooper 对象等,其实也是对 Looper.cpp 的包装,最终实现仍是经过 system/core/libutils/Looper.cpp 来实现的
/*system/core/libutils/include/utils/Looper.h */
public:
enum {
/**
* Result from Looper_pollOnce() and Looper_pollAll():
* The poll was awoken using wake() before the timeout expired
* and no callbacks were executed and no other file descriptors were ready.
*/
POLL_WAKE = -1,
/**
* Result from Looper_pollOnce() and Looper_pollAll():
* One or more callbacks were executed.
*/
POLL_CALLBACK = -2,
/**
* Result from Looper_pollOnce() and Looper_pollAll():
* The timeout expired.
*/
POLL_TIMEOUT = -3,
/**
* Result from Looper_pollOnce() and Looper_pollAll():
* An error occurred.
*/
POLL_ERROR = -4,
};
int pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData);
inline int pollOnce(int timeoutMillis) {
return pollOnce(timeoutMillis, nullptr, nullptr, nullptr);
}
private:
struct Request {
int fd;
int ident;
int events;
int seq;
sp<LooperCallback> callback;
void* data;
void initEventItem(struct epoll_event* eventItem) const;
};
struct Response {
int events;
Request request;
};
复制代码
pollOnce() 方法中,先处理 Response 数组中不带 Callback的事件,再调用了 pollInner() 方法
/* system/core/libutils/Looper.cpp */
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) {
while (mResponseIndex < mResponses.size()) {
const Response& response = mResponses.itemAt(mResponseIndex++);
int ident = response.request.ident;
/* ident >= 0,表示没有 callback,由于 POLL_CALLBACK = -2 */
if (ident >= 0) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
if (outFd != nullptr) *outFd = fd;
if (outEvents != nullptr) *outEvents = events;
if (outData != nullptr) *outData = data;
return ident;
}
}
// result != 0 说明
if (result != 0) {
if (outFd != nullptr) *outFd = 0;
if (outEvents != nullptr) *outEvents = 0;
if (outData != nullptr) *outData = nullptr;
return result;
}
// 内部轮循
result = pollInner(timeoutMillis);
}
}
复制代码
pollInner 先调用 epoll_wait() 等待事件的发生或者超时,对于 epoll_wait 的返回值 eventCount,若是小于等于0,先将 result 标记为对应的值,直接跳转到Done。若是 epoll 重建,也会直接跳转 Done。进入Done标记位的代码段,会先调用 Native 的 Handler 来处理该 Message,而后再处理 Response 数组,若是是 CALLBACK 回调,就交由具体请求方去处理(不带 Callback 的事件 pollOnce() 已经处理了 )。
/* system/core/libutils/Looper.cpp */
static const int EPOLL_MAX_EVENTS = 16;
int Looper::pollInner(int timeoutMillis) {
...
// Poll.
int result = POLL_WAKE;
mResponses.clear();
mResponseIndex = 0;
//epoll_event 最大为16个
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
// epoll_wait 是 linux 中 epoll 相关的系统调用
int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
// Acquire lock.
mLock.lock();
// epoll重建,直接跳转Done
if (mEpollRebuildRequired) {
mEpollRebuildRequired = false;
rebuildEpollLocked(); // epoll重建方法,内部会使用epoll_create1系统调用
goto Done;
}
// epoll事件个数小于0,发生错误,直接跳转Done
if (eventCount < 0) {
if (errno == EINTR) {
goto Done;
}
result = POLL_ERROR; // result=-4
goto Done;
}
// epoll事件个数等于0,发生超时,直接跳转Done
if (eventCount == 0) {
result = POLL_TIMEOUT; // result=-3
goto Done;
}
// 遍历处理事件
for (int i = 0; i < eventCount; i++) {
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
if (fd == mWakeEventFd.get()) {
if (epollEvents & EPOLLIN) {
awoken(); //已经唤醒了,则读取并清空管道数据
}
} else {
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex >= 0) {
int events = 0;
if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
//将request封装成response对象,push到mResponses队列
pushResponse(events, mRequests.valueAt(requestIndex));
}
}
}
Done: ;
mNextMessageUptime = LLONG_MAX;
// 处理native的message
while (mMessageEnvelopes.size() != 0) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
if (messageEnvelope.uptime <= now) {
{
sp<MessageHandler> handler = messageEnvelope.handler;
Message message = messageEnvelope.message;
mMessageEnvelopes.removeAt(0);
mSendingMessage = true;
mLock.unlock();
handler->handleMessage(message); //经过handler处理消息
}
mLock.lock();
mSendingMessage = false;
result = POLL_CALLBACK;
} else {
mNextMessageUptime = messageEnvelope.uptime;
break;
}
}
// Release lock.
mLock.unlock();
// 理带有Callback()方法的Response事件
for (size_t i = 0; i < mResponses.size(); i++) {
Response& response = mResponses.editItemAt(i);
if (response.request.ident == POLL_CALLBACK) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
//处理请求的回调方法
int callbackResult = response.request.callback->handleEvent(fd, events, data);
if (callbackResult == 0) {
removeFd(fd, response.request.seq);
}
// 清除callback引用
response.request.callback.clear();
result = POLL_CALLBACK;
}
}
return result;
}
复制代码
pollOnce返回值 | 说明 |
---|---|
POLL_WAKE | 表示由wake()触发,即pipe写端的write事件触发 |
POLL_CALLBACK | 表示某个被监听fd被触发 |
POLL_TIMEOUT | 表示等待超时 |
POLL_ERROR | 表示等待期间发生错误 |
epoll 是 Linux 内核中的一套高效 IO多路复用模型。Linux 经过 socket 睡眠队列来管理全部等待 socket 的某个事件的 process,同时经过 wakeup 机制来异步唤醒整个睡眠队列上等待事件的 process,通知 process 相关事件发生。
I/O 多路复用(又被称为“事件驱动”),首先要理解的是,操做系统为你提供了一个功能,当你的某个 socket 可读或者可写的时候,它能够给你一个通知。这样当配合非阻塞的 socket 使用时,只有当系统通知我哪一个描述符可读了,我才去执行 read 操做,能够保证每次 read 都能读到有效数据而不作纯返回 -1 和 EAGAIN 的无用功。写操做相似。操做系统的这个功能经过 select/poll/epoll/kqueue 之类的系统调用函数来使用,这些函数均可以同时监视多个描述符的读写就绪情况,这样,多个描述符的 I/O 操做都能在一个线程内并发交替地顺序完成,这就叫 I/O 多路复用,这里的“复用”指的是复用同一个线程。
Epoll事件 | 含义 |
---|---|
EPOLLIN | 关联的文件描述符能够读(包括对端SOCKET正常关闭) |
EPOLLOUT | 关联的文件描述符能够写 |
EPOLLPRI | 关联的文件描述符有紧急的数据可读(这里应该表示有带外数据到来) |
EPOLLERR | 关联的文件描述符发生错误 |
EPOLLHUP | 流套接字对等关闭链接,或半关闭写。(当使用边缘触发监视时,此标记对于编写简单代码检测对等端是否关闭特别有用。2.6.17引入) |
EPOLLET | 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来讲的 |
EPOLLONESHOT | 只监听一次事件,当监听完此次事件以后,若是还须要继续监听这个socket的话,须要再次把这个socket加入到EPOLL队列里 |
系统调用 | 函数 | 功能 |
---|---|---|
epoll_create | int epoll_create(int size) | 用于建立一个epoll的句柄,size是指监听的描述符个数, 如今内核支持动态扩展,该值的意义仅仅是初次分配的fd个数,后面空间不够时会动态扩容。 当建立完epoll句柄后,占用一个fd值 |
epoll_ctl | int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) | 用于对须要监听的文件描述符(fd)执行op操做。op操做包括EPOLL_CTL_ADD、EPOLL_CTL_DEL、EPOLL_CTL_MOD epoll定义了一个结构体 struct epoll_event 来表达监听句柄的诉求。epoll_event 支持的事件对应上表 |
epoll_wait | int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout) | 等待事件的上报,该函数返回须要处理的事件数目,如返回0表示已超时 |
Epoll 的工做流程和优点,感兴趣的同窗能够参考下面的文章
Java 层与 native 层各自实现了一套 handler 机制,都有各自的 Handler、Message、MessageQueue和 Looper 对象,二者的模型与工做原理类似。java 层的Looper 经过 MessageQueue 不断调用 next() 获取 Message 交由 Handler 去处理。若是 MessageQueue 中不存在新的事件,next() 将经过 Linux Epoll 将同时让 java 层和 native 层的线程进入休眠态,线程被挂起,不在消耗 CPU 资源,直到新的事件触发,fd 发生变更,Epoll 再次唤醒休眠的目标进程,继续执行消息循环。
若是用一句简单的话语归纳 Handler 的运行机制,那就是:
Looper 不断获取 MessageQueue 中的一个 Message ,而后交由 Handler 去处理。