我在生产项目里是如何使用Redis发布订阅的?(二)Java版代码实现(含源码)

上篇文章讲了在实际项目里的哪些业务场景用到Redis发布订阅,这篇文章就讲一下,在Java中如何实现的。html

 

图解代码结构

发布订阅的理论以及使用场景你们都已经有了大体了解了,可是怎么用代码实现发布订阅呢?在这里给你们分享一下实现方式。java

 

咱们以上篇文章的第三种使用场景为例,先来看一下总体实现类图吧。web

 

解释一下,这里咱们首先定义一个统一接口`ICacheUpdate`,只有一个`update`方法,咱们令`Service`层实现这个方法,执行具体的更新操做。redis

 

咱们再来看`RedisMsgPubSub`,它继承`redis.clients.jedis.JedisPubSub`,主要重写其`onMessage()`方法(订阅的频道有消息到来时会触发这个方法),咱们在这个方法里调用`RedisMsgPubSub`的`update`方法执行更新操做。缓存

 

当咱们有多个`Service`实现`ICacheUpdate`时,咱们就很是迫切地须要一个管理器来集中管理这些`Service`,而且当触发onMessage方法时要告诉onMessage方法具体调用哪一个`ICacheUpdate`的实现类,因此咱们有了`PubSubManager`。而且咱们单独开启一个线程来维护发布订阅,因此管理器继承了`Thread`类。app

 

代码实现

具体代码:ide

统一接口this

public interface ICacheUpdate {
    public void update(); }

 

Service层spa

实现ICacheUpdate的update方法,执行具体的更新操做线程

public class InfoService implements ICacheUpdate {
  private static Logger logger = LoggerFactory.getLogger(InfoService.class); @Autowired private RedisCache redisCache; @Autowired private InfoMapper infoMapper; /** * 按信息类型分类查询信息 * @return */ public Map<String, List<Map<String, Object>>> selectAllInfo(){ Map<String, List<Map<String, Object>>> resultMap = new HashMap<String, List<Map<String, Object>>>(); List<String> infoTypeList = infoMapper.selectInfoType();//信息表中全部涉及的信息类型 logger.info("-------按信息类型查找公共信息开始----"+infoTypeList); if(infoTypeList!=null && infoTypeList.size()>0) { for (String infoType : infoTypeList) { List<Map<String, Object>> result = infoMapper.selectByInfoType(infoType); resultMap.put(infoType, result); } } return resultMap; } @Override public void update() { //缓存首页信息 logger.info("InfoService selectAllInfo 刷新缓存"); Map<String, List<Map<String, Object>>> resultMap = this.selectAllInfo(); Set<String> keySet = resultMap.keySet(); for(String key:keySet){ List<Map<String, Object>> value = resultMap.get(key); redisCache.putObject(GlobalSt.PUBLIC_INFO_ALL+key, value); } } }

Redis发布订阅的扩展类

 做用:

 一、统一管理ICacheUpdate,把全部实现ICacheUpdate接口的类添加到updates容器

 二、重写onMessage方法,订阅到消息后进行刷新缓存的操做

public class RedisMsgPubSub extends JedisPubSub {
    private static Logger logger = LoggerFactory.getLogger(RedisMsgPubSub.class); private Map<String , ICacheUpdate> updates = new HashMap<String , ICacheUpdate>(); //一、由updates统一管理ICacheUpdate public boolean addListener(String key , ICacheUpdate update) { if(update == null) return false;     updates.put(key, update);     return true; } /** * 二、重写onMessage方法,订阅到消息后进行刷新缓存的操做 * 订阅频道收到的消息 */ @Override public void onMessage(String channel, String message) { logger.info("RedisMsgPubSub onMessage channel:{},message :{}" ,channel, message); ICacheUpdate updater = null; if(StringUtil.isNotEmpty(message)) updater = updates.get(message); if(updater!=null) updater.update(); } //other code... }

发布订阅的管理器

执行的操做:

一、将全部须要刷新加载的Service类(实现ICacheUpdate接口)添加到RedisMsgPubSub的updates中

二、启动线程订阅pubsub_config频道,收到消息后的五秒后再次订阅(避免订阅到一次消息后结束订阅)

public class PubSubManager extends Thread{
    private static Logger logger = LoggerFactory.getLogger(PubSubManager.class); public static Jedis jedis; RedisMsgPubSub msgPubSub = new RedisMsgPubSub(); //频道 public static final String PUNSUB_CONFIG = "pubsub_config"; //1.将全部须要刷新加载的Service类(实现ICacheUpdate接口)添加到RedisMsgPubSub的updates中 public boolean addListener(String key, ICacheUpdate listener){ return msgPubSub.addListener(key,listener); } @Override public void run(){ while (true){ try { JedisPool jedisPool = SpringTools.getBean("jedisPool", JedisPool.class); if(jedisPool!=null){ jedis = jedisPool.getResource(); if(jedis!=null){ //2.启动线程订阅pubsub_config频道 阻塞  jedis.subscribe(msgPubSub,PUNSUB_CONFIG); } } } catch (Exception e) { logger.error("redis connect error!"); } finally { if(jedis!=null) jedis.close(); } try { //3.收到消息后的五秒后再次订阅(避免订阅到一次消息后结束订阅) Thread.sleep(5000); } catch (InterruptedException e) { logger.error("InterruptedException in redis sleep!"); } } } }

到此,Redis的发布订阅大体已经实现。咱们何时启用呢?咱们能够选择在启动项目时完成订阅和基础数据的加载,因此咱们经过实现`javax.servlet.SevletContextListener`来完成这一操做。而后将监听器添加到`web.xml`。

CacheInitListener.java

/**
 * 加载系统参数
 */
public class CacheInitListener implements ServletContextListener{
    private static Logger logger = LoggerFactory.getLogger(CacheInitListener.class); @Override public void contextDestroyed(ServletContextEvent arg0) { } @Override public void contextInitialized(ServletContextEvent arg0) { logger.info("---CacheListener初始化开始---"); init(); logger.info("---CacheListener初始化结束---"); } public void init() { try { //得到管理器 PubSubManager pubSubManager = SpringTools.getBean("pubSubManager", PubSubManager.class); InfoService infoService = SpringTools.getBean("infoService", InfoService.class); //添加到管理器 pubSubManager.addListener("infoService", infoService); //other service... //启动线程执行订阅操做  pubSubManager.start(); //初始化加载  loadParamToRedis(); } catch (Exception e) { logger.info(e.getMessage(), e); } } private void loadParamToRedis() { InfoService infoService = SpringTools.getBean("infoService", InfoService.class); infoService.update(); //other service...  } }

web.xml

<listener>
  <listener-class>com.xxx.listener.CacheInitListener</listener-class>
</listener>

【end】

上篇文章讲了在实际项目里的哪些业务场景用到Redis发布订阅,这篇文章就讲一下,在Java中如何实现的。

相关文章
相关标签/搜索