本文出处:http://blog.csdn.net/chaijunkun/article/details/27361453,转载请注明。因为本人不按期会整理相关博文,会对相应内容做出完善。所以强烈建议在原始出处查看此文。 html
最近在作一套系统,其中要求若干个Worker服务器将心跳信息都上报给中央服务器。当必定时间中央服务器没有获得心跳信息时则认为该Worker失效了,发出告警。 java
知足这种需求的解决方法多种多样,我开始想到了memcache,上报一次心跳信息就刷新一次缓存,当缓存心里跳信息对象超时被删除,即认为对应的Worker失效。然而因为memcache的工做原理,删除都是被动的,咱们没法及时判断数据是否过时,即使知道了数据过时,也没有一种机制来回调方法来执行自定义的处理动做。难道缓存架构就真的不行了吗? redis
答案是否认的。在Redis 2.8.0版本起,加入了“Keyspace notifications”(即“键空间通知”)的功能。如何理解该功能呢?咱们来看下官方是怎么说的: 数据库
键空间通知,容许Redis客户端从“发布/订阅”通道中创建订阅关系,以便客户端可以在Redis中的数据因某种方式受到影响时收到相应事件。
缓存
可能接收到的事件举例以下:
服务器
影响一个给出的键的全部命令(会告诉你哪一个键被执行了一个命令,这个命令是什么);
架构
接收到了一个LPUSH操做的全部键(LPUSH命令:key v1 [v2 v3..]将指定的全部值从左到右进行压栈操做,造成一个栈,并将该栈命名为指定的key);
app
在数据库0中失效的全部键(不必定非得是数据库0,这里这样表述其实想表达能够知道影响的哪一个数据库)。 ide
看到这里我联想到,若是一条缓存数据失效了,经过订阅关系,客户端会收到消息,经过分析消息能够得知何种消息,分析消息内容能够知道是哪一个key失效了。这样就能够间接实现开头所描述的功能。 测试
接下来咱们来看下实验的步骤:
1.准备redis服务器
做为开源软件,redis下载后获得的是源代码,使用tar -xzvf redis-2.8.9.tar.gz解压后对其进行编译,过程也很简单,make就能够了。编译完成以后可使用自带的runtest进行测试,看是否编译成功。而后就是安装了,执行make PREFIX=/usr/local/redis-2.8.9 install,PREFIX参数指定的就是安装路径。如今安装的只有可执行文件,尚未配置文件。其实在源码目录中有一个模板redis.conf,咱们对它进行改动就能够了。
其余配置咱们不关心,可是官方文档中说Keyspace notifications功能默认是关闭的(默认地,Keyspace 时间通知功能是禁用的,由于它或多或少会使用一些CPU的资源),咱们须要打开它。打开的方法也很简单,配置属性:notify-keyspace-events
默认配置是这样的:notify-keyspace-events ""
根据文档中的说明:
- K Keyspace events, published with __keyspace@<db>__ prefix.
- E Keyevent events, published with __keyevent@<db>__ prefix.
- g Generic commands (non-type specific) like DEL, EXPIRE, RENAME, ...
- $ String commands
- l List commands
- s Set commands
- h Hash commands
- z Sorted set commands
- x Expired events (events generated every time a key expires)
- e Evicted events (events generated when a key is evicted for maxmemory)
- A Alias for g$lshzxe, so that the "AKE" string means all the events.
咱们配置为:
notify-keyspace-events Ex,含义为:发布key事件,使用过时事件(当每个key失效时,都会生成该事件)。
2.准备客户端和链接配置
本文中使用的客户端是Jedis,版本为2.4.2,为了代码的通用性,我使用Spring来管理链接:
- <bean id="pool" class="redis.clients.jedis.JedisPool">
- <constructor-arg>
- <bean id="config" class="redis.clients.jedis.JedisPoolConfig">
- <property name="maxIdle" value="0" />
- <property name="maxTotal" value="20" />
- <property name="maxWaitMillis" value="1000" />
- <property name="testOnBorrow" value="true" />
- </bean>
- </constructor-arg>
- <constructor-arg>
- <value>192.168.1.100</value>
- </constructor-arg>
- <constructor-arg>
- <value>6379</value>
- </constructor-arg>
- </bean>
而后使用Spring Test和Junit来测试代码
- @RunWith(SpringJUnit4ClassRunner.class)
- @ContextConfiguration("/applicationContext*.xml")
- public class RedisSubscribeDemo {
-
- private static final Log Log= LogFactory.getLog(RedisSubscribeDemo.class);
-
- @Resource
- private JedisPool pool;
-
- @Test
- public void doTest() throws InterruptedException {
- for (int i = 0; i < 1; i++) {
- TestThread thread= new TestThread(pool);
- thread.start();
- }
- Thread.sleep(50000L);
- Log.info("Test finish...");
- }
- }
因为要使用必定的延迟,咱们把主要测试代码放到了TestThread中。当测试线程启动后,主线程停滞50秒,让咱们有足够的时间来操做。
- public class TestThread extends Thread {
-
- private Log log= LogFactory.getLog(TestThread.class);
-
- private JedisPool pool;
-
- public TestThread(JedisPool pool){
- log.info("loading test thread");
- this.pool= pool;
- }
-
- @Override
- public void run() {
- Jedis jedis= pool.getResource();
- jedis.psubscribe(new MySubscribe(), "*");
- try {
- Thread.sleep(10000L);
- } catch (InterruptedException e) {
- log.info("延时失败", e);
- }
- jedis.close();
- log.info("Test run finished");
- }
- }
在测试线程中,咱们将自定义的MySubscribe加入到了Jedis的模板订阅(即psubscribe,由于模板订阅的channel是支持星号'*'通配的,这样能够收集到多个通配通道的消息,而与之相反的还有一个subscribe,此订阅只能指定严格匹配的通道)中,一样为了测试过程可以将结果显示出来,在绑定了订阅后,对该线程进行了延时10秒。
- public class MySubscribe extends JedisPubSub {
-
- private static final Log log= LogFactory.getLog(MySubscribe.class);
-
- // 初始化按表达式的方式订阅时候的处理
- public void onPSubscribe(String pattern, int subscribedChannels) {
- log.info(pattern + "=" + subscribedChannels);
- }
-
- // 取得按表达式的方式订阅的消息后的处理
- public void onPMessage(String pattern, String channel, String message) {
- log.info(pattern + "=" + channel + "=" + message);
- }
-
- ...其余未用到的重写方法忽略
- }
做为Jedis自定义订阅,必须继承redis.clients.jedis.JedisPubSub类,在
psubscribe模式下,重点重写onPMessage方法,该方法为接收到模板订阅后处理事件的重要代码。pattern为在绑定订阅时使用的通配模板,channel为通配后符合条件的实际通道名称,message就不用多说了,就是事件消息内容。
3.实战
经过Redis自带的redis-cli命令,咱们能够在服务端经过命令行的方式直接操做。咱们运行上面的示例代码,而后迅速切换到redis-cli命令中,创建一个生命周期很短暂的数据:
- 127.0.0.1:6379> set chaijunkun 123 PX 100
PX参数指定生命周期单位为毫秒,100即声明周期,即100毫秒。key为chaijunkun的数据,其值为123。
当执行语句后,回显:
- *=__keyevent@0__:expired=chaijunkun
从输出能够看出,以前指定的通配符为*,通配任何通道;以后是实际的通道名称:__keyevent@0__:expired,这里咱们能够看到订阅收到了一个keyevent位于数据库0,事件类型为:expired,是一个过时事件;最后是chaijunkun,这个是过时数据的key。
在官方文档中,keyevent通道的格式永远是这样的:
__keyevent@<db>__:prefix
对于数据过时事件,咱们在绑定订阅时通配模板也能够精确地写成:
__keyevent@*__:expired
经过示例代码,咱们能够看到确实印证了以前的构想,实现了数据过时的事件触发(event)或者说回调(callback)。
4.其余应用
以前的代码中,对于事件的发布都是由Redis本身生成的。实际上在命令中主动发布自定义消息也是能够的,在publish命令的帮助中咱们看到:
- 127.0.0.1:6379> help publish
-
- PUBLISH channel message
- summary: Post a message to a channel
- since: 2.0.0
- group: pubsub
经过参数,能够自定义通道名称和通道消息。而在Jedis中,发布API甚至作到了字节数据的级别:
jedis.publish(byte[] channel, byte[] message)
所以咱们能够构想,自定一套通信协议,channel为命令字,message为消息体,咱们能够经过redis这种简单的发布/订阅机制实现消息的分发。