sonic orchagent线程的调度最小单位是Consumer。Consumer是在epoll事件Selectable的基础上的进一步封装,每一次发生epoll事件会触发orchagent进行一次调度。orch是资源的集合,一个orch能够包含多个Consumer,好比acl orch会监听多个redis table。
// Design assumption // 1. one Orch can have one or more Executor // 2. one Executor must belong to one and only one Orch // 3. Executor will hold an pointer to new-ed selectable, and delete it during dtor // 设计假设: // 1. 一个orch能够拥有一个或者多个Executor // 2. 一个Executor必须属于一个orch并且仅仅属于一个orch // 3. Executor有一个指针指向一个new出来的Selectable结构,必须在析构函数中将其删除,不然会泄漏 class Executor : public Selectable { public: Executor(Selectable *selectable, Orch *orch) : m_selectable(selectable) , m_orch(orch) { } virtual ~Executor() { delete m_selectable; } // Decorating Selectable int getFd() override { return m_selectable->getFd(); } void readData() override { m_selectable->readData(); } bool hasCachedData() override { return m_selectable->hasCachedData(); } bool initializedWithData() override { return m_selectable->initializedWithData(); } void updateAfterRead() override { m_selectable->updateAfterRead(); } Orch * getorch() { return m_orch; } // Disable copying Executor(const Executor&) = delete; Executor& operator=(const Executor&) = delete; // Execute on event happening // execute执行事件,drain是一个辅助函数 virtual void execute() { } virtual void drain() { } protected: Selectable *m_selectable;//指向new出来的Selectable Orch *m_orch;//指向一个orch // Get the underlying selectable 获取指向的Selectable Selectable *getSelectable() const { return m_selectable; } };
class Executor只是一个中间的派生类,orch直接使用的是class Consumer和class ExecutableTimer。
消费者类通常用于处理app_db的订阅事件;对于asic_db,通常是处理syncd的应答事件。
// One (field, value) pair inside a table entry.
typedef std::pair<std::string, std::string> FieldValueTuple;
#define fvField std::get<0>
#define fvValue std::get<1>

// One popped table entry: (key, operation, list of field/value pairs).
typedef std::tuple<std::string, std::string, std::vector<FieldValueTuple> > KeyOpFieldsValuesTuple;
#define kfvKey std::get<0>
#define kfvOp std::get<1>
#define kfvFieldsValues std::get<2>

// Pending tasks keyed by entry key; at most one merged task per key.
typedef map<string, KeyOpFieldsValuesTuple> SyncMap;

// Executor specialization that consumes table-change events
// (app_db subscriptions, or syncd replies for asic_db).
class Consumer : public Executor
{
public:
    Consumer(TableConsumable *select, Orch *orch)
        : Executor(select, orch)
    {
    }

    // The wrapped Selectable is always a TableConsumable here.
    TableConsumable *getConsumerTable() const
    {
        return static_cast<TableConsumable *>(getSelectable());
    }

    string getTableName() const
    {
        return getConsumerTable()->getTableName();
    }

    // Pop pending entries from redis and merge them into m_toSync.
    void execute();
    // Hand the accumulated tasks to the owning Orch.
    void drain();

    /* Store the latest 'golden' status */
    // TODO: hide?
    SyncMap m_toSync;
};
epoll事件触发后,须要调用该函数从数据库中读取出指定key的内容,将其加工后存放在m_toSync中,供后续处理。
void Consumer::execute() { SWSS_LOG_ENTER(); std::deque<KeyOpFieldsValuesTuple> entries; //调用pops函数,从redis数据库中读取数据,返回KeyOpFieldsValuesTuple结构 getConsumerTable()->pops(entries); /* Nothing popped */ if (entries.empty()) { return; } // 遍历每个事件 for (auto& entry: entries) { string key = kfvKey(entry); string op = kfvOp(entry); /* Record incoming tasks 记录事件 */ if (gSwssRecord) { Orch::recordTuple(*this, entry); } /* If a new task comes or if a DEL task comes, we directly put it into getConsumerTable().m_toSync map */ // 在这里进行一次合并,对于删除事件,直接覆盖 if (m_toSync.find(key) == m_toSync.end() || op == DEL_COMMAND) { m_toSync[key] = entry; } /* If an old task is still there, we combine the old task with new task */ /* */ else { KeyOpFieldsValuesTuple existing_data = m_toSync[key]; auto new_values = kfvFieldsValues(entry); auto existing_values = kfvFieldsValues(existing_data); //遍历每个新的值 for (auto it : new_values) { string field = fvField(it); string value = fvValue(it); auto iu = existing_values.begin(); while (iu != existing_values.end())//遍历每个旧的值 { string ofield = fvField(*iu); if (field == ofield)//相同的域,将老的值覆盖,这里应该跳出while,代码效率较差 iu = existing_values.erase(iu); else iu++; } /* 将新的值添加进去 */ existing_values.push_back(FieldValueTuple(field, value)); } m_toSync[key] = KeyOpFieldsValuesTuple(key, op, existing_values); } } //执行全部整理好的任务。 drain(); }
假设有一个task的键值对如下:
key=test;op=set;value={ A:a, B:b, C:c, }
第一次触发任务是在APP_DB中写入了:
key=test;op=set;value={ A:a, B:b }
假如orchagent只是将该任务读取到了m_toSync中,因为某种缘由没有执行完该任务,依然驻留在m_toSync中。第二次写入了:
key=test;op=set;value={ B:b1, C:c }
那么经过execute函数后m_toSync中将会是:
key=test;op=set;value={ A:a, B:b1, C:c }
执行m_toSync中的任务。
// Forward the pending tasks (if any) to the owning Orch for processing.
void Consumer::drain()
{
    if (m_toSync.empty())
        return;

    m_orch->doTask(*this);
}
// Base class of every orchestration agent: owns one Executor per
// subscribed table and dispatches their pending tasks via doTask().
class Orch
{
public:
    // Each Orch connects to a database, names the tables it subscribes
    // to, and assigns a priority to the events each subscription produces.
    // Subscribe to one table with the default priority.
    Orch(DBConnector *db, const string tableName, int pri = default_orch_pri);
    // Subscribe to several tables, all at the default priority.
    Orch(DBConnector *db, const vector<string> &tableNames);
    // Subscribe to several tables, each with an explicit priority.
    Orch(DBConnector *db, const vector<table_name_with_pri_t> &tableNameWithPri);
    // Subscribe to tables spread across several database connections.
    Orch(const vector<TableConnector>& tables);
    virtual ~Orch();

    // Collect every epoll Selectable owned by this Orch.
    vector<Selectable*> getSelectables();

    /* Iterate all consumers in m_consumerMap and run doTask(Consumer) */
    // Executes every pending task held in each consumer's m_toSync.
    void doTask();

    /* Run doTask against a specific executor */
    // A task source can be a Consumer, a NotificationConsumer or a
    // SelectableTimer.
    virtual void doTask(Consumer &consumer) = 0;
    virtual void doTask(NotificationConsumer &consumer) { }
    virtual void doTask(SelectableTimer &timer) { }

    /* TODO: refactor recording */
    static void recordTuple(Consumer &consumer, KeyOpFieldsValuesTuple &tuple);

protected:
    // One Orch may subscribe to several tables: key is the table name,
    // value is the Executor servicing it.
    ConsumerMap m_consumerMap;

    // Debugging helpers.
    static void logfileReopen();
    string dumpTuple(Consumer &consumer, KeyOpFieldsValuesTuple &tuple);
    ref_resolve_status resolveFieldRefValue(type_map&, const string&, KeyOpFieldsValuesTuple&, sai_object_id_t&);
    bool parseIndexRange(const string &input, sai_uint32_t &range_low, sai_uint32_t &range_high);
    bool parseReference(type_map &type_maps, string &ref, string &table_name, string &object_name);
    ref_resolve_status resolveFieldRefArray(type_map&, const string&, KeyOpFieldsValuesTuple&, vector<sai_object_id_t>&);

    /* Note: consumer will be owned by this class */
    // Internal helper that registers an Executor; used by addConsumer.
    void addExecutor(string executorName, Executor* executor);
    Executor *getExecutor(string executorName);

private:
    // Create and register a Consumer subscribed to tableName.
    void addConsumer(DBConnector *db, string tableName, int pri = default_orch_pri);
};
void Orch::addExecutor(string executorName, Executor* executor) { m_consumerMap.emplace(std::piecewise_construct, std::forward_as_tuple(executorName), std::forward_as_tuple(executor)); } //添加一个消费者 void Orch::addConsumer(DBConnector *db, string tableName, int pri) { if (db->getDbId() == CONFIG_DB || db->getDbId() == STATE_DB) { addExecutor(tableName, new Consumer(new SubscriberStateTable(db, tableName, TableConsumable::DEFAULT_POP_BATCH_SIZE, pri), this)); } else { addExecutor(tableName, new Consumer(new ConsumerStateTable(db, tableName, gBatchSize, pri), this)); } }
执行本orch中的每个消费者m_toSync中的task,无论该task是本次从redis中读取的,还是之前未处理完毕的。
// Drain every Executor owned by this Orch, executing all of its
// accumulated tasks (newly popped or left over from earlier rounds).
void Orch::doTask()
{
    for (auto iter = m_consumerMap.begin(); iter != m_consumerMap.end(); ++iter)
    {
        iter->second->drain();
    }
}
orch2是在orch的基础上的一个封装,增强了代码的可读性。
// Convenience layer on top of Orch: each popped entry is parsed into the
// shared Request object, then dispatched to addOperation()/delOperation().
class Orch2 : public Orch
{
public:
    Orch2(DBConnector *db, const std::string& tableName, Request& request, int pri=default_orch_pri)
        : Orch(db, tableName, pri), request_(request)
    {
    }

protected:
    // Drains consumer.m_toSync, parsing and dispatching every task.
    virtual void doTask(Consumer& consumer);

    // Return true when the operation is fully handled (task is removed
    // from the queue); false keeps the task queued for a later retry.
    virtual bool addOperation(const Request& request)=0;
    virtual bool delOperation(const Request& request)=0;

private:
    Request& request_;  // parser/holder for the task currently dispatched
};
// Process every pending task of the consumer: parse it, dispatch SET/DEL
// to the derived class, erase it on success, keep it queued on failure.
void Orch2::doTask(Consumer &consumer)
{
    SWSS_LOG_ENTER();

    for (auto task = consumer.m_toSync.begin(); task != consumer.m_toSync.end(); )
    {
        // A parse failure still counts as handled: the task is dropped.
        bool handled = true;

        try
        {
            request_.parse(task->second);

            auto op = request_.getOperation();
            if (op == DEL_COMMAND)
            {
                handled = delOperation(request_);
            }
            else if (op == SET_COMMAND)
            {
                handled = addOperation(request_);
            }
            else
            {
                SWSS_LOG_ERROR("Wrong operation. Check RequestParser: %s", op.c_str());
            }
        }
        catch (const std::invalid_argument& e)
        {
            SWSS_LOG_ERROR("Parse error: %s", e.what());
        }
        catch (const std::logic_error& e)
        {
            SWSS_LOG_ERROR("Logic error: %s", e.what());
        }
        catch (const std::exception& e)
        {
            SWSS_LOG_ERROR("Exception was catched in the request parser: %s", e.what());
        }
        catch (...)
        {
            SWSS_LOG_ERROR("Unknown exception was catched in the request parser");
        }
        request_.clear();

        // On success remove the task from m_toSync; otherwise leave it
        // in place and move on to the next one.
        if (handled)
        {
            task = consumer.m_toSync.erase(task);
        }
        else
        {
            ++task;
        }
    }
}