常见的服务器模型有多进程、多线程、IO多路复用、协程等模型。SONiC的核心守护进程orchagent采用的是IO多路复用模型:早期的SONiC使用select实现多路复用,后续版本改用epoll。代码中使用Select类(与多路复用系统调用select同名)对底层进行了封装,屏蔽了两者的差别。
Selectable是事件基类,描述了一个epoll事件,可以是读、写、异常等事件。该结构对通用epoll事件进行了封装,具体的事件类型都通过该类派生出来,比如redis数据库事件:class RedisSelect : public Selectable;netlink事件:class NetLink : public Selectable;通知:class NotificationConsumer : public Selectable;orch执行单元:class Executor : public Selectable;定时器:class SelectableTimer : public Selectable等。
/*
 * Selectable: abstract base for every event source a Select instance can wait
 * on. Concrete sources derive from it and supply a file descriptor plus the
 * logic to read from it.
 *
 * Besides the fd, each object carries scheduling metadata that Select uses to
 * order its ready set: a priority (higher value is served first) and a
 * "last used" sequence number that breaks ties between equal priorities.
 */
class Selectable
{
public:
    Selectable(int pri = 0)
        : m_priority(pri),
          m_last_used_time(std::chrono::steady_clock::now()),
          lastusedsequence(g_lastusedsequence++)
    {
    }

    virtual ~Selectable() = default;

    /* File descriptor that Select registers with epoll. */
    virtual int getFd() = 0;

    /* Read all pending data from the fd associated with this Selectable. */
    virtual void readData() = 0;

    /*
     * True while data is still buffered after a read; Select then keeps this
     * object in its ready set. Default: no cached data.
     */
    virtual bool hasCachedData()
    {
        return false;
    }

    /*
     * True if data is already available when the object is added to a Select,
     * so it is marked ready immediately. Default: false.
     */
    virtual bool initializedWithData()
    {
        return false;
    }

    /* Hook run by Select after each time this object is handed out. Default: no-op. */
    virtual void updateAfterRead()
    {
    }

    int getPri() const
    {
        return m_priority;
    }

private:
    // Only Select may read and refresh the scheduling metadata below.
    friend class Select;

    std::chrono::time_point<std::chrono::steady_clock> getLastUsedTime() const
    {
        return m_last_used_time;
    }

    unsigned long getLastUsedsequence() const
    {
        return lastusedsequence;
    }

    // Mark this object as just used: refresh its timestamp and take the next
    // global sequence number, which places it behind equal-priority peers.
    void updateLastUsedTime()
    {
        m_last_used_time = std::chrono::steady_clock::now();
        lastusedsequence = g_lastusedsequence++;
    }

    // Priority inside Select; a higher value means higher priority.
    int m_priority;
    std::chrono::time_point<std::chrono::steady_clock> m_last_used_time;
    // Sequence number drawn from g_lastusedsequence when this object was last used.
    unsigned long lastusedsequence;
    // Shared counter (must be defined out of line) used for fair scheduling
    // among Selectables of equal priority.
    static unsigned long g_lastusedsequence;
};
class Select { public: Select(); ~Select(); /* Add object for select 给epoll添加一个事件 */ void addSelectable(Selectable *selectable); /* Remove object from select 删除一个epoll事件 */ void removeSelectable(Selectable *selectable); /* Add multiple objects for select 添加多个epoll事件 */ void addSelectables(std::vector<Selectable *> selectables); enum {//返回的事件类型 OBJECT = 0, ERROR = 1, TIMEOUT = 2, }; //执行epoll int select(Selectable **c, unsigned int timeout = std::numeric_limits<unsigned int>::max()); int select(std::vector<Selectable *> &vc, unsigned int timeout = std::numeric_limits<unsigned int>::max()); private: //epoll事件比较函数,经过该函数实现事件的优先级 struct cmp { bool operator()(const Selectable* a, const Selectable* b) const { /* Choose Selectable with highest priority first */ if (a->getPri() > b->getPri()) return true; else if (a->getPri() < b->getPri()) return false; /* if the priorities are equal */ /* use Selectable which was selected later */ if (a->getLastUsedsequence() < b->getLastUsedsequence()) return true; else if (a->getLastUsedsequence() > b->getLastUsedsequence()) return false; /* when a == b */ return false; } }; //epoll轮询函数 int poll_descriptors(Selectable **c, unsigned int timeout); int poll_descriptors(std::vector<Selectable *> &vc, unsigned int timeout); int m_epoll_fd;//epoll句柄 std::unordered_map<int, Selectable *> m_objects;//监听的事件句柄与其对应的selectable之间的关系 std::set<Selectable *, Select::cmp> m_ready;//已经就绪的事件集合,提供了比较函数,从而实现优先级调度 };
/*
 * Create the epoll instance backing this Select.
 * Throws std::runtime_error carrying errno and its description if
 * epoll_create1 fails.
 */
Select::Select()
{
    m_epoll_fd = ::epoll_create1(0);
    if (m_epoll_fd == -1)
    {
        // Read errno once so the number and the text describe the same failure
        // (operands of '+' are not sequenced relative to each other).
        const int err = errno;
        std::string error = "Select::constructor:epoll_create1: error=(" + std::to_string(err) + "):" + strerror(err);
        throw std::runtime_error(error);
    }
}
/* Release the epoll instance; any error reported by close() is deliberately ignored. */
Select::~Select()
{
    static_cast<void>(::close(m_epoll_fd));
}
/*
 * Register a Selectable for read (EPOLLIN) events.
 *
 * If the object's fd is already registered, a warning is logged and nothing
 * else happens. If the object reports initializedWithData(), it is placed in
 * the ready set right away, so the next select() returns it without waiting
 * on epoll.
 *
 * Throws std::runtime_error if epoll_ctl(EPOLL_CTL_ADD) fails. In that case
 * m_objects and m_ready are left unchanged, so the call can be retried.
 */
void Select::addSelectable(Selectable *selectable)
{
    const int fd = selectable->getFd();

    if (m_objects.find(fd) != m_objects.end())
    {
        SWSS_LOG_WARN("Selectable is already added to the list, ignoring.");
        return;
    }

    // Register with the kernel first. If this fails, the bookkeeping must not
    // hold an entry for an fd that epoll will never report.
    struct epoll_event ev = {};
    ev.events = EPOLLIN;
    ev.data.fd = fd;

    int res = ::epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, fd, &ev);
    if (res == -1)
    {
        const int err = errno;
        std::string error = "Select::add_fd:epoll_ctl: error=(" + std::to_string(err) + "):" + strerror(err);
        throw std::runtime_error(error);
    }

    m_objects[fd] = selectable;

    // Data already present at registration time: make it ready immediately.
    if (selectable->initializedWithData())
    {
        m_ready.insert(selectable);
    }
}
/*
 * Unregister a Selectable. Its fd entry and its ready-set entry are removed
 * first, then the fd is removed from the epoll instance.
 *
 * The bookkeeping is cleared before calling epoll_ctl, so Select does not keep
 * these entries even if that call fails.
 * Throws std::runtime_error if epoll_ctl(EPOLL_CTL_DEL) fails, e.g. ENOENT when
 * the fd is not registered with this epoll instance.
 */
void Select::removeSelectable(Selectable *selectable)
{
    const int fd = selectable->getFd();

    m_objects.erase(fd);
    m_ready.erase(selectable);

    int res = ::epoll_ctl(m_epoll_fd, EPOLL_CTL_DEL, fd, nullptr);
    if (res == -1)
    {
        const int err = errno;
        std::string error = "Select::del_fd:epoll_ctl: error=(" + std::to_string(err) + "):" + strerror(err);
        throw std::runtime_error(error);
    }
}
void Select::addSelectables(vector<Selectable *> selectables) { for(auto it : selectables)//添加多个事件 { addSelectable(it); } }
提取一个就绪事件:
/*
 * Wait up to `timeout` ms on epoll, then hand out the single highest-ranked
 * ready Selectable through *c.
 *
 * readData() is called on the Selectable of every fd reported by epoll, and the
 * object is added to m_ready. The first element of m_ready (see Select::cmp) is
 * then:
 *   - removed from the set,
 *   - re-stamped with updateLastUsedTime(), which puts it behind its
 *     equal-priority peers,
 *   - re-inserted if hasCachedData() is still true,
 *   - notified through updateAfterRead().
 *
 * Returns Select::OBJECT when *c was set and Select::TIMEOUT when nothing is
 * ready. Returns Select::ERROR if epoll_wait fails for a reason other than
 * EINTR. Note: with no registered objects, maxevents is 0, epoll_wait fails
 * with EINVAL and this returns Select::ERROR.
 */
int Select::poll_descriptors(Selectable **c, unsigned int timeout)
{
    int sz_selectables = static_cast<int>(m_objects.size());
    std::vector<struct epoll_event> events(sz_selectables);
    int ret;

    // Retry the wait if it was interrupted by a signal.
    do
    {
        ret = ::epoll_wait(m_epoll_fd, events.data(), sz_selectables, timeout);
    }
    while (ret == -1 && errno == EINTR);

    if (ret < 0)
        return Select::ERROR;

    for (int i = 0; i < ret; ++i)
    {
        int fd = events[i].data.fd;

        // Do not use operator[] here. An event for an fd that is no longer
        // registered (e.g. removed by an earlier readData() in this loop) would
        // insert a null entry into m_objects and then be dereferenced.
        auto it = m_objects.find(fd);
        if (it == m_objects.end())
            continue;

        Selectable *sel = it->second;
        sel->readData();
        m_ready.insert(sel);
    }

    if (!m_ready.empty())
    {
        auto first = m_ready.begin();
        Selectable *sel = *first;
        *c = sel;
        m_ready.erase(first);

        // The sequence number is part of m_ready's ordering key, so it may only
        // change while sel is outside the set.
        sel->updateLastUsedTime();

        if (sel->hasCachedData())
        {
            // More messages are buffered: keep sel ready for the next call.
            m_ready.insert(sel);
        }

        // NOTE(review): the article's author suspected this hasCachedData() /
        // updateAfterRead() ordering can cause needless wake-ups. Whether it
        // does depends on the concrete Selectable implementations; confirm there.
        sel->updateAfterRead();
        return Select::OBJECT;
    }

    return Select::TIMEOUT;
}
提取多个就绪事件:该函数是在上面函数的基础上做的改进。每次只提取一个事件会造成"饿死和撑死"的问题:由于m_ready是有序集合,高优先级的事件总是会被优先提取。如果高优先级的事件依赖于低优先级的事件,就会造成高优先级的业务一直被调度,却因缺乏依赖条件而无法执行;低优先级的业务则总是得不到调度,形成死锁。一次性提取全部就绪事件可以解决这种高低优先级之间的死锁问题。
int Select::poll_descriptors(std::vector<Selectable *> &vc, unsigned int timeout) { int sz_selectables = static_cast<int>(m_objects.size()); std::vector<struct epoll_event> events(sz_selectables); int ret; do { ret = ::epoll_wait(m_epoll_fd, events.data(), sz_selectables, timeout); } while(ret == -1 && errno == EINTR); // Retry the select if the process was interrupted by a signal if (ret < 0) return Select::ERROR; for (int i = 0; i < ret; ++i) { int fd = events[i].data.fd; Selectable* sel = m_objects[fd]; sel->readData(); m_ready.insert(sel); } auto iter = m_ready.begin(); while(iter !=m_ready.end()) { auto sel = *iter; vc.push_back(sel); iter = m_ready.erase(iter); sel->updateLastUsedTime(); } for(auto se:vc) { if (se->hasCachedData()) { m_ready.insert(se); } se->updateAfterRead(); } if(!vc.empty()) { return Select::OBJECT; } return Select::TIMEOUT; }
/*
 * Return one ready Selectable through *c, which is set to NULL first.
 *
 * Anything already ready is returned without blocking. Otherwise, unless
 * timeout is 0, wait for up to `timeout` ms.
 * numeric_limits<unsigned int>::max() reaches epoll_wait as -1, i.e. no limit.
 * Returns Select::OBJECT, Select::TIMEOUT or Select::ERROR.
 */
int Select::select(Selectable **c, unsigned int timeout)
{
    SWSS_LOG_ENTER();

    *c = NULL;

    // Non-blocking pass first: serve anything that is already ready.
    const int immediate = poll_descriptors(c, 0);

    // Stop here if we got an object or an error, or the caller asked not to wait.
    const bool done = (immediate != Select::TIMEOUT) || (timeout == 0);
    if (done)
        return immediate;

    // Nothing ready yet: block for the caller's timeout.
    return poll_descriptors(c, timeout);
}
/*
 * Append every ready Selectable to vc.
 *
 * Anything already ready is collected without blocking. Otherwise, unless
 * timeout is 0, wait for up to `timeout` ms.
 * numeric_limits<unsigned int>::max() reaches epoll_wait as -1, i.e. no limit.
 * Returns Select::OBJECT, Select::TIMEOUT or Select::ERROR.
 */
int Select::select(std::vector<Selectable *> &vc, unsigned int timeout)
{
    SWSS_LOG_ENTER();

    // Non-blocking pass first: collect anything that is already ready.
    const int immediate = poll_descriptors(vc, 0);

    if (immediate == Select::TIMEOUT && timeout != 0)
    {
        // Nothing ready and the caller is willing to wait.
        return poll_descriptors(vc, timeout);
    }

    return immediate;
}