The SONiC core daemon orchagent manages resources in units of orch objects, where each orch owns a group of related resources. The orchagent scheduling system uses Executor as its scheduling unit; concrete scheduling entities include Consumer, ExecutableTimer, and others. This article analyzes the details of SONiC's scheduling.
The core of orchagent is the OrchDaemon class:
```cpp
class OrchDaemon
{
public:
    OrchDaemon(DBConnector *, DBConnector *, DBConnector *);
    ~OrchDaemon();
    bool init();   // initialize the process
    void start();  // start the scheduling system
private:
    // connections to the three databases
    DBConnector *m_applDb;
    DBConnector *m_configDb;
    DBConnector *m_stateDb;
    // all of the orchs
    std::vector<Orch *> m_orchList;
    // the select-based multiplexed async-IO control block
    Select *m_select;
    // flush the asic_db pipeline instead of waiting any longer
    void flush();
};
```
init() sets up the orchagent execution environment:
```cpp
bool OrchDaemon::init()
{
    SWSS_LOG_ENTER();
    ......
    // connect to the databases
    TableConnector confDbAclTable(m_configDb, CFG_ACL_TABLE_NAME);
    TableConnector confDbAclRuleTable(m_configDb, CFG_ACL_RULE_TABLE_NAME);
    TableConnector stateDbLagTable(m_stateDb, STATE_LAG_TABLE_NAME);

    vector<TableConnector> acl_table_connectors = {
        confDbAclTable,
        confDbAclRuleTable,
        stateDbLagTable
    };
    ......
    // collect the orchs
    m_orchList = { switch_orch, gCrmOrch, gBufferOrch, gPortsOrch, intfs_orch,
                   gNeighOrch, gRouteOrch, copp_orch, tunnel_decap_orch,
                   qos_orch, mirror_orch, gAclOrch, gFdbOrch, vrf_orch };
    // create the multiplexed event control block
    m_select = new Select();
    ......
    // besides the orchs above, the real code adds a few more that are not listed here
    return true;
}
```
flush() empties the asic_db producer's pipeline:
```cpp
/* Flush redis through sairedis interface */
void OrchDaemon::flush()
{
    SWSS_LOG_ENTER();

    sai_attribute_t attr;
    attr.id = SAI_REDIS_SWITCH_ATTR_FLUSH;
    sai_status_t status = sai_switch_api->set_switch_attribute(gSwitchId, &attr);
    if (status != SAI_STATUS_SUCCESS)
    {
        SWSS_LOG_ERROR("Failed to flush redis pipeline %d", status);
        exit(EXIT_FAILURE);
    }
}
```
```cpp
void OrchDaemon::start()
{
    SWSS_LOG_ENTER();
    // walk every orch and add all the events it cares about to epoll;
    // roughly one Executor corresponds to one event
    for (Orch *o : m_orchList)
    {
        m_select->addSelectables(o->getSelectables());
    }
    // enter the dead loop
    while (true)
    {
        Selectable *s;
        int ret;
        // block in epoll waiting for events
        ret = m_select->select(&s, SELECT_TIMEOUT);
        // on an error event, keep listening
        if (ret == Select::ERROR)
        {
            SWSS_LOG_NOTICE("Error: %s!\n", strerror(errno));
            continue;
        }
        // timeout event
        if (ret == Select::TIMEOUT)
        {
            /* Let sairedis flush all SAI function calls to ASIC DB.
             * Normally the redis pipeline flushes once enough requests
             * have accumulated, but a small number of requests may still
             * be sitting in it. When the daemon has nothing to do, it is
             * a good chance to flush the pipeline, so leftover requests
             * get handled instead of accumulating further. */
            flush();
            continue;
        }
        // fetch the Executor that triggered the epoll event
        auto *c = (Executor *)s;
        // for a Consumer this does the database work: it moves the
        // notifications from redis into m_toSync and then executes the
        // tasks in m_toSync
        c->execute();

        /* After each iteration, periodically check all m_toSync maps to
         * execute all the remaining tasks that need to be retried. */
        /* TODO: Abstract Orch class to have a specific todo list */
        // run the tasks left over in the other orchs
        for (Orch *o : m_orchList)
            o->doTask();
    }
}
```
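The control flow above can be sketched without epoll or redis. The following is a minimal toy model, not the real swss API: `Selectable`, `Executor`, `toySelect`, and `runOnce` are hypothetical stand-ins that only illustrate how "select an Executor, execute it, flush on timeout" composes.

```cpp
#include <cassert>
#include <queue>
#include <string>
#include <vector>

// Hypothetical stand-ins for swss Selectable/Executor, for illustration only.
struct Selectable { virtual ~Selectable() = default; };

struct Executor : Selectable {
    std::queue<std::string> pending;   // events delivered by the fake "epoll"
    std::vector<std::string> done;     // tasks actually executed
    // Like Consumer::execute(): drain the notifications and run the tasks.
    void execute() {
        while (!pending.empty()) {
            done.push_back(pending.front());
            pending.pop();
        }
    }
};

// Toy select(): return the first executor with a pending event,
// or nullptr to model Select::TIMEOUT.
Executor* toySelect(std::vector<Executor*>& execs) {
    for (Executor* e : execs)
        if (!e->pending.empty()) return e;
    return nullptr;
}

// One iteration of the dead loop in OrchDaemon::start(); on timeout the
// real daemon would flush() the sai-redis pipeline.
bool runOnce(std::vector<Executor*>& execs, bool& flushed) {
    Executor* c = toySelect(execs);
    if (c == nullptr) { flushed = true; return false; }  // TIMEOUT path
    c->execute();                                        // event path
    return true;
}
```

The point the sketch makes is that the timeout branch is not an error: it is deliberately used as the idle moment to flush the pipeline.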
Below we analyze the orchagent scheduling system through the event type orchagent handles most often: ConsumerStateTable.
A ConsumerStateTable represents an app_db event that orchagent subscribes to. The producer writes it in the following format:
```
"SADD" "INTF_TABLE_KEY_SET" "PortChannel1:1.1.1.1/8"   # add a key to the set INTF_TABLE_KEY_SET
"HSET" "INTF_TABLE:PortChannel1:1.1.1.1/8" "scope" "global"
"HSET" "INTF_TABLE:PortChannel1:1.1.1.1/8" "family" "IPv4"
"PUBLISH" "INTF_TABLE_CHANNEL" "G"
```
The subscriber then receives the following message on the channel:

```
1) "message"
2) "INTF_TABLE_CHANNEL"
3) "G"
```
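The producer-side write pattern above can be modeled in memory. This is a hypothetical sketch: `FakeAppDb` and its `set()` are illustration-only stand-ins that mimic what the SADD/HSET/PUBLISH sequence does, with C++ containers in place of redis structures.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>
#include <vector>

// Minimal in-memory model of the write pattern shown above: register the
// key in the key set, write the fields into the hash, then notify
// subscribers on the table channel.
struct FakeAppDb {
    std::set<std::string> keySet;                                    // INTF_TABLE_KEY_SET
    std::map<std::string, std::map<std::string, std::string>> hash;  // INTF_TABLE:<key>
    std::vector<std::string> published;                              // channel messages

    void set(const std::string& key,
             const std::map<std::string, std::string>& fields) {
        keySet.insert(key);                   // SADD INTF_TABLE_KEY_SET <key>
        for (const auto& fv : fields)
            hash[key][fv.first] = fv.second;  // HSET INTF_TABLE:<key> <field> <value>
        published.push_back("G");             // PUBLISH INTF_TABLE_CHANNEL "G"
    }
};
```

Note the design: the key set only records *which* keys changed, while the hash holds the actual field values; the publish message carries no payload and merely wakes the consumer.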
Once woken, the consumer runs a Lua script that atomically pops a batch of keys from the key set and reads each key's hash:

```lua
local ret = {}
local keys = redis.call('SPOP', KEYS[1], ARGV[1]) -- pop up to 128 keys per batch
local n = table.getn(keys)
for i = 1, n do
    local key = keys[i]
    local values = redis.call('HGETALL', KEYS[2] .. key)
    table.insert(ret, {key, values})
end
return ret
```
The script returns content like:
```
INTF_TABLE:PortChannel1:1.1.1.1/8 : {
    "scope": "global",
    "family": "IPv4"
}
```
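The script's logic can be reproduced with C++ containers in place of the redis structures. This is a sketch under stated assumptions: `batchPop` is a hypothetical helper, the `std::set` stands in for the key set (KEYS[1]) and the outer `std::map` for the hashes (KEYS[2] .. key).

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>
#include <utility>
#include <vector>

using FieldValueTuple = std::pair<std::string, std::string>;
using KeyFieldsValues = std::pair<std::string, std::vector<FieldValueTuple>>;

// Mimics the Lua script: SPOP up to popLimit keys (ARGV[1], 128 in the
// script above), then HGETALL each key's hash.
std::vector<KeyFieldsValues> batchPop(
        std::set<std::string>& keySet,
        const std::map<std::string, std::map<std::string, std::string>>& hashes,
        size_t popLimit) {
    std::vector<KeyFieldsValues> ret;
    while (!keySet.empty() && ret.size() < popLimit) {
        auto it = keySet.begin();          // SPOP KEYS[1]: remove one key
        std::string key = *it;
        keySet.erase(it);
        std::vector<FieldValueTuple> values;
        auto h = hashes.find(key);         // HGETALL KEYS[2] .. key
        if (h != hashes.end())
            for (const auto& fv : h->second)
                values.emplace_back(fv.first, fv.second);
        ret.emplace_back(std::move(key), std::move(values));
    }
    return ret;
}
```

The batching matters for throughput: one script invocation drains up to 128 changed keys, so orchagent pays one round trip per batch rather than per key.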
pops then massages that content into the following format:
```cpp
std::tuple<std::string, std::string, std::vector<FieldValueTuple>>
```

so the final result returned is:

```
"SET", "INTF_TABLE:PortChannel1:1.1.1.1/8", <"scope":"global", "family":"IPv4">
```
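That final conversion can be sketched as follows. `toKeyOp` is a hypothetical helper, and the element order follows the example above ("SET" first); the operation derivation is hedged here: a non-empty hash is treated as a "SET", while an empty hash (the key was popped but its hash no longer exists) is treated as a "DEL".

```cpp
#include <cassert>
#include <string>
#include <tuple>
#include <utility>
#include <vector>

using FieldValueTuple = std::pair<std::string, std::string>;
// The shape handed to the Consumer: (operation, key, field/value list),
// in the order shown in the example above.
using KeyOpFieldsValuesTuple =
    std::tuple<std::string, std::string, std::vector<FieldValueTuple>>;

// Turn one {key, values} entry from the batch-pop script into the tuple:
// values present -> "SET"; values absent -> "DEL".
KeyOpFieldsValuesTuple toKeyOp(const std::string& key,
                               std::vector<FieldValueTuple> values) {
    std::string op = values.empty() ? "DEL" : "SET";
    return {op, key, std::move(values)};
}
```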
In the author's view, the SONiC scheduling system is overly simplistic: it cannot handle large-scale workloads with complex logic, and its efficiency is very low.