参考资料;html
https://zh.wikipedia.org/wiki/%E7%AE%A1%E9%81%93_(Unix)java
https://blog.csdn.net/qq_33951180/article/details/68959819linux
https://blog.csdn.net/cywosp/article/details/27316803android
很久没有写文章了,最近想要学习的东西不少,opengl es也很久没更新了,主要是事情太多了,恰好在公司研究了一下Native层的Handler源码实现,这里记录一下学习的内容。c++
因为Native的Handler设计到c++以及对饮Linux系统接口的调用,文章讲述的内容有以下三个方面:segmentfault
首先弄懂对应的API可以帮助咱们更好的去理解对应的Handler源码。数组
tips缓存
进程间的通讯方式:bash
- 管道(pipe)和命名管道(FIFO)
- 信号(signal)
- 消息队列
- 共享内存
- 信号量
- 套接字(socket)
想要了解多点的能够查看个人这篇文章异步
管道是Linux中进行进程通讯或者线程通讯的一种手段之一,管道分为匿名管道(pipe)以及命名管道(named pipe),管道是内核维护的一个缓存, 它提供两个 fd, 从一个fd写入数据, 从另外一个fd读出数据. 因此它是半双工的。
关于为何是半双工而不是双工的请看这篇文章:
这里因为Android的Native源码中运用的是匿名管道,只针对匿名管道进行说明,关于命名管道(我也不太了解)有兴趣的请自行查阅资料。
匿名管道经过调用pipe(int[2])函数来进行获取两个描述符,分别表明着管道读端以及管道的写端,方式以下:
int fds[2]; int result=pipe(fds); if(result>=0){ ...作本身的事情 }
以以上例子为例,即fds[0]为管道的读端,fds[1]为管道的写端。管道的两端是两个普通的,匿名的文件描述符,这就让其余进程没法链接该管道,因此称之为匿名管道。对于进程而言,经过管道通讯须要在进程A关闭读/写端,在进程B关闭写/读端,数据流向为单向。对于线程而言,不须要关闭管道任何端,子线程是和建立它的进程共享fd的,任何一方关闭管道的读或写都会影响到另外一方。
使用匿名管道须要注意以下几个点:
首先试下线程间经过匿名管道进行数据交换的过程:
void* run(void* fd){ std::cout<<"run start"<<std::endl; char str[] = "hello everyone!"; write( *(int*)fd, str,strlen(str) ); } int main (void) { int fd[2]; if(pipe(fd)){ throw out_of_range("error"); } pthread_t tid=0; pthread_create(&tid,NULL,run,&fd[1]); pthread_join(tid, NULL); char readbuf[1024]; sleep(3); // read buf from child thread read( fd[0], readbuf, sizeof(readbuf) );//阻塞操做 printf("%s\n",readbuf); return (EXIT_SUCCESS); } //执行命令g++ main.cpp -o test -lpthread // ./test //输出结果 run start //等待三秒后 hello everyone!
经过匿名管道,咱们在子线程中调用write(...)函数将数据写入,在主线程中调用read(...)函数获取对应的数据,从而实现了对应的子线程到主线程的数据的单向流通的操做,那若是要子线程读取主线程经过匿名管道写入的数据,改下实现便可:
printMsg (char ch) { std::cout << ch << std::endl; } void* run(void* fd){ std::cout<<"run start"<<std::endl; char readbuf[1024]; read(*(int*)fd, readbuf, sizeof(readbuf) ); printf("%s\n",readbuf); } int main (void) { int fd[2]; if(pipe(fd)){ throw out_of_range("error"); } pthread_t slef=pthread_self(); std::cout<<"pthread_id="<<slef<<std::endl; pthread_t tid=0; pthread_create(&tid,NULL,run,&fd[0]); // read buf from child thread char str[] = "hello everyone!"; write(fd[1], str,strlen(str) ); sleep(3); return (EXIT_SUCCESS); } //输出结果与上面的相同
接下来看下进程间经过匿名管道进行数据交流的过程,主要运行fork()函数进行子进程的初始化过程,首先测试从子进程写数据,父进程读数据的状况:
int main (void) { int fd[2]; int pid=0; char str[]="hello everyone"; char readBuffer[1024]; if(pipe(fd)>=0){ if((pid=fork())<0){ printf("%s","fork error"); }else if(pid==0){ //子进程 printf("%s\n","子进程建立成功"); //关闭子进程的读端 close(fd[0]); //写数据 write(fd[1],str,strlen(str)); printf("%s\n","子进程写入数据完毕"); }else{ //父进程,即当前进程 printf("%s\n","父进程开始做业"); //关闭父进程写端 close(fd[1]); sleep(3); read(fd[0],readBuffer,sizeof(readBuffer)); printf("父进程读到数据=%s\n",readBuffer); } } return (EXIT_SUCCESS); } //运行结果 父进程开始做业 子进程建立成功 子进程写入数据完毕 父进程读到数据=hello everyone
测试从父进程写数据,子进程读数据的状况:
int main (void) { int fd[2]; int pid=0; char str[]="hello everyone"; char readBuffer[1024]; if(pipe(fd)>=0){ if((pid=fork())<0){ printf("%s","fork error"); }else if(pid==0){ printf("%s\n","子进程开始做业"); //关闭子进程写端 close(fd[1]); sleep(3); read(fd[0],readBuffer,sizeof(readBuffer)); //子进程,即当前进程 printf("子进程读到数据=%s\n",readBuffer); }else{ //父进程 printf("%s\n","父进程建立成功"); //关闭父进程的读端 close(fd[0]); //写数据 write(fd[1],str,strlen(str)); printf("%s\n","父进程写入数据完毕"); } } return (EXIT_SUCCESS); } //输出结果 父进程建立成功 父进程写入数据完毕 子进程开始做业 子进程读到数据=hello everyone
epoll是Linux对于select以及poll的加强版,在Linux的2.6内核提出。对于epoll能够直接在bash中用man进行文档查看,或者查阅官网对应的内容。
对于epoll而言,网上有不少文章讲了其实现的功能以及对应与select以及poll的比较,这里对于我认为比较好的文章进行总结以及梳理,资料大多来自于网上。
附:学习来源
https://zh.wikipedia.org/wiki/Epoll
http://blog.51cto.com/yaocoder/888374
https://www.zhihu.com/question/28594409
对于select,poll以及epoll的而言,三个都是IO多路复用的机制,能够监视多个描述符的读/写等事件,一旦某个描述符就绪(通常是读或者写事件发生了),就可以将发生的事件通知给关心的应用程序去处理该事件。但select,poll,epoll本质上都是同步I/O,由于他们都须要在读写事件就绪后本身负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需本身负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间。
关于IO复用机制的说明,能够看下知乎的讲解做为最直观的理解思路,slect,poll以及epoll的优缺点整理以下:
select优缺点以下:
缺点:
每次调用select,都须要把fd集合从用户态拷贝到内核态,这个开销在fd不少时会很大;
同时每次调用select都须要在内核遍历传递进来的全部fd,这个开销在fd不少时也很大;
select支持的文件描述符数量过小了,默认是1024。
优势:
select的可移植性更好,在某些Unix系统上不支持poll()。
select对于超时值提供了更好的精度:微秒,而poll是毫秒。
poll优缺点以下:
缺点:
大量的fd的数组被总体复制于用户态和内核地址空间之间,而无论这样的复制是否是有意义;
与select同样,poll返回后,须要轮询pollfd来获取就绪的描述符。
优势:
poll() 不要求开发者计算最大文件描述符加一的大小。
poll() 在应付大数目的文件描述符的时候速度更快,相比于select。
它没有最大链接数的限制,缘由是它是基于链表来存储的。
epoll的优势就是改进了前面所说缺点:
支持一个进程打开大数目的socket描述符:相比select,epoll则没有对FD的限制,它所支持的FD上限是最大能够打开文件的数目,这个数字通常远大于2048,举个例子,在1GB内存的机器上大约是10万左右,具体数目能够cat /proc/sys/fs/file-max察看,通常来讲这个数目和系统内存关系很大。
IO效率不随FD数目增长而线性降低:epoll不存在这个问题,它只会对"活跃"的socket进行操做--- 这是由于在内核实现中epoll是根据每一个fd上面的callback函数实现的。那么,只有"活跃"的socket才会主动的去调用 callback函数,其余idle状态socket则不会,在这点上,epoll实现了一个"伪"AIO,由于这时候推进力在os内核。在一些 benchmark中,若是全部的socket基本上都是活跃的---好比一个高速LAN环境,epoll并不比select/poll有什么效率,相 反,若是过多使用epoll_ctl,效率相比还有稍微的降低。可是一旦使用idle connections模拟WAN环境,epoll的效率就远在select/poll之上了。
使用mmap加速内核与用户空间的消息传递:这点实际上涉及到epoll的具体实现了。不管是select,poll仍是epoll都须要内核把FD消息通知给用户空间,如何避免没必要要的内存拷贝就 很重要,在这点上,epoll是经过内核于用户空间mmap同一块内存实现的。
epoll主要提供三个API给开发者进行调用实现自主功能:
边缘触发(edge-triggered 简称ET)和水平触发(level-triggered 简称LT):
epoll的事件派发接口能够运行在两种模式下:边缘触发(edge-triggered)和水平触发(level-triggered),两种模式的区别请看下面,咱们先假设下面的状况:
若是rfd被设置了ET,在调用完第五步的epool_wait 后会被挂起,尽管在缓冲区还有能够读取的数据,同时另一段的管道还在等待发送完毕的反馈。这是由于ET模式下只有文件描述符发生改变的时候,才会派发事件。因此第五步操做,可能会去等待已经存在缓冲区的数据。在上面的例子中,一个事件在第二步被建立,再第三步中被消耗,因为第四步中没有读取完缓冲区,第五步中的epoll_wait可能会一直被阻塞下去。
下面状况下推荐使用ET模式:
相比之下,当咱们使用LT的时候(默认),epoll会比poll更简单更快速,并且咱们可使用在任何一个地方。
上述讲述水平触发和边缘触发翻译来自epoll的doc中,想要彻底理解能够查看这篇文章,讲的十分清楚。
int epoll_create(int size);
epoll_create() 能够建立一个epoll实例。在linux 内核版本大于2.6.8 后,这个size 参数就被弃用了,可是传入的值必须大于0。若是执行成功,返回一个非负数(实际为文件描述符), 若是执行失败,会返回-1。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
这个系统调用可以控制给定的文件描述符epfd指向的epoll实例,op是添加事件的类型,fd是目标文件描述符。
有效的op值有如下几种:
第三个参数是须要监听的fd。第四个参数是告诉内核须要监听什么事,代码结构以下:
typedef union epoll_data { void *ptr; int fd; __uint32_t u32; __uint64_t u64; } epoll_data_t; //感兴趣的事件和被触发的事件 struct epoll_event { __uint32_t events; /* Epoll events */ epoll_data_t data; /* User data variable */ };
events这个参数是一个字节的掩码构成的。下面是能够用的事件:
返回值:若是成功,返回0。若是失败,会返回-1, errno将会被设置。有如下几种错误:
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
epoll_wait 这个系统调用是用来等待epfd中的事件。events指向调用者可使用的事件的内存区域。maxevents告知内核有多少个events,必需要大于0.
timeout这个参数是用来制定epoll_wait 会阻塞多少毫秒,会一直阻塞到下面几种状况:
当timeout等于-1的时候这个函数会无限期的阻塞下去,当timeout等于0的时候,就算没有任何事件,也会马上返回。
下面写个例子演示一下epoll和pipe一块儿使用的过程:
static int MAX=256; struct Data{ int* fd; int epfd; struct epoll_event events[]; }; void *runEp(void* data){ printf("线程运行开始\n"); Data r_data=*(Data*)data; struct epoll_event allEvs[MAX]; int pipeFd=*(r_data.fd); //struct epoll_event events[MAX]=r_data.events; int count=epoll_wait(r_data.epfd,allEvs,MAX,5000); for(int i=0;i<count;i++){ if(allEvs[i].data.fd==pipeFd&&(allEvs[i].events&EPOLLIN)){ printf("接收到管道能够进行读的信号,开始读取\n"); char buffer[MAX]; read(pipeFd,buffer,100); printf("读取的内容是:%s\n",buffer); } } } void testEpoll(){ int epollId=epoll_create(MAX); if(epollId<=0){ throw out_of_range("epoll error"); } int pipFd[2]; int pirRes; if((pirRes=pipe(pipFd))<0){ throw out_of_range("pipe error"); } struct epoll_event event; event.data.fd=pipFd[0];//监听管道读端 event.events=EPOLLIN|EPOLLET;//设置参数,接收能够read()的通知,设置边缘触发模式 int epfd=epoll_create(MAX); struct Data data; data.epfd=epfd; data.fd=&pipFd[0]; int res=epoll_ctl(epfd,EPOLL_CTL_ADD,pipFd[0],&event); if(res!=0){ throw out_of_range("pipe error"); } pthread_t tid=12; pthread_create(&tid,NULL,runEp,&data); sleep(2); char str[] = "hello everyone!"; write(pipFd[1], str,strlen(str) ); printf("写入管道数据完毕\n"); sleep(3); } //运行testEpoll()输出结果: 线程运行开始 写入管道数据完毕 接收到管道能够进行读的信号,开始读取 读取的内容是:hello everyone!
上面了解了一下关于管道以及epoll,接下来跟踪一下Handler的具体源码来理一下逻辑。首先Looper在初始化的时候会同时初始化一个MessageQueue,在MessageQueue的构造函数以下:
MessageQueue(boolean quitAllowed) { mQuitAllowed = quitAllowed; mPtr = nativeInit(); }
对应的native层实如今android_os_MessageQueue.cpp
文件中:
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) { //初始化一个本地的MessageQueue NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue(); if (!nativeMessageQueue) { jniThrowRuntimeException(env, "Unable to allocate native queue"); return 0; } nativeMessageQueue->incStrong(env);//增长引用 return reinterpret_cast<jlong>(nativeMessageQueue);//返回指针地址 }
上述代码主要相关的为两件事情:
在NativeMessageQueue初始化过程以下:
NativeMessageQueue::NativeMessageQueue() : mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) { mLooper = Looper::getForThread(); if (mLooper == NULL) { mLooper = new Looper(false); Looper::setForThread(mLooper); } }
这里在Native层也创建了一个Looper,实际上能够理解为Looper.java在Native层的映射,看下构造函数:
Looper::Looper(bool allowNonCallbacks) : mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) { int wakeFds[2]; int result = pipe(wakeFds); LOG_ALWAYS_FATAL_IF(result != 0, "Could not create wake pipe. errno=%d", errno); mWakeReadPipeFd = wakeFds[0]; mWakeWritePipeFd = wakeFds[1]; ... mIdling = false; // Allocate the epoll instance and register the wake pipe. mEpollFd = epoll_create(EPOLL_SIZE_HINT); LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance. errno=%d", errno); struct epoll_event eventItem; memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union eventItem.events = EPOLLIN;//监听管道的read()操做 eventItem.data.fd = mWakeReadPipeFd;//记录管道读端的fd result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem); ... }
这里的一套在学习epoll的时候已经见识过了,在native层的Looper的构造函数中会去监听管道读端的read()操做。
总结一下messagequeue.nativeInit()作的事情:
调用Natvie层代码在Native初始化一个NativeMessageQueue和Looper,在Looper中会开启一个匿名管道,由epoll来监听I/O事件的变化,当管道中有数据的时候,经过epoll通知系统读取数据。最后返回一个NativeMessageQueue的指针交由Java层的MessageQueue方便下次寻址访问。
ok,这里初始化完Java层的Looper,以后会调用Looper.loop()方法,在该方法中会一直取MessageQueue里面的数据:
public static void loop() { ... for (;;) { Message msg = queue.next(); // might block if (msg == null) { // No message indicates that the message queue is quitting. return; } ... }
MessageQueue.next()方法以下:
Message next() { //获取指针地址 final long ptr = mPtr; if (ptr == 0) { return null; } int pendingIdleHandlerCount = -1; // -1 only during first iteration int nextPollTimeoutMillis = 0; for (;;) { if (nextPollTimeoutMillis != 0) { Binder.flushPendingCommands(); } nativePollOnce(ptr, nextPollTimeoutMillis); synchronized (this) { // Try to retrieve the next message. Return if found. final long now = SystemClock.uptimeMillis(); Message prevMsg = null; Message msg = mMessages; if (msg != null && msg.target == null) { // Stalled by a barrier. Find the next asynchronous message in the queue. do { prevMsg = msg; msg = msg.next; } while (msg != null && !msg.isAsynchronous()); } if (msg != null) { if (now < msg.when) { // Next message is not ready. Set a timeout to wake up when it is ready. nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE); } else { // Got a message. mBlocked = false; if (prevMsg != null) { prevMsg.next = msg.next; } else { mMessages = msg.next; } msg.next = null; if (DEBUG) Log.v(TAG, "Returning message: " + msg); msg.markInUse(); return msg; } } else { // No more messages. nextPollTimeoutMillis = -1; } // Process the quit message now that all pending messages have been handled. if (mQuitting) { dispose(); return null; } // If first time idle, then get the number of idlers to run. // Idle handles only run if the queue is empty or if the first message // in the queue (possibly a barrier) is due to be handled in the future. if (pendingIdleHandlerCount < 0 && (mMessages == null || now < mMessages.when)) { pendingIdleHandlerCount = mIdleHandlers.size(); } if (pendingIdleHandlerCount <= 0) { // No idle handlers to run. Loop and wait some more. mBlocked = true; continue; } if (mPendingIdleHandlers == null) { mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)]; } mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers); } // Run the idle handlers. // We only ever reach this code block during the first iteration. for (int i = 0; i < pendingIdleHandlerCount; i++) { final IdleHandler idler = mPendingIdleHandlers[i]; mPendingIdleHandlers[i] = null; // release the reference to the handler boolean keep = false; try { keep = idler.queueIdle(); } catch (Throwable t) { Log.wtf(TAG, "IdleHandler threw exception", t); } if (!keep) { synchronized (this) { mIdleHandlers.remove(idler); } } } // Reset the idle handler count to 0 so we do not run them again. pendingIdleHandlerCount = 0; // While calling an idle handler, a new message could have been delivered // so go back and look again for a pending message without waiting. nextPollTimeoutMillis = 0; } }
这里能够看到调用了nativePollOnce(...)
方法进入了native层,对应实现为:
//`android_os_MessageQueue.cpp` static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj, jlong ptr, jint timeoutMillis) { NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr); nativeMessageQueue->pollOnce(env, obj, timeoutMillis); }
该方法最终调用native层的Looper.pollOnce(...)
:
//Looper.cpp int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) { int result = 0; for (;;) { ... result = pollInner(timeoutMillis); } } int Looper::pollInner(int timeoutMillis) { ... // Poll. int result = POLL_WAKE; mResponses.clear(); mResponseIndex = 0; // We are about to idle. mIdling = true; struct epoll_event eventItems[EPOLL_MAX_EVENTS]; //阻塞等待能够读取管道的通知 int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis); // No longer idling. mIdling = false; // Acquire lock. mLock.lock(); ... for (int i = 0; i < eventCount; i++) { int fd = eventItems[i].data.fd; uint32_t epollEvents = eventItems[i].events; if (fd == mWakeReadPipeFd) { if (epollEvents & EPOLLIN) { awoken();// } else { ALOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents); } } else { ... } } Done: ; ... return result; }
关键代码在于awaken()
方法:
void Looper::awoken() { #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ awoken", this); #endif char buffer[16]; ssize_t nRead; do { nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer));//能够看到读取了管道中的内容 } while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer)); }
那么read(..)方法执行了,哪里进行write(..)方法的操做呢?答案在于咱们将消息push到MessageQueue中时候,即MessageQueue.enqueueMessages(...)方法中,里面会执行:
nativeWake(mPtr);
这个最终会调用到native层的Looper中的wake()方法:
void Looper::wake() { #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ wake", this); #endif ssize_t nWrite; do { nWrite = write(mWakeWritePipeFd, "W", 1);//进行了写操做 } while (nWrite == -1 && errno == EINTR); if (nWrite != 1) { if (errno != EAGAIN) { ALOGW("Could not write wake signal, errno=%d", errno); } } }
Handler在native层主要的逻辑代码已经了解了,那么总结一下:
引用Gityuan大神的解释:
在主线程的MessageQueue没有消息时,便阻塞在loop的queue.next()中的nativePollOnce()方法里,详情见Android消息机制1-Handler(Java层),此时主线程会释放CPU资源进入休眠状态,直到下个消息到达或者有事务发生,经过往pipe管道写端写入数据来唤醒主线程工做。这里采用的epoll机制,是一种IO多路复用机制,能够同时监控多个描述符,当某个描述符就绪(读或写就绪),则马上通知相应程序进行读或写操做,本质同步I/O,即读写是阻塞的。 因此说,主线程大多数时候都是处于休眠状态,并不会消耗大量CPU资源。