redis 订阅与发布

Reference: https://redisbook.readthedocs.io/en/latest/feature/pubsub.htmlhtml

Redis 的 SUBSCRIBE 命令可让客户端订阅任意数量的频道, 每当有新信息发送到被订阅的频道时, 信息就会被发送给全部订阅指定频道的客户端。node

做为例子, 下图展现了频道 channel1 , 以及订阅这个频道的三个客户端 —— client2 、 client5 和 client1 之间的关系:python

digraph pubsub_relation {

    rankdir = BT;

    node [style = filled];

    edge [style = bold];

    channel1 [label = "channel1", fillcolor = "#A8E270"];

    node [shape = box, fillcolor = "#95BBE3"];

    client2 [label = "client2"];
    client5 [label = "client5"];
    client1 [label = "client1"];

    client2 -> channel1 [label = "subscribe"];
    client5 -> channel1 [label = "subscribe"];
    client1 -> channel1 [label = "subscribe"];
}

当有新消息经过 PUBLISH 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个客户端:redis

digraph send_message_to_subscriber {
    
    node [style = filled];

    edge [style = "dashed, bold"];
    
    message [label = "PUBLISH channel1 message", shape = plaintext, fillcolor = "#FADCAD"];

    message -> channel1 [color = "#B22222]"];

    channel1 [label = "channel1", fillcolor = "#A8E270"];

    node [shape = box];

    client2 [label = "client2", fillcolor = "#95BBE3"];
    client5 [label = "client5", fillcolor = "#95BBE3"];
    client1 [label = "client1", fillcolor = "#95BBE3"];

    /*
    client2 -> channel1 [label = "subscribe"];
    client5 -> channel1 [label = "subscribe"];
    client1 -> channel1 [label = "subscribe"];
    */

    channel1 -> client2 [label = "message", color = "#B22222"];
    channel1 -> client5 [label = "message", color = "#B22222"];
    channel1 -> client1 [label = "message", color = "#B22222"];
}

在后面的内容中, 咱们将探讨 SUBSCRIBE 和 PUBLISH 命令的实现, 以及这套订阅与发布机制的运做原理。服务器

订阅频道

每一个 Redis 服务器进程都维持着一个表示服务器状态的 redis.h/redisServer 结构, 结构的 pubsub_channels 属性是一个字典, 这个字典就用于保存订阅频道的信息:app

struct redisServer { // ... dict *pubsub_channels; // ... }; 

其中,字典的键为正在被订阅的频道, 而字典的值则是一个链表, 链表中保存了全部订阅这个频道的客户端。spa

好比说,在下图展现的这个 pubsub_channels 示例中, client2 、 client5 和 client1 就订阅了 channel1 , 而其余频道也分别被别的客户端所订阅:code

digraph pubsub {

    rankdir = LR;

    node [shape = record, style = filled];

    edge [style = bold];

    // keys

    pubsub [label = "pubsub_channels |<channel1> channel1 |<channel2> channel2 |<channel3> channel3 | ... |<channelN> channelN", fillcolor = "#A8E270"];

    // clients blocking for channel1
    client1 [label = "client1", fillcolor = "#95BBE3"];
    client5 [label = "client5", fillcolor = "#95BBE3"];
    client2 [label = "client2", fillcolor = "#95BBE3"];
    null_1 [label = "NULL", shape = plaintext];
    
    pubsub:channel1 -> client2;
    client2 -> client5;
    client5 -> client1;
    client1 -> null_1;

    // clients blocking for channel2
    client7 [label = "client7", fillcolor = "#95BBE3"];
    null_2 [label = "NULL", shape = plaintext];

    pubsub:channel2 -> client7;
    client7 -> null_2;

    // channel

    client3 [label = "client3", fillcolor = "#95BBE3"];
    client4 [label = "client4", fillcolor = "#95BBE3"];
    client6 [label = "client6", fillcolor = "#95BBE3"];
    null_3 [label = "NULL", shape = plaintext];

    pubsub:channel3 -> client3;
    client3 -> client4;
    client4 -> client6;
    client6 -> null_3;
}

当客户端调用 SUBSCRIBE 命令时, 程序就将客户端和要订阅的频道在 pubsub_channels 字典中关联起来。server

举个例子,若是客户端 client10086 执行命令 SUBSCRIBE channel1 channel2 channel3 ,那么前面展现的 pubsub_channels 将变成下面这个样子:htm

digraph new_subscribe {

    rankdir = LR;

    node [shape = record, style = filled];

    edge [style = bold];

    // keys

    pubsub [label = "pubsub_channels |<channel1> channel1 |<channel2> channel2 |<channel3> channel3 | ... |<channelN> channelN", fillcolor = "#A8E270"];

    // clients blocking for channel1
    client1 [label = "client1", fillcolor = "#95BBE3"];
    client5 [label = "client5", fillcolor = "#95BBE3"];
    client2 [label = "client2", fillcolor = "#95BBE3"];
    client10086 [label = "client10086", fillcolor = "#FFC1C1"];
    client10086_1 [label = "client10086", fillcolor = "#FFC1C1"];
    client10086_2 [label = "client10086", fillcolor = "#FFC1C1"];
    null_1 [label = "NULL", shape = plaintext];
    null_2 [label = "NULL", shape = plaintext];
    null_3 [label = "NULL", shape = plaintext];
    
    pubsub:channel1 -> client2;
    client2 -> client5;
    client5 -> client1;
    client1 -> client10086;
    client10086 -> null_1;

    // clients blocking for channel2
    client7 [label = "client7", fillcolor = "#95BBE3"];

    pubsub:channel2 -> client7;
    client7 -> client10086_1;
    client10086_1 -> null_2;

    // channel

    client3 [label = "client3", fillcolor = "#95BBE3"];
    client4 [label = "client4", fillcolor = "#95BBE3"];
    client6 [label = "client6", fillcolor = "#95BBE3"];

    pubsub:channel3 -> client3;
    client3 -> client4;
    client4 -> client6;
    client6 -> client10086_2;
    client10086_2 -> null_3;
}

SUBSCRIBE 命令的行为能够用伪代码表示以下:

def SUBSCRIBE(client, channels): # 遍历全部输入频道 for channel in channels: # 将客户端添加到链表的末尾 redisServer.pubsub_channels[channel].append(client) 

经过 pubsub_channels 字典, 程序只要检查某个频道是否为字典的键, 就能够知道该频道是否正在被客户端订阅; 只要取出某个键的值, 就能够获得全部订阅该频道的客户端的信息。

发送信息到频道

了解了 pubsub_channels 字典的结构以后, 解释 PUBLISH 命令的实现就很是简单了: 当调用 PUBLISH channel message 命令, 程序首先根据 channel 定位到字典的键, 而后将信息发送给字典值链表中的全部客户端。

好比说,对于如下这个 pubsub_channels 实例, 若是某个客户端执行命令 PUBLISH channel1 "hello moto" ,那么 client2 、 client5 和 client1 三个客户端都将接收到 "hello moto" 信息:

digraph pubsub {

    rankdir = LR;

    node [shape = record, style = filled];

    edge [style = bold];

    // keys

    pubsub [label = "pubsub_channels |<channel1> channel1 |<channel2> channel2 |<channel3> channel3 | ... |<channelN> channelN", fillcolor = "#A8E270"];

    // clients blocking for channel1
    client1 [label = "client1", fillcolor = "#95BBE3"];
    client5 [label = "client5", fillcolor = "#95BBE3"];
    client2 [label = "client2", fillcolor = "#95BBE3"];
    null_1 [label = "NULL", shape = plaintext];
    
    pubsub:channel1 -> client2;
    client2 -> client5;
    client5 -> client1;
    client1 -> null_1;

    // clients blocking for channel2
    client7 [label = "client7", fillcolor = "#95BBE3"];
    null_2 [label = "NULL", shape = plaintext];

    pubsub:channel2 -> client7;
    client7 -> null_2;

    // channel

    client3 [label = "client3", fillcolor = "#95BBE3"];
    client4 [label = "client4", fillcolor = "#95BBE3"];
    client6 [label = "client6", fillcolor = "#95BBE3"];
    null_3 [label = "NULL", shape = plaintext];

    pubsub:channel3 -> client3;
    client3 -> client4;
    client4 -> client6;
    client6 -> null_3;
}

PUBLISH 命令的实现能够用如下伪代码来描述:

def PUBLISH(channel, message): # 遍历全部订阅频道 channel 的客户端 for client in server.pubsub_channels[channel]: # 将信息发送给它们 send_message(client, message) 

退订频道

使用 UNSUBSCRIBE 命令能够退订指定的频道, 这个命令执行的是订阅的反操做: 它从 pubsub_channels 字典的给定频道(键)中, 删除关于当前客户端的信息, 这样被退订频道的信息就不会再发送给这个客户端。

模式的订阅与信息发送

当使用 PUBLISH 命令发送信息到某个频道时, 不只全部订阅该频道的客户端会收到信息, 若是有某个/某些模式和这个频道匹配的话, 那么全部订阅这个/这些频道的客户端也一样会收到信息。

下图展现了一个带有频道和模式的例子, 其中 tweet.shop.* 模式匹配了 tweet.shop.kindle 频道和 tweet.shop.ipad 频道, 而且有不一样的客户端分别订阅它们三个:

digraph pattern_relation {
    
    rankdir = BT;

    node [style = filled];
    edge [style = bold];

    kindle [label = "tweet.shop.kindle", fillcolor = "#A8E270"];

    ipad [label = "tweet.shop.ipad", fillcolor = "#A8E270"];

    node [shape = octagon];
    pattern [label = "tweet.shop.*"];

    pattern -> kindle [label = "match"];
    pattern -> ipad [label = "match"];

    node [shape = box];

    client123 [fillcolor = "#95BBE3"];
    client256 [fillcolor = "#95BBE3"];

    clientX [fillcolor = "#95BBE3"];
    clientY [fillcolor = "#95BBE3"];

    client3333 [fillcolor = "#95BBE3"];
    client4444 [fillcolor = "#95BBE3"];
    client5555 [fillcolor = "#95BBE3"];

    client123 -> pattern [label = "subscribe"];
    client256 -> pattern [label = "subscribe"];

    clientX -> kindle [label = "subscribe"];
    clientY -> kindle [label = "subscribe"];

    client3333 -> ipad [label = "subscribe"];
    client4444 -> ipad [label = "subscribe"];
    client5555 -> ipad [label = "subscribe"];
}

当有信息发送到 tweet.shop.kindle 频道时, 信息除了发送给 clientX 和 clientY 以外, 还会发送给订阅 tweet.shop.* 模式的 client123 和 client256 :

digraph send_message_to_pattern {
  
    node [style = filled];
    edge [style = bold];

    // tweet.shop.ipad

    ipad [label = "tweet.shop.ipad", fillcolor = "#A8E270"];
    ipad -> pattern [label = "match", dir = back];

    node [shape = box];
    ipad -> client3333 [label = "subscribe", dir = back];
    ipad -> client4444 [label = "subscribe", dir = back];
    ipad -> client5555 [label = "subscribe", dir = back];

    node [shape = plaintext];
    message [label = "PUBLISH tweet.shop.kindle message", fillcolor = "#FADCAD"];

    kindle [label = "tweet.shop.kindle", shape = ellipse, fillcolor = "#A8E270"];
    pattern [label = "tweet.shop.*", shape = octagon];

    message -> kindle [style = "bold, dashed", color = "#B22222"];
    kindle -> pattern [style = "bold, dashed", color = "#B22222"];

    node [shape = box];
    kindle -> clientX [style = "bold, dashed", color = "#B22222", label = "message"];
    kindle -> clientY [style = "bold, dashed", color = "#B22222", label = "message"];

    pattern -> client123 [label = "message", style = "bold, dashed", color = "#B22222"];
    pattern -> client256 [label = "message", style = "bold, dashed", color = "#B22222"];

    // client color

    client123 [fillcolor = "#95BBE3"];
    client256 [fillcolor = "#95BBE3"];

    clientX [fillcolor = "#95BBE3"];
    clientY [fillcolor = "#95BBE3"];

    client3333 [fillcolor = "#95BBE3"];
    client4444 [fillcolor = "#95BBE3"];
    client5555 [fillcolor = "#95BBE3"];


}

另外一方面, 若是接收到信息的是频道 tweet.shop.ipad , 那么 client123 和 client256 一样会收到信息:

digraph pattern_relation {
    
    rankdir = BT;

    node [style = filled];
    edge [style = bold];

    kindle [label = "tweet.shop.kindle", fillcolor = "#A8E270"];

    ipad [label = "tweet.shop.ipad", fillcolor = "#A8E270"];

    node [shape = octagon];
    pattern [label = "tweet.shop.*"];

    pattern -> kindle [label = "match"];
    pattern -> ipad [style = "bold, dashed", color = "#B22222", dir = back];

    node [shape = box];

    client123 -> pattern [label = "message", dir = back, style= "bold, dashed", color = "#B22222"];
    client256 -> pattern [label = "message", dir = back, style= "bold, dashed", color = "#B22222"];

    clientX -> kindle [label = "subscribe"];
    clientY -> kindle [label = "subscribe"];

    client3333 -> ipad [label = "message", style = "bold, dashed", color = "#B22222", dir = back];
    client4444 -> ipad [label = "message", style = "bold, dashed", color = "#B22222", dir = back];
    client5555 -> ipad [label = "message", style = "bold, dashed", color = "#B22222", dir = back];

    // new

    publish [label = "PUBLISH tweet.shop.ipad message", shape = plaintext, fillcolor = "#FADCAD"];

    ipad -> publish [style = "bold, dashed", color = "#B22222", dir = back];

    // client color

    client123 [fillcolor = "#95BBE3"];
    client256 [fillcolor = "#95BBE3"];

    clientX [fillcolor = "#95BBE3"];
    clientY [fillcolor = "#95BBE3"];

    client3333 [fillcolor = "#95BBE3"];
    client4444 [fillcolor = "#95BBE3"];
    client5555 [fillcolor = "#95BBE3"];



}

订阅模式

redisServer.pubsub_patterns 属性是一个链表,链表中保存着全部和模式相关的信息:

struct redisServer { // ... list *pubsub_patterns; // ... }; 

链表中的每一个节点都包含一个 redis.h/pubsubPattern 结构:

typedef struct pubsubPattern { redisClient *client; robj *pattern; } pubsubPattern; 

client 属性保存着订阅模式的客户端,而 pattern 属性则保存着被订阅的模式。

每当调用 PSUBSCRIBE 命令订阅一个模式时, 程序就建立一个包含客户端信息和被订阅模式的 pubsubPattern 结构, 并将该结构添加到 redisServer.pubsub_patterns 链表中。

做为例子,下图展现了一个包含两个模式的 pubsub_patterns 链表, 其中 client123 和 client256 都正在订阅 tweet.shop.* 模式:

digraph publish_pattern {
    
    rankdir = LR;

    node [shape = record, style = filled];

    edge [style = bold];

    redisServer [label = "redisServer| ... |<pubsub_patterns> pubsub_patterns | ...", fillcolor = "#A8E270"];

    pubsubPattern_1 [label = "pubsubPattern | client \n client123 | pattern \n tweet.shop.*", fillcolor = "#95BBE3"];

    pubsubPattern_2 [label = "pubsubPattern | client \n client256 | pattern \n tweet.shop.*", fillcolor = "#95BBE3"];

    redisServer:pubsub_patterns -> pubsubPattern_1;
    pubsubPattern_1 -> pubsubPattern_2;
}

若是这时客户端 client10086 执行 PSUBSCRIBE broadcast.list.* , 那么 pubsub_patterns 链表将被更新成这样:

digraph pubsub_pattern {
    
    rankdir = LR;

    node [shape = record, style = filled];

    edge [style = bold];

    redisServer [label = "redisServer| ... |<pubsub_patterns> pubsub_patterns | ...", fillcolor = "#A8E270"];

    pubsubPattern_1 [label = "pubsubPattern | client \n client123 | pattern \n tweet.shop.*", fillcolor = "#95BBE3"];

    pubsubPattern_2 [label = "pubsubPattern | client \n client256 | pattern \n tweet.shop.*", fillcolor = "#95BBE3"];

    pubsubPattern_3 [label = "pubsubPattern | client \n client10086 | pattern \n broadcast.live.*", fillcolor = "#FFC1C1"];

    redisServer:pubsub_patterns -> pubsubPattern_1;
    pubsubPattern_1 -> pubsubPattern_2;
    pubsubPattern_2 -> pubsubPattern_3;
}

经过遍历整个 pubsub_patterns 链表,程序能够检查全部正在被订阅的模式,以及订阅这些模式的客户端。

发送信息到模式

发送信息到模式的工做也是由 PUBLISH 命令进行的, 在前面讲解频道的时候, 咱们给出了这样一段伪代码, 说它定义了 PUBLISH 命令的行为:

def PUBLISH(channel, message): # 遍历全部订阅频道 channel 的客户端 for client in server.pubsub_channels[channel]: # 将信息发送给它们 send_message(client, message) 

可是,这段伪代码并无完整描述 PUBLISH 命令的行为, 由于 PUBLISH 除了将 message 发送到全部订阅 channel 的客户端以外, 它还会将 channel 和 pubsub_patterns 中的模式进行对比, 若是 channel 和某个模式匹配的话, 那么也将 message 发送到订阅那个模式的客户端。

完整描述 PUBLISH 功能的伪代码定于以下:

def PUBLISH(channel, message): # 遍历全部订阅频道 channel 的客户端 for client in server.pubsub_channels[channel]: # 将信息发送给它们 send_message(client, message) # 取出全部模式,以及订阅模式的客户端 for pattern, client in server.pubsub_patterns: # 若是 channel 和模式匹配 if match(channel, pattern): # 那么也将信息发给订阅这个模式的客户端 send_message(client, message) 

举个例子,若是 Redis 服务器的 pubsub_patterns 状态以下:

digraph pubsub_pattern {
    
    rankdir = LR;

    node [shape = record, style = filled];

    edge [style = bold];

    redisServer [label = "redisServer| ... |<pubsub_patterns> pubsub_patterns | ...", fillcolor = "#A8E270"];

    pubsubPattern_1 [label = "pubsubPattern | client \n client123 | pattern \n tweet.shop.*", fillcolor = "#95BBE3"];

    pubsubPattern_2 [label = "pubsubPattern | client \n client256 | pattern \n tweet.shop.*", fillcolor = "#95BBE3"];

    pubsubPattern_3 [label = "pubsubPattern | client \n client10086 | pattern \n broadcast.live.*", fillcolor = "#FFC1C1"];

    redisServer:pubsub_patterns -> pubsubPattern_1;
    pubsubPattern_1 -> pubsubPattern_2;
    pubsubPattern_2 -> pubsubPattern_3;
}

那么当某个客户端发送信息 "Amazon Kindle, $69." 到 tweet.shop.kindle 频道时, 除了全部订阅了 tweet.shop.kindle 频道的客户端会收到信息以外, 客户端 client123 和 client256 也一样会收到信息, 由于这两个客户端订阅的 tweet.shop.* 模式和 tweet.shop.kindle 频道匹配。

退订模式

使用 PUNSUBSCRIBE 命令能够退订指定的模式, 这个命令执行的是订阅模式的反操做: 程序会删除 redisServer.pubsub_patterns 链表中, 全部和被退订模式相关联的 pubsubPattern 结构, 这样客户端就不会再收到和模式相匹配的频道发来的信息。

小结

  • 订阅信息由服务器进程维持的 redisServer.pubsub_channels 字典保存,字典的键为被订阅的频道,字典的值为订阅频道的全部客户端。
  • 当有新消息发送到频道时,程序遍历频道(键)所对应的(值)全部客户端,而后将消息发送到全部订阅频道的客户端上。
  • 订阅模式的信息由服务器进程维持的 redisServer.pubsub_patterns 链表保存,链表的每一个节点都保存着一个 pubsubPattern 结构,结构中保存着被订阅的模式,以及订阅该模式的客户端。程序经过遍历链表来查找某个频道是否和某个模式匹配。
  • 当有新消息发送到频道时,除了订阅频道的客户端会收到消息以外,全部订阅了匹配频道的模式的客户端,也一样会收到消息。
  • 退订频道和退订模式分别是订阅频道和订阅模式的反操做。
相关文章
相关标签/搜索