Redis发布与订阅——PUBLISH & SUBSCRIBEjava
通常来讲,发布与订阅(又称pub/sub)的特色是订阅者(listener)负责订阅频道(channel),发送者(publisher)负责向频道发送二进制字符串消息(binary string message)。每当有消息发送至给定频道时,频道的订阅者都会收到消息。咱们也能够把频道看做是电台,其中订阅者能够同时收听多个电台,而发送者则能够在任何电台发送消息。redis
Redis 的 SUBSCRIBE 命令能够让客户端订阅任意数量的频道, 每当有新信息发送到被订阅的频道时, 信息就会被发送给全部订阅指定频道的客户端。shell
做为例子, 下图展现了频道 channel1 , 以及订阅这个频道的三个客户端 —— client2 、 client5 和 client1 之间的关系:ide
当有新消息经过 PUBLISH 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个客户端:测试
订阅和发布ui
首先打开一个cli,订阅一个频道,以下,this
127.0.0.1:7000> subscribe mychannel Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "mychannel" 3) (integer) 1
而后向该频道发送一个消息,以下,spa
127.0.0.1:7000> publish mychannel 'hell world' (integer) 1 127.0.0.1:7000>
刚才的客户端已经订阅了该频道,因此该频道就会收到消息,以下,code
➜ ~ redis-cli -p 7000 127.0.0.1:7000> subscribe mychannel Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "mychannel" 3) (integer) 1 1) "message" 2) "mychannel" 3) "hell world"
其余命令:字符串
UNSUBSCRIBE——退订给定的一个或多个频道,若是执行是没有给定任何频道,那么退订全部频道
PSUBSCRIBE——PSUBSCRIBE pattern [pattern ...] 订阅与给定模式相匹配的全部频道
PUNSUBSCRIBE——PUNSUBSCRIBE [pattern [pattern ...]] 退订给定的模式,若是执行是没有给定任何模式,那么退订全部模式。
首先实现一个Listener,用于订阅消息,接收消息,
package com.usoft.jedis.sample; import redis.clients.jedis.JedisPubSub; /** * Created by xinxingegeya on 16/4/13. */ public class MessageListener extends JedisPubSub { @Override public void onMessage(String channel, String message) { System.out.println("channel:" + channel + ",message:" + message); //此处咱们能够取消订阅 if (message.equalsIgnoreCase("quit")) { this.unsubscribe(channel); } } }
写一个测试类,实现订阅和发布,
package com.usoft.jedis.sample; import org.junit.After; import org.junit.Before; import org.junit.Test; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; /** * Created by xinxingegeya on 16/4/13. */ public class PubSubTest { /** * jedis链接池 */ public JedisPool jedisPool; /** * 发布和订阅的频道 */ public String channel = "mychannel"; @Before public void before() { JedisPoolConfig config = new JedisPoolConfig(); config.setMaxTotal(10); config.setMaxIdle(5); config.setMaxWaitMillis(5000); config.setTestOnBorrow(true); jedisPool = new JedisPool(config, "127.0.0.1", 7000); } @After public void after() { jedisPool.close(); } @Test public void subscribe() { Jedis jedis = jedisPool.getResource(); jedis.subscribe(new MessageListener(), channel); } /** * 发布9次消息后,在此发送quit消息,使listener(订阅者)关闭 */ @Test public void publish() { Jedis jedis = jedisPool.getResource(); for (int i = 0; i < 10; i++) { if (i == 9) { jedis.publish(channel, "quit"); } else { jedis.publish(channel, "hello world"); } } } }
=========END=========