sonic是一个网络操做系统,采用了大量的相互独立的第三方开源组件,这些组件在依赖,编译环境,库,配置方式都有很大的不一样。为了让这些组件在sonic中相互协做,互不干扰,同时尽可能不修改第三方组件的代码,sonic采用容器技术为各个组件提供独立的运行环境,经过容器间共享网络命名空间进行通讯。
各个第三组件有各自的配置文件格式和消息格式,如何让这些组件互通讯息了。sonic采用redis数据库做为消息传递平台,经过纯字符消息方式屏蔽各个组件的插件,经过胶水代码将其粘起来。c++
sonic经过redis数据库的发布-订阅机制和键空间事件机制实现了整个消息传递机制。redis
class TableBase { public: TableBase(int dbId, const std::string &tableName) : m_tableName(tableName), m_dbId(dbId) { /* Look up table separator for the provided DB */ auto it = tableNameSeparatorMap.find(dbId); if (it != tableNameSeparatorMap.end()) { m_tableSeparator = it->second; } else { SWSS_LOG_NOTICE("Unrecognized database ID. Using default table name separator ('%s')", TABLE_NAME_SEPARATOR_VBAR.c_str()); m_tableSeparator = TABLE_NAME_SEPARATOR_VBAR; } } std::string getTableName() const { return m_tableName; } int getDbId() const { return m_dbId; } /* Return the actual key name as a combination of tableName<table_separator>key */ std::string getKeyName(const std::string &key) { if (key == "") return m_tableName; else return m_tableName + m_tableSeparator + key; } /* Return the table name separator being used */ std::string getTableNameSeparator() const { return m_tableSeparator; } std::string getChannelName() { return m_tableName + "_CHANNEL"; } private: static const std::string TABLE_NAME_SEPARATOR_COLON; static const std::string TABLE_NAME_SEPARATOR_VBAR; static const TableNameSeparatorMap tableNameSeparatorMap; std::string m_tableName; std::string m_tableSeparator; int m_dbId; }; class TableEntryWritable { public: virtual ~TableEntryWritable() = default; /* Set an entry in the table */ virtual void set(const std::string &key, const std::vector<FieldValueTuple> &values, const std::string &op = "", const std::string &prefix = EMPTY_PREFIX) = 0; /* Delete an entry in the table */ virtual void del(const std::string &key, const std::string &op = "", const std::string &prefix = EMPTY_PREFIX) = 0; };
消费者响应生产者的事件,能够采用阻塞或者轮询的方式处理。sonic采用了异步事件通知机制(poll)进行处理。消费者类必须实现事件通知机制相关的接口。
该类对异步通知机制Selectable(select,poll等)进行了封装,集成该类的派生类能够加入异步事件机制,经过集成该类,消费者能够持续监听事件。shell
class RedisSelect : public Selectable { public: /* The database is already alive and kicking, no need for more than a second */ static constexpr unsigned int SUBSCRIBE_TIMEOUT = 1000; RedisSelect(int pri = 0);//调度优先级 int getFd() override; void readData() override; bool hasCachedData() override; bool initializedWithData() override; void updateAfterRead() override; /* Create a new redisContext, SELECT DB and SUBSCRIBE */ void subscribe(DBConnector* db, const std::string &channelName); /* PSUBSCRIBE */ void psubscribe(DBConnector* db, const std::string &channelName); void setQueueLength(long long int queueLength); protected: std::unique_ptr<DBConnector> m_subscribe; long long int m_queueLength;//接收的应答的个数,一个请求一个应答。 };
int RedisSelect::getFd() { return m_subscribe->getContext()->fd; }
void RedisSelect::readData() { redisReply *reply = nullptr; if (redisGetReply(m_subscribe->getContext(), reinterpret_cast<void**>(&reply)) != REDIS_OK) throw std::runtime_error("Unable to read redis reply"); freeReplyObject(reply); m_queueLength++;//事件加一次, reply = nullptr; int status; do { status = redisGetReplyFromReader(m_subscribe->getContext(), reinterpret_cast<void**>(&reply)); if(reply != nullptr && status == REDIS_OK) {//一个响应加一次,该值会大于最终处理的循环次数,形成空转,可是不加的话,极端状况下会形成丢失信息问题 m_queueLength++; freeReplyObject(reply); } } while(reply != nullptr && status == REDIS_OK); if (status != REDIS_OK) { throw std::runtime_error("Unable to read redis reply"); } }
bool RedisSelect::hasCachedData() {//判断是否还有消息,存在消息的话,加入m_ready,保证已经读出来的消息能被处理 return m_queueLength > 1; }
void RedisSelect::updateAfterRead() { m_queueLength--;//假设一次处理一个应答,这里减去1,即便一次处理了多个消息,依然只减掉1,形成空转的根本缘由 }
void RedisSelect::setQueueLength(long long int queueLength) { m_queueLength = queueLength;//设置消息个数,用于构造函数 }
/* Create a new redisContext, SELECT DB and SUBSCRIBE */ void RedisSelect::subscribe(DBConnector* db, const std::string &channelName) { m_subscribe.reset(db->newConnector(SUBSCRIBE_TIMEOUT)); /* Send SUBSCRIBE #channel command */ std::string s("SUBSCRIBE "); s += channelName; RedisReply r(m_subscribe.get(), s, REDIS_REPLY_ARRAY); } /* PSUBSCRIBE */ void RedisSelect::psubscribe(DBConnector* db, const std::string &channelName) { m_subscribe.reset(db->newConnector(SUBSCRIBE_TIMEOUT)); /* * Send PSUBSCRIBE #channel command on the * non-blocking subscriber DBConnector */ std::string s("PSUBSCRIBE "); s += channelName; RedisReply r(m_subscribe.get(), s, REDIS_REPLY_ARRAY); }
class TableEntryPoppable { public: virtual ~TableEntryPoppable() = default; /* Pop an action (set or del) on the table */ virtual void pop(KeyOpFieldsValuesTuple &kco, const std::string &prefix = EMPTY_PREFIX) = 0; /* Get multiple pop elements */ virtual void pops(std::deque<KeyOpFieldsValuesTuple> &vkco, const std::string &prefix = EMPTY_PREFIX) = 0; }; class TableConsumable : public TableBase, public TableEntryPoppable, public RedisSelect { public: /* The default value of pop batch size is 128 */ static constexpr int DEFAULT_POP_BATCH_SIZE = 128;//一次消费128条消息 TableConsumable(int dbId, const std::string &tableName, int pri) : TableBase(dbId, tableName), RedisSelect(pri) { } };
EVAL script numkeys key [key ...] arg [arg ...] 首先你们必定要知道eval的语法格式,其中: <1> script: 你的lua脚本 <2> numkeys: key的个数 <3> key: redis中各类数据结构的替代符号 <4> arg: 你的自定义参数 ok,可能乍一看模板不是特别清楚,下面我能够用官网的小案例演示一下: eval "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 2 username age jack 20
上面这一串代码大概是什么意思呢? 第一个参数的字符串就是script,也就是lua脚本。2表示keys的个数,KEYS[1] 就是 username的占位符, KEYS[2]就是age的占位符,ARGV[1]就是jack的占位符,ARGV[2]就是20的占位符,,以此类推,,,因此最后的结果应该就是:{return username age jack 20} 是否是有点像C#中的占位符:{0}呢?下面我在Redis中给你们演示一下:数据库
admin@admin:~$ redis-cli 127.0.0.1:6379> eval "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 2 username age jack 20 1) "username" 2) "age" 3) "jack" 4) "20" 127.0.0.1:6379>
而后咱们经过下面命令执行,这种方式和前面介绍的不同,参数 --eval script key1 key2 , arg1 age2 这种模式,key和value用一个逗号隔开就行了,最后咱们也看到了,数据都出来了,对吧.网络
admin@admin:~$ redis-cli --eval t.lua username age , jack 20 1) "username" 2) "age" 3) "jack" 4) "20" admin@admin:~$ 注意上面的逗号左右两边都有空格
admin@admin:~$ redis-cli script load "$(cat t.lua)" "a42059b356c875f0717db19a51f6aaca9ae659ea" admin@admin:~$ admin@admin:~$ redis-cli 127.0.0.1:6379> EVALSHA a42059b356c875f0717db19a51f6aaca9ae659ea 2 username age jack 20 1) "username" 2) "age" 3) "jack" 4) "20" 127.0.0.1:6379>
admin@admin:~$ cat flashsale.lua local buyNum = ARGV[1] -- 本次购买的数量 local goodsKey = KEYS[1] -- 本次购买的商品名 local goodsNum = redis.call('get',goodsKey) -- 获取商品的库存个数 if tonumber(goodsNum) >= tonumber(buyNum) -- 库存足够,那么出货 then redis.call('decrby',goodsKey,buyNum) -- 减小本次买的量 return buyNum -- 返回购买的量 else return '0' -- 数量不够,直接返回0 end admin@admin:~$ admin@admin:~$ redis-cli --eval flashsale.lua "pets" , 8 "8" admin@admin:~$ 上面脚本实现的是一个简单的秒杀程序