Redis的Pub/Sub模式

Redis一样支持消息的发布/订阅(Pub/Sub)模式,这和中间件activemq有些相似。订阅者(Subscriber)能够订阅本身感兴趣的频道(Channel),发布者(Publisher)能够将消息发往指定的频道(Channel),正式经过这种方式,能够将消息的发送者和接收者解耦。另外,因为能够动态的Subscribe和Unsubscribe,也能够提升系统的灵活性和可扩展性。java

关于如何搭建Redis环境,请参考其余文章。这里假设有一个可用的Redis环境(单节点和集群都可)。redis

在redis-cli中使用Pub/Sub

普通channel的Pub/Sub

先用一个客户端来订阅频道:shell

上图中先使用redis-cli做为客户端链接了Redis,以后使用了SUBSCRIBE命令,后面的参数表示订阅了china和hongkong两个channel。能够看到"SUBSCRIBE china hongkong"这条命令的输出是6行(能够分为2组,每一组是一个Message)。由于订阅、取消订阅的操做跟发布的消息都是经过消息(Message)的方式发送的,消息的第一个元素就是消息类型,它能够是如下几种类型:apache

subscribe: means that we successfully subscribed to the channel given as the second element in the reply. The third argument represents the number of channels we are currently subscribed to.app

unsubscribe: means that we successfully unsubscribed from the channel given as second element in the reply. The third argument represents the number of channels we are currently subscribed to. When the last argument is zero, we are no longer subscribed to any channel, and the client can issue any kind of Redis command as we are outside the Pub/Sub state.maven

message: it is a message received as result of a PUBLISH command issued by another client. The second element is the name of the originating channel, and the third argument is the actual message payload.ide

--from http://redis.io/topics/pubsub函数

上图的订阅命令将使得发往这两个channel的消息会被这个客户端接收到。须要注意的是,redis-cli客户端在进入subscribe模式之后,将不能再响应其余的任何命令测试

A client subscribed to one or more channels should not issue commands, although it can subscribe and unsubscribe to and from other channels.ui

The commands that are allowed in the context of a subscribed client are SUBSCRIBE, PSUBSCRIBE, UNSUBSCRIBE, PUNSUBSCRIBE, PING and QUIT

--from http://redis.io/topics/pubsub

官网说客户端在subscribe下除了可使用以上命令外,不能使用其余命令了。可是本人在Subscribe状态下使用上述几个命令,根本没反应。也就是说,使用redis-cli订阅channel后,该客户端将不能响应任何命令。除非按下(ctrl+c),但该操做不是取消订阅,而是退出redis-cli,此时将回到shell命令行下。

关于这个状况,我在官网上没有找到对这种状况的解释,也有很多的人在网上问,找来找去,本人以为还算合理的解释是:

On this page: http://redis.io/commands/subscribe applies only to those clients.

The redis-cli is among those clients. So, the comment is not an instruction for users of redis-cli.

Instead, redis-cli blocks waiting for messages on the bus (only to be unsubcribed via a ctrl+c).

--from http://stackoverflow.com/questions/17621371/redis-unsubscribe

就是说,官网中说明的client,并不包含这里使用的redis-cli,因而它能够和其余的client有不一样表现。(先不纠结这个问题,稍后再用jedis来测试一下。)

接下来再用一个客户端来发布消息:

能够看到,新的一个客户端使用PUBLISH命令往china频道发布了一条叫"China News"的消息,接下来再看看订阅端:

能够看见,这条消息已经被接收到了。能够看到,收到的消息中第一个参数是类型"message",第二个参数是channel名字"china",第三个参数是消息内容"China News",这和开始说的message类型的结构一致。

通配符的Pub/Sub

Redis还支持通配符的订阅和发布。客户端能够订阅知足一个或多个规则的channel消息,相应的命令是PSUBSCRIBE和PUNSUBSCRIBE。接下来咱们再用另外一个redis-cli客户端来订阅"chi*"的channel,如图:

和subscribe/unsubscribe的输出相似,能够看到第一部分是消息类型“psubscribe”,第二部分是订阅的规则“chi*”,第三部分则是该客户端目前订阅的全部规则个数。

接下来再发布一条消息到china这个channel中,此时,两个订阅者应该都能收到该消息:

实际测试结果跟预期相同。须要注意的是,订阅者2经过通配符订阅的,收到的消息类型是“pmessage”:

pmessage: it is a message received as result of a PUBLISH command issued by another client, matching a pattern-matching subscription. The second element is the original pattern matched, the third element is the name of the originating channel, and the last element the actual message payload.

--from http://redis.io/topics/pubsub

第二部分是匹配的模式“chi*”,第三部分是实际的channel名字“china”,第四部分是消息内容“China Daily”。

咱们再发布一条消息到chinnna中,此时只有订阅者2能接收到消息了:

一样,在使用PSUBSCRIBE进入订阅模式之后,该redis-cli也不能再监听其余任何的命令,要退出该模式,只能使用ctrl+c。

使用Jedis实现Pub/Sub

Jedis是Redis客户端的一种Java实现,在http://redis.io/clients#java中也能找到。

这里使用maven来管理包的依赖,因为使用了Log4j来输出日志,所以会用到log4j的jar包:

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.8.0</version>
</dependency>
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>

Jedis中的JedisPubSub抽象类提供了订阅和取消的功能。想处理订阅和取消订阅某些channel的相关事件,咱们得扩展JedisPubSub类并实现相关的方法:

package com.demo.redis;

import org.apache.log4j.Logger;
import redis.clients.jedis.JedisPubSub;

public class Subscriber extends JedisPubSub {//注意这里继承了抽象类JedisPubSub

    private static final Logger LOGGER = Logger.getLogger(Subscriber.class);

    @Override
    public void onMessage(String channel, String message) {
    	LOGGER.info(String.format("Message. Channel: %s, Msg: %s", channel, message));
    }

    @Override
    public void onPMessage(String pattern, String channel, String message) {
    	LOGGER.info(String.format("PMessage. Pattern: %s, Channel: %s, Msg: %s", 
    	    pattern, channel, message));
    }

    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
    	LOGGER.info("onSubscribe");
    }

    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {
    	LOGGER.info("onUnsubscribe");
    }

    @Override
    public void onPUnsubscribe(String pattern, int subscribedChannels) {
    	LOGGER.info("onPUnsubscribe");
    }

    @Override
    public void onPSubscribe(String pattern, int subscribedChannels) {
    	LOGGER.info("onPSubscribe");
    }
}

有了订阅者,咱们还须要一个发布者:

package com.demo.redis;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import org.apache.log4j.Logger;
import redis.clients.jedis.Jedis;

public class Publisher {

    private static final Logger LOGGER = Logger.getLogger(Publisher.class);
    private final Jedis publisherJedis;
    private final String channel;

    public Publisher(Jedis publisherJedis, String channel) {
        this.publisherJedis = publisherJedis;
        this.channel = channel;
    }

    /**
     * 不停的读取输入,而后发布到channel上面,遇到quit则中止发布。
     */
    public void startPublish() {
    	LOGGER.info("Type your message (quit for terminate)");
        try {
            BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
            while (true) {
                String line = reader.readLine();
                if (!"quit".equals(line)) {
                    publisherJedis.publish(channel, line);
                } else {
                    break;
                }
            }
        } catch (IOException e) {
            LOGGER.error("IO failure while reading input", e);
        }
    }
}

为简单起见,这个发布者接收控制台的输入,而后将输入的消息发布到指定的channel上面,若是输入quit,则中止发布消息。

接下来是主函数:

package com.demo.redis;

import org.apache.log4j.Logger;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class Program {
    
    public static final String CHANNEL_NAME = "MyChannel";
    //我这里的Redis是一个集群,192.168.56.101和192.168.56.102均可以使用
    public static final String REDIS_HOST = "192.168.56.101";
    public static final int REDIS_PORT = 7000;
    
    private final static Logger LOGGER = Logger.getLogger(Program.class);
    private final static JedisPoolConfig POOL_CONFIG = new JedisPoolConfig();
    private final static JedisPool JEDIS_POOL = 
            new JedisPool(POOL_CONFIG, REDIS_HOST, REDIS_PORT, 0);
    
    public static void main(String[] args) throws Exception {
        final Jedis subscriberJedis = JEDIS_POOL.getResource();
        final Jedis publisherJedis = JEDIS_POOL.getResource();
        final Subscriber subscriber = new Subscriber();
        //订阅线程:接收消息
        new Thread(new Runnable() {
            public void run() {
                try {
                    LOGGER.info("Subscribing to \"MyChannel\". This thread will be blocked.");
                    //使用subscriber订阅CHANNEL_NAME上的消息,这一句以后,线程进入订阅模式,阻塞。
                    subscriberJedis.subscribe(subscriber, CHANNEL_NAME);
                    
                    //当unsubscribe()方法被调用时,才执行如下代码
                    LOGGER.info("Subscription ended.");
                } catch (Exception e) {
                    LOGGER.error("Subscribing failed.", e);
                }
            }
        }).start();
        
        //主线程:发布消息到CHANNEL_NAME频道上
        new Publisher(publisherJedis, CHANNEL_NAME).startPublish();
        publisherJedis.close();
        
        //Unsubscribe
        subscriber.unsubscribe();
        subscriberJedis.close();
    }
}

主类Program中定义了channel名字、链接redis的地址和端口,并使用JedisPool来获取Jedis实例。因为订阅者(subscriber)在进入订阅状态后会阻塞线程,所以新起一个线程(new Thread())做为订阅线程,并是用主线程来发布消息。待发布者(类中的new Publisher)中止发布消息(控制台中输入quit便可)时,解除订阅者的订阅(subscriber.unsubscribe()方法)。此时订阅线程解除阻塞,打印结束的日志并退出。

运行程序以前,还须要一个简单的log4j配置以观察输出:

log4j.rootLogger=INFO,stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{HH:mm:ss} %m%n

运行Program,如下是执行结果:

从结果看,当订阅者订阅后,订阅线程阻塞,主线程中的Publisher接收输入后,发布消息到MyChannel中,此时订阅该channel的订阅者收到消息并打印。


Jedis源码简要分析

关于使用UNSUBSCRIBE

开始使用redis-cli时,在subscriber进入监听状态后,并不能使用UNSUBSCRIBE和PUNSUBSCRIBE命令,如今在Jedis中,在订阅线程阻塞时,经过在main线程中调用改subscriber的unsubscribe()方法来解除阻塞。查看Jedis源码,其实该方法也就是给redis发送了一个UNSUBSCRIBE命令而已:

所以这里是支持在“客户端”使用UNSUBSCRIBE命令的。

关于订阅者接收消息

在接收消息前,须要订阅channel,订阅完成以后,会执行一个循环,这个循环会一直阻塞,直到该Client没有订阅数为止,以下图:

中间省略的其余行,主要是用于解析收到的Redis响应,这段代码也是根据响应的第一部分肯定响应的消息类型,而后挨个解析响应的后续内容,最后根据解析到消息类型,并使用后续解析到的内容做为参数来回调相应的方法,省略的内容以下:

final byte[] resp = (byte[]) firstObj;
if (Arrays.equals(SUBSCRIBE.raw, resp)) {
  subscribedChannels = ((Long) reply.get(2)).intValue();
  final byte[] bchannel = (byte[]) reply.get(1);
  final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
  //调用onSubscribe方法,该方法在咱们的Subscriber类中实现
  onSubscribe(strchannel, subscribedChannels);
} else if (Arrays.equals(UNSUBSCRIBE.raw, resp)) {
  subscribedChannels = ((Long) reply.get(2)).intValue();
  final byte[] bchannel = (byte[]) reply.get(1);
  final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
  //调用onUnsubscribe方法,该方法在咱们的Subscriber类中实现
  onUnsubscribe(strchannel, subscribedChannels);
} else if (Arrays.equals(MESSAGE.raw, resp)) {
  final byte[] bchannel = (byte[]) reply.get(1);
  final byte[] bmesg = (byte[]) reply.get(2);
  final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
  final String strmesg = (bmesg == null) ? null : SafeEncoder.encode(bmesg);
  //调用onMessage方法,该方法在咱们的Subscriber类中实现
  onMessage(strchannel, strmesg);
} else if (Arrays.equals(PMESSAGE.raw, resp)) {
  final byte[] bpattern = (byte[]) reply.get(1);
  final byte[] bchannel = (byte[]) reply.get(2);
  final byte[] bmesg = (byte[]) reply.get(3);
  final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
  final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
  final String strmesg = (bmesg == null) ? null : SafeEncoder.encode(bmesg);
  //调用onPMessage方法,该方法在咱们的Subscriber类中实现
  onPMessage(strpattern, strchannel, strmesg);
} else if (Arrays.equals(PSUBSCRIBE.raw, resp)) {
  subscribedChannels = ((Long) reply.get(2)).intValue();
  final byte[] bpattern = (byte[]) reply.get(1);
  final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
  onPSubscribe(strpattern, subscribedChannels);
} else if (Arrays.equals(PUNSUBSCRIBE.raw, resp)) {
  subscribedChannels = ((Long) reply.get(2)).intValue();
  final byte[] bpattern = (byte[]) reply.get(1);
  final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
  //调用onPUnsubscribe方法,该方法在咱们的Subscriber类中实现
  onPUnsubscribe(strpattern, subscribedChannels);
} else {
  //对于其余Redis没有定义的返回消息类型,则直接报错
  throw new JedisException("Unknown message type: " + firstObj);
}

以上就是为何咱们须要在Subscriber中实现这几个方法的缘由了(这些方法并非抽象的,能够选择实现使用到的方法)。


参考:

http://redis.io/topics/pubsub

http://basrikahveci.com/a-simple-jedis-publish-subscribe-example

相关文章
相关标签/搜索