本章节主要分析sonic使用redis的键空间消息(keyspace notification)机制实现的消息传递框架。该机制与发布-订阅机制的区别在于:发布者不需要进行publish通知,只要往数据库中写入指定的键,redis就会通知监听了该键空间的客户端。该机制目前只用于监听config_db中配置的变化,然后将其同步到app_db。使用该机制的案例有:VlanMgr、IntfMgr、portsyncd等,可以通过orch包装使用,比如VlanMgr;也可以直接定义SubscriberStateTable表,比如portCfg。
# 在数据库0中订阅以tom开头的键
127.0.0.1:6379> PSUBSCRIBE __keyspace@0__:tom*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "__keyspace@0__:tom*"
3) (integer) 1

# 在数据库0中添加hash表tom
127.0.0.1:6379> HMSET tom|1 name tom age 28
OK

# 订阅者获得应答
1) "pmessage"
2) "__keyspace@0__:tom*"
3) "__keyspace@0__:tom|1"
4) "hset"

# 删除key
127.0.0.1:6379> DEL tom|1
(integer) 1

# 订阅者获得应答
1) "pmessage"
2) "__keyspace@0__:tom*"
3) "__keyspace@0__:tom|1"
4) "del"
class SubscriberStateTable : public ConsumerTableBase { public: SubscriberStateTable(DBConnector *db, const std::string &tableName, int popBatchSize = DEFAULT_POP_BATCH_SIZE, int pri = 0); /* Get all elements available */ void pops(std::deque<KeyOpFieldsValuesTuple> &vkco, const std::string &prefix = EMPTY_PREFIX); /* Read keyspace event from redis */ void readData() override; bool hasCachedData() override; bool initializedWithData() override { return !m_buffer.empty(); } private: /* Pop keyspace event from event buffer. Caller should free resources. */ std::shared_ptr<RedisReply> popEventBuffer(); std::string m_keyspace; //全部应答存储在该队列中 std::deque<std::shared_ptr<RedisReply>> m_keyspace_event_buffer; Table m_table;//很是重要的一个成员,具体的 };
/*
 * Direct accessor for one table in a Redis database.
 * Commands are issued through a RedisPipeline (m_pipe).
 */
class Table : public TableBase, public TableEntryEnumerable
{
public:
    Table(DBConnector *db, const std::string &tableName);

    /* Build on top of an existing pipeline supplied by the caller. */
    Table(RedisPipeline *pipeline, const std::string &tableName, bool buffered);
    ~Table() override;

    /* Set an entry in the DB directly (op not in use) */
    virtual void set(const std::string &key,
                     const std::vector<FieldValueTuple> &values,
                     const std::string &op = "",
                     const std::string &prefix = EMPTY_PREFIX);

    /* Delete an entry in the table */
    virtual void del(const std::string &key,
                     const std::string &op = "",
                     const std::string &prefix = EMPTY_PREFIX);

    /* Read a value from the DB directly */
    /* Returns false if the key doesn't exists */
    virtual bool get(const std::string &key, std::vector<FieldValueTuple> &ovalues);

    /* Fill `keys` with keys of this table. */
    void getKeys(std::vector<std::string> &keys);

    /* NOTE(review): presumably toggles m_buffered - confirm in the implementation. */
    void setBuffered(bool buffered);

    /* NOTE(review): presumably flushes commands pending in the pipeline - confirm. */
    void flush();

    /* NOTE(review): presumably copies the table content into tableDump - confirm. */
    void dump(TableDump &tableDump);

protected:
    /* Buffered mode flag, set from the constructor argument. */
    bool m_buffered;

    /* Set to false when the pipeline is supplied by the caller. */
    bool m_pipeowned;

    /* Pipeline used to push commands to Redis. */
    RedisPipeline *m_pipe;

    /* Strip special symbols from keys used for type identification
     * Input example:
     *     port@
     * DB entry:
     * 1) "ports@"
     * 2) "Ethernet0,Ethernet4,...
     * */
    std::string stripSpecialSym(const std::string &key);
};
/*
 * Construct a Table that issues its commands through a pipeline supplied
 * by the caller.
 *
 * pipeline  - existing pipeline; its database id selects the database.
 * tableName - name of the table.
 * buffered  - initial value of the buffered flag.
 *
 * The pipeline is recorded as not owned by this object (m_pipeowned is false).
 */
Table::Table(RedisPipeline *pipeline, const string &tableName, bool buffered)
    : TableBase(pipeline->getDbId(), tableName),
      m_buffered(buffered),
      m_pipeowned(false),
      m_pipe(pipeline)
{
}
bool Table::get(const string &key, vector<FieldValueTuple> &values) { /* 127.0.0.1:6379[4]> HGETALL "VLAN|Vlan1000" 1) "vlanid" 2) "1000" 127.0.0.1:6379[4]> */ RedisCommand hgetall_key; hgetall_key.format("HGETALL %s", getKeyName(key).c_str()); RedisReply r = m_pipe->push(hgetall_key, REDIS_REPLY_ARRAY); redisReply *reply = r.getContext(); values.clear(); if (!reply->elements) return false; if (reply->elements & 1)//必须是偶数,键值对 throw system_error(make_error_code(errc::address_not_available), "Unable to connect netlink socket"); //整理键值对 for (unsigned int i = 0; i < reply->elements; i += 2) { values.emplace_back(stripSpecialSym(reply->element[i]->str), reply->element[i + 1]->str); } return true; }
/*
 * Subscribe to the keyspace events of one table and preload its content.
 *
 * db           - connection whose database id is used in the keyspace pattern.
 * tableName    - table to watch.
 * popBatchSize - forwarded to ConsumerTableBase.
 * pri          - forwarded to ConsumerTableBase.
 *
 * The subscription is set up before the existing keys are read.
 */
SubscriberStateTable::SubscriberStateTable(DBConnector *db, const string &tableName, int popBatchSize, int pri)
    : ConsumerTableBase(db, tableName, popBatchSize, pri), m_table(db, tableName)
{
    /* Keyspace pattern: "__keyspace@<dbid>__:<table><separator>*" */
    m_keyspace = "__keyspace@";
    m_keyspace += to_string(db->getDbId()) + "__:" + tableName + m_table.getTableNameSeparator() + "*";

    /* Subscribe to the keyspace events matching the pattern. */
    psubscribe(m_db, m_keyspace);

    vector<string> keys;
    m_table.getKeys(keys);

    /* Queue every entry that can still be read as a SET operation. */
    for (const auto &key: keys)
    {
        KeyOpFieldsValuesTuple kco;

        kfvKey(kco) = key;
        kfvOp(kco) = SET_COMMAND;

        /* Skip keys whose content cannot be read (get() returned false). */
        if (!m_table.get(key, kfvFieldsValues(kco)))
        {
            continue;
        }

        m_buffer.push_back(kco);
    }
}
该订阅者实现了自己的数据读取函数:
void SubscriberStateTable::readData() { redisReply *reply = nullptr; /* Read data from redis. This call is non blocking. This method * is called from Select framework when data is available in socket. * NOTE: All data should be stored in event buffer. It won't be possible to * read them second time. */ if (redisGetReply(m_subscribe->getContext(), reinterpret_cast<void**>(&reply)) != REDIS_OK) { throw std::runtime_error("Unable to read redis reply"); } //将应答压入键空间事件缓存中 m_keyspace_event_buffer.push_back(shared_ptr<RedisReply>(make_shared<RedisReply>(reply))); /* Try to read data from redis cacher. * If data exists put it to event buffer. * NOTE: Keyspace event is not persistent and it won't * be possible to read it second time. If it is not stared in * the buffer it will be lost. */ //循环获取全部应答 reply = nullptr; int status; do { status = redisGetReplyFromReader(m_subscribe->getContext(), reinterpret_cast<void**>(&reply)); if(reply != nullptr && status == REDIS_OK) { m_keyspace_event_buffer.push_back(shared_ptr<RedisReply>(make_shared<RedisReply>(reply))); } } while(reply != nullptr && status == REDIS_OK); if (status != REDIS_OK) { throw std::runtime_error("Unable to read redis reply"); } }
该类订阅者自己实现了判断是否还有缓存数据的接口:只要m_buffer或m_keyspace_event_buffer中的元素个数大于1,就认为还有数据,相比之下比默认的接口更优。
/*
 * Report whether buffered data remains.
 *
 * Returns true when either the preloaded entry buffer or the keyspace event
 * buffer holds more than one element.
 * NOTE(review): the threshold is 1 rather than 0, presumably because the
 * caller is about to consume one element - confirm against the caller.
 */
bool SubscriberStateTable::hasCachedData()
{
    const bool morePreloadedEntries = m_buffer.size() > 1;
    const bool moreKeyspaceEvents = m_keyspace_event_buffer.size() > 1;

    return morePreloadedEntries || moreKeyspaceEvents;
}
该类订阅者期望的数据就在订阅事件的返回应答中,但应答中只包含key和事件类型。如果不是del事件,则需要根据key再到数据库中读取具体内容。
shared_ptr<RedisReply> SubscriberStateTable::popEventBuffer() { if (m_keyspace_event_buffer.empty()) { return NULL; } auto reply = m_keyspace_event_buffer.front(); m_keyspace_event_buffer.pop_front(); return reply; } void SubscriberStateTable::pops(deque<KeyOpFieldsValuesTuple> &vkco, const string& /*prefix*/) { vkco.clear(); if (!m_buffer.empty())//不为空,则将其中的内容拷贝出来 { vkco.insert(vkco.end(), m_buffer.begin(), m_buffer.end()); m_buffer.clear(); return; } while (auto event = popEventBuffer())//提取信息 { KeyOpFieldsValuesTuple kco; /* if the Key-space notification is empty, try next one. */ if (event->getContext()->type == REDIS_REPLY_NIL) { continue; } assert(event->getContext()->type == REDIS_REPLY_ARRAY); size_t n = event->getContext()->elements; /* Expecting 4 elements for each keyspace pmessage notification */ //键空间的应答通常包含四个消息 if (n != 4) { SWSS_LOG_ERROR("invalid number of elements %lu for pmessage of %s", n, m_keyspace.c_str()); continue; } /* The second element should be the original pattern matched */ /* 第二个是命中的模式 */ auto ctx = event->getContext()->element[1]; if (m_keyspace != ctx->str) { SWSS_LOG_ERROR("invalid pattern %s returned for pmessage of %s", ctx->str, m_keyspace.c_str()); continue; } //第三个包含命中的key,冒号后面就是key ctx = event->getContext()->element[2]; string msg(ctx->str); size_t pos = msg.find(':'); if (pos == msg.npos) { SWSS_LOG_ERROR("invalid format %s returned for pmessage of %s", ctx->str, m_keyspace.c_str()); continue; } //冒号后面就是key,好比"VLAN_INTERFACE|Vlan1000|192.168.0.1/27" string table_entry = msg.substr(pos + 1); //获取分割符号,分隔符前面是表名 pos = table_entry.find(m_table.getTableNameSeparator()); if (pos == table_entry.npos) { SWSS_LOG_ERROR("invalid key %s returned for pmessage of %s", ctx->str, m_keyspace.c_str()); continue; } string key = table_entry.substr(pos + 1); //最后一个是操做 ctx = event->getContext()->element[3]; if (strcmp("del", ctx->str) == 0) { kfvKey(kco) = key; kfvOp(kco) = DEL_COMMAND; } else { //执行get操做 if (!m_table.get(key, 
kfvFieldsValues(kco))) { SWSS_LOG_ERROR("Failed to get content for table key %s", table_entry.c_str()); continue; } kfvKey(kco) = key; kfvOp(kco) = SET_COMMAND; } vkco.push_back(kco); } m_keyspace_event_buffer.clear(); return; }