目录ios
上篇文章为EventLoop添加了一个定时器Fd,为EventLoop增长了3个接口:runAfter()、runAt()、runEvery()、这三个接口用于处理定时任务和周期任务. 底层经过封装TimerFd实现。网络
TimerId runAt(const TimeStamp& time, const NetCallBacks::TimerCallBack& cb); TimerId runAfter(double delay, const NetCallBacks::TimerCallBack& cb); TimerId runEvery(double interval, const NetCallBacks::TimerCallBack& cb);
今天为EventLoop添加另外一个Fd:EventFd, 用于实现线程间的事件通知机制.本文会先介绍eventfd的使用,而后给出muduo中EventLoop对eventfd的封装.socket
eventfd - 事件通知文件描述符
#include <sys/eventfd.h>
int eventfd(unsigned int initval ,int flags );ide
建立一个能被用户应用程序用于时间等待唤醒机制的eventfd对象.
initval
:
eventfd()建立一个可用做事件的“eventfd对象”用户空间应用程序和内核等待/通知机制通知用户空间应用程序的事件。该对象包含一个由内核维护的无符号64位整型(uint64_t)计数器。此计数器的初始值经过initval指定。通常设0.函数
flags
:
如下标志中按位OR运算以更改eventfd()的行为,(文件中经常使用的这两个flags确定都懂意思吧,就不翻译了,第三个信号量的无论它.):oop
EFD_CLOEXEC (since Linux 2.6.27) Set the close-on-exec (FD_CLOEXEC) flag on the new file descriptor. See the description of the O_CLOEXEC flag in open(2) for reasons why this may be useful. EFD_NONBLOCK (since Linux 2.6.27) Set the O_NONBLOCK file status flag on the new open file description. Using this flag saves extra calls to fcntl(2) to achieve the same result. EFD_SEMAPHORE (since Linux 2.6.30) Provide semaphore-like semantics for reads from the new file descriptor. See below.
read(2)学习
成功读取返回一个8byte的整数。read(2)若是提供的缓冲区的大小小于8个字节返回错误EINVALui
write (2)this
将缓冲区写入的8字节整形值加到内核计数器上。能够写入的最大值
是计数器中是最大的无符号64位值减1(即0xfffffffffffffffe)。atom
返回值:
On success, eventfd() returns a new eventfd file descriptor. On error, -1 is returned and errno is set to indicate the error.
#include <iostream> #include <assert.h> #include <poll.h> #include <signal.h> #include <sys/eventfd.h> #include <unistd.h> #include <string.h> #include <thread> static int s_efd = 0; int createEventfd() { int evtfd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); std::cout << "createEventfd() fd : " << evtfd << std::endl; if (evtfd < 0) { std::cout << "Failed in eventfd\n"; abort(); } return evtfd; } void testThread() { int timeout = 0; while(timeout < 3) { sleep(1); timeout++; } uint64_t one = 1; ssize_t n = write(s_efd, &one, sizeof one); if(n != sizeof one) { std::cout << " writes " << n << " bytes instead of 8\n"; } } int main() { s_efd = createEventfd(); fd_set rdset; FD_ZERO(&rdset); FD_SET(s_efd, &rdset); struct timeval timeout; timeout.tv_sec = 1; timeout.tv_usec = 0; std::thread t(testThread); while(1) { if(select(s_efd + 1, &rdset, NULL, NULL, &timeout) == 0) { std::cout << "timeout\n"; timeout.tv_sec = 1; timeout.tv_usec = 0; FD_SET(s_efd, &rdset); continue; } uint64_t one = 0; ssize_t n = read(s_efd, &one, sizeof one); if(n != sizeof one) { std::cout << " read " << n << " bytes instead of 8\n"; } std::cout << " wakeup !\n"; break; } t.join(); close(s_efd); return 0; }
./test.out createEventfd() fd : 3 timeout timeout timeout wakeup !
eventfd 单纯的使用文件描述符实现的线程间的通知机制,能够很好的融入select、poll、epoll的I/O复用机制中.
所增长的接口及成员:
typedef std::function<void()> Functor; void runInLoop(const Functor& cb); void wakeup(); //是写m_wakeupFd 通知poll 处理读事件. void queueInLoop(const Functor& cb); private: //used to waked up void handleRead(); void doPendingFunctors(); int m_wakeupFd; std::unique_ptr<Channel> p_wakeupChannel; mutable MutexLock m_mutex; bool m_callingPendingFunctors; /* atomic */ std::vector<Functor> m_pendingFunctors; // @GuardedBy mutex_
(runInLoop() -> quueInLoop())/queueInLoop() -> wakeup() -> poll() -> handleRead() -> doPendingFunctors()
runInLoop()
若是用户在当前IO线程调用这个函数, 回调会同步进行; 若是用户在其余线程调用runInLoop(),cb会被加入队列, IO线程会被唤醒来调用这个Functor.
void EventLoop::runInLoop(const Functor& cb) { if(isInloopThread()) cb(); else queueInLoop(cb); }
queueInLoop()
会将回调添加到容器,同时经过wakeup()唤醒poll()调用容器内的回调.
void EventLoop::queueInLoop(const Functor& cb) { LOG_TRACE << "EventLoop::queueInLoop()"; { MutexLockGuard lock(m_mutex); m_pendingFunctors.push_back(std::move(cb)); } if(!isInloopThread()) { wakeup(); } }
内部实现,
wakeup()
写已注册到poll的eventfd 通知poll 处理读事件.
// m_wakeupFd(createEventfd()), // p_wakeupChannel(new Channel(this, m_wakeupFd)), void EventLoop::wakeup() { uint64_t one = 1; ssize_t n = sockets::write(m_wakeupFd, &one, sizeof one); if(n != sizeof one) { LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8"; } }
handleRead()
poll回调读事件,处理eventfd.
void EventLoop::handleRead() //handle wakeup Fd { LOG_TRACE << "EventLoop::handleRead() handle wakeup Fd"; uint64_t one = 1; ssize_t n = sockets::read(m_wakeupFd, &one, sizeof one); if(n != sizeof one) { LOG_ERROR << "EventLoop::handleRead() reads " << n << "bytes instead of 8"; } doPendingFunctors(); }
doPendingFunctors()
处理挂起的事件.
void EventLoop::doPendingFunctors() { LOG_TRACE << "EventLoop::doPendingFunctors()"; std::vector<Functor> functors; m_callingPendingFunctors = true; { MutexLockGuard lock(m_mutex); functors.swap(m_pendingFunctors); } for(size_t i = 0; i < functors.size(); ++i) { functors[i](); } m_callingPendingFunctors = false; }
本文主要介绍了muduo中EventLoop经过 经过封装一层eventfd实现的runInLoop()函数,使得其余线程想往EventLoop所在的I/O线程注册任务成为可能.
下篇文章会写Connector和Acceptor,连接器和监听器 实现第一条连接。