咱们都知道redis集群下对于mget、mset、pipeline、事务的支持不太好。html
固然对于mget和mset有这么几种方法:node
一、串行遍历key依次执行(这种就是把批量拆开了)redis
二、使用hash_tag包装key,在计算key的slot时候,若是key包含{},就会使用第一个{}内部的字符串做为hash key,这样就能够保证拥有一样{}内部字符串的key就会拥有相同slot(这种方式其实就是把一次批量操做的key所有放到了集群的一个节点进行操做,屏蔽掉多节点的问题)。算法
三、本身手动进行批量的key作处理,经过CRC16算法对全部的key进行分组(相同slot的分红一组),而后不一样的分组keys,使用不一样的集群节点进行处理。spring
本文就是适用了第三种方式,经过 pipeline来操做批处理,减小网络请求次数,加快处理速度。segmentfault
使用jedis封装的工具类,源码也是分析的jedis。缓存
对于jedis,集群的操做使用的JedisCluster类,看下它的继承实现关系图:网络
经过继承实现管理能够看到,JedisCluster继承自 BinaryJedisCluster ,以及实现了其余接口。咱们再查看下 BinaryJedisCluster源码。mybatis
在图中咱们注意到:JedisClusterConnectionHandler 这个类,字面意思redis集群链接处理器,同时这个connectionHandler变量在此类中尚未公共的获取方法。咱们再日后看,咱们进去到JedisClusterConnectionHandler 源码看下。app
这个类中有一个内部变量JedisClusterInfoCache cache,看着字面意思是Redis集群信息缓存,可是JedisClusterConnectionHandler中没有获取cache的公共方法,往下看下JedisClusterConnectionHandler中的方法,initializeSlotsCache()
这个变量中存储了全部的redis集群节点信息(包含了host和端口),那这里面有没有集群的其余信息呢?好比slots哈希槽数据,咱们再看下JedisClusterInfoCache的源码。
看到这里相信你已经明白了,就是这个cache存储了redis集群节点数据和哈希槽对应的节点关系数据,同时这俩变量仍是私有的,往下看下源码有没有直接获取这俩变量的方法呢,答案是看下图。
只有获取节点信息(host+ip对应的链接池数据集合)的方法,没有获取哈希槽对应的节点数据的方法。可是有经过slot获取节点链接池的方法,这个也是jedis中JedisCluster集群操做类能够处理集群操做的关键。咱们经过getSlotPool(int slot)方法内部实现能够知道,slots集合中存储的关系就是slot对应redis集群节点链接池,这里就是咱们后面实现集群下操做的关键,先记一下。
反射的知识请自行百度下或者看下其余人的博客,简单说,它很强大,能够经过它获取这个类或对象中的任何东西,包含私有变量、方法。
直接看下代码实现:
String clusterNodes="172.16.16.90:16379,172.16.16.90:16380,172.16.16.91:16379,172.16.16.91:16380,172.16.16.92:16379,172.16.16.92:16380"; int redirects=3; int timeOut=2000; String[] serverArray = clusterNodes.split(","); Set<HostAndPort> nodes = new HashSet<HostAndPort>(); for (String ipPort : serverArray) { String[] ipPortPair = ipPort.split(":"); nodes.add(new HostAndPort(ipPortPair[0].trim(), Integer.valueOf(ipPortPair[1].trim()))); } JedisCluster cluster=new JedisCluster(nodes, timeOut, redirects); Field hfield = cluster.getClass().getDeclaredField("connectionHandler");//获的变量名为connectionHandler的变量 hfield.setAccessible(true);//打开访问权限 JedisClusterConnectionHandler connectionHandler = (JedisClusterConnectionHandler)hfield.get(cluster); Field cfield = connectionHandler.getClass().getDeclaredField("cache");//获的变量名为cache的变量 cfield.setAccessible(true);//打开访问权限 JedisClusterInfoCache cache = (JedisClusterInfoCache)cfield.get(connectionHandler); //获取ip+port对应的链接池 Map<String, JedisPool> nodes2 = cache.getNodes();//这个咱们没怎么用到 Field field = cache.getClass().getDeclaredField("slots");//获的变量名为slots的变量 field.setAccessible(true);//打开访问权限 //获取slot对应的链接池 Map<Integer, JedisPool> slots=(Map<Integer, JedisPool>)field.get(cache);
可能有人要问:connectionHandler不是属于BinaryJedisCluster,由于JedisCluster继承自BinaryJedisCluster,因此一样能够获取到它的内部变量。
若是你的项目框架用的spring(spring boot)+mybatis的话,那更简单,直接使用mybatis封装的反射工具操做更方便。
//前面跟上面同样的,就省略了 //经过Mybatis的反射工具实现 MetaObject metaObject = SystemMetaObject.forObject(cluster); JedisClusterInfoCache cache = (JedisClusterInfoCache) metaObject.getValue("connectionHandler.cache"); //获取ip+port对应的链接池 Map<String, JedisPool> nodes2 = cache.getNodes(); //经过反射获取JedisClusterInfoCache中的slots MetaObject meta = SystemMetaObject.forObject(cache); //获取slot对应的链接池 Map<Integer, JedisPool> slots=(Map<Integer, JedisPool>)meta.getValue("slots");
是否是省了不少的代码?
答案在spring的JedisConnectionFactory类中。
spring下配置redis集群是确定要使用JedisConnectionFactory的,那么能够经过这个链接工厂类得到JedisCluster,可是咱们看到,它又是私有的,同时这个类中的公共方法也没有直接获取的。那只能又得经过反射方式获取了。
@Configuration @EnableCaching public class RedisCacheConfig extends CachingConfigurerSupport{ protected final static Logger log = LoggerFactory.getLogger(RedisCacheConfig.class); private volatile JedisConnectionFactory jedisConnectionFactory; private volatile RedisTemplate<String, Object> redisTemplate; private volatile RedisCacheManager cacheManager; public RedisCacheConfig() { super(); } public RedisCacheConfig(JedisConnectionFactory jedisConnectionFactory, RedisTemplate<String, Object> redisTemplate, RedisCacheManager cacheManager) { super(); this.jedisConnectionFactory = jedisConnectionFactory; this.redisTemplate = redisTemplate; this.cacheManager = cacheManager; } public JedisConnectionFactory redisConnectionFactory() { return jedisConnectionFactory; } public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory jedisConnectionFactory) { return redisTemplate; } public CacheManager cacheManager(RedisTemplate<?, ?> redisTemplate) { return cacheManager; } //经过反射获取spring管理的JedisCluster对象 @Bean public JedisCluster jedisCluster(){ MetaObject metaObject = SystemMetaObject.forObject(redisConnectionFactory()); return (JedisCluster)metaObject.getValue("cluster"); } @Bean public KeyGenerator keyGenerator() { return new KeyGenerator() { @Override public Object generate(Object target, Method method, Object... params) { //规定 本类名+方法名+参数名 为key StringBuilder sb = new StringBuilder(); sb.append(target.getClass().getName()+":"); sb.append(method.getName()+":"); for (Object obj : params) { sb.append(obj.toString()+","); } sb.deleteCharAt(sb.length() - 1); return sb.toString(); } }; } }
咱们能够直接经过设置一个@Configuration配置类,在这里面注入jedisConnectionFactory对象,再经过它得到JedisCluster,并设置一个bean供后面使用。像这样:
@Configuration public class RedisConfig { @Autowired private JedisConnectionFactory jedisConnectionFactory; //经过反射获取spring管理的JedisCluster对象 @Bean public JedisCluster jedisCluster(){ MetaObject metaObject = SystemMetaObject.forObject(jedisConnectionFactory); return (JedisCluster)metaObject.getValue("cluster"); } }
获取到了JedisCluster对象,下面就能够直接进行集群下pipeline实现各类集群没法实现的mget、mset等等。
再次说下原理:经过CRC16算法求出全部须要操做的key对应的slot,再经过slot获取到对应的节点链接池,以链接池进行slot分组,进而对相同链接池的key划分到一个组中,而后只须要对相同链接池的key集合进行批量操做就能够了,至关于一个节点下批量操做,同时又使用了pipeline减小了请求,合并了屡次请求,加快了处理速度。
@Component public class JedisClusterUtil implements InitializingBean{ @Autowired private JedisCluster jedisCluster; //存放每一个节点对应的链接池 <host+ip , JedisPool> private Map<String, JedisPool> nodes ; //存放每一个哈希槽(slot)对应的链接池<slot , JedisPool> private Map<Integer, JedisPool> slots ; @Override public void afterPropertiesSet() throws Exception { // TODO 属性赋值以后执行 //从而获取slot和JedisPool直接的映射,经过Ibatis的反射工具实现 MetaObject metaObject = SystemMetaObject.forObject(jedisCluster); //获取到JedisClusterInfoCache 对象后,在进行批量操做时,就能够根据key计算其slot值,获得对应的JedisPool,对key进行分类,而后以pipeline的方式获取值。 JedisClusterInfoCache cache = (JedisClusterInfoCache) metaObject.getValue("connectionHandler.cache"); nodes=cache.getNodes(); //经过反射获取JedisClusterInfoCache中的slots MetaObject meta = SystemMetaObject.forObject(cache); slots=(Map<Integer, JedisPool>)meta.getValue("slots"); } /** * * * @Title: mget * @Description: 批量获取 * @param @param keys * @param @return 设定文件 * @return List<Object> 返回类型 * @throws */ public List<Object> mget(List<String> keys){ List<Object> resList = new ArrayList<>(); if (keys == null || keys.isEmpty()) { return resList; } if (keys.size() == 1) { resList.add(jedisCluster.get(keys.get(0))); return resList; } /*放key大于1时*/ //缓存线程池对应执行的key集合 Map<JedisPool, List<String>> jedisPoolMap = getPoolMap(keys); List<String> realKeys=null; JedisPool currentJedisPool = null; Pipeline currentPipeline = null; //接收pipline结果 List<Object> res = new ArrayList<Object>(); //接收key对应的结果 Map<String, Object> resultMap = new HashMap<String, Object>(); for (Map.Entry<JedisPool, List<String>> entry : jedisPoolMap.entrySet()) { //得到链接池 currentJedisPool=entry.getKey(); //得到本链接池对应的key集合 realKeys=entry.getValue(); Jedis jedis =null; try { //获取pipeline jedis = currentJedisPool.getResource(); currentPipeline = jedis.pipelined(); //这里不能用mget不然将没法与key相对应了。 // currentPipeline.mget(realKeys.toArray(new String[realKeys.size()])); for (String key : realKeys) { currentPipeline.get(key); } //从pipeline中获取结果 res = currentPipeline.syncAndReturnAll(); // currentPipeline.close(); for (int i = 0; i < realKeys.size(); i++) { if (null == res.get(i)) { resultMap.put(realKeys.get(i), null); } else { resultMap.put(realKeys.get(i), res.get(i)); } } } finally { realKeys=null; if(currentPipeline!=null){ try { currentPipeline.close(); } catch (IOException e) { e.printStackTrace(); }finally{ currentPipeline=null;//help GC } } if(jedis!=null){ jedis.close();//归还链接 } } } resList = sortList(keys, resultMap); return resList; } /** * * * @Title: mset * @Description: 批量添加 * @param @param map 设定文件 * @return void 返回类型 * @throws */ public void mset(Map<String, String> map){ if (map == null || map.isEmpty()) { return ; } if (map.size() == 1) { for (Map.Entry<String, String> entry : map.entrySet()) { jedisCluster.set(entry.getKey(), entry.getValue()); } return ; } //当内部数据大于1时 Map<JedisPool, List<String>> jedisPoolMap = getPoolMap(new ArrayList<String>(map.keySet())); List<String> realKeys=null; JedisPool currentJedisPool = null; Pipeline currentPipeline = null; for (Map.Entry<JedisPool, List<String>> entry : jedisPoolMap.entrySet()) { //得到链接池 currentJedisPool=entry.getKey(); //得到本链接池对应的key集合 realKeys=entry.getValue(); Jedis jedis =null; try { //获取pipeline jedis = currentJedisPool.getResource(); currentPipeline = jedis.pipelined(); for (String key : realKeys) { currentPipeline.set(key, map.get(key)); } //pipeline执行 currentPipeline.sync(); } finally { realKeys=null; if(currentPipeline!=null){ try { currentPipeline.close(); } catch (IOException e) { e.printStackTrace(); }finally{ currentPipeline=null;//help GC } } if(jedis!=null){ jedis.close();//归还链接 } } } } /** * * * @Title: hmset * @Description: hash数据批量插入 * @param @param hmap 设定文件 * @return void 返回类型 * @throws */ public void hmset(Map<String,Map<String,String>> hmap){ if (hmap == null || hmap.isEmpty()) { return ; } if (hmap.size() == 1) { for (Map.Entry<String, Map<String,String>> entry : hmap.entrySet()) { jedisCluster.hmset(entry.getKey(), entry.getValue()); } return ; } //当内部数据大于1时 Map<JedisPool, List<String>> jedisPoolMap = getPoolMap(new ArrayList<String>(hmap.keySet())); List<String> realKeys=null; JedisPool currentJedisPool = null; Pipeline currentPipeline = null; for (Map.Entry<JedisPool, List<String>> entry : jedisPoolMap.entrySet()) { //得到链接池 currentJedisPool=entry.getKey(); //得到本链接池对应的key集合 realKeys=entry.getValue(); Jedis jedis =null; try { //获取pipeline jedis = currentJedisPool.getResource(); currentPipeline = jedis.pipelined(); for (String key : realKeys) { currentPipeline.hmset(key, hmap.get(key)); } //pipeline执行 currentPipeline.sync(); } finally { realKeys=null; if(currentPipeline!=null){ try { currentPipeline.close(); } catch (IOException e) { e.printStackTrace(); }finally{ currentPipeline=null;//help GC } } if(jedis!=null){ jedis.close();//归还链接 } } } } /** * * * @Title: smadd * @Description: set批量插入 * @param @param smap 设定文件 * @return void 返回类型 * @throws */ public void smadd(Map<String,Set<String>> smap){ if (smap == null || smap.isEmpty()) { return ; } if (smap.size() == 1) { for (Map.Entry<String, Set<String>> entry : smap.entrySet()) { Set<String> value = entry.getValue(); String[] values=new String[value.size()]; value.toArray(values); jedisCluster.sadd(entry.getKey(), values); } return ; } //当内部数据大于1时 Map<JedisPool, List<String>> jedisPoolMap = getPoolMap(new ArrayList<String>(smap.keySet())); List<String> realKeys=null; JedisPool currentJedisPool = null; Pipeline currentPipeline = null; for (Map.Entry<JedisPool, List<String>> entry : jedisPoolMap.entrySet()) { //得到链接池 currentJedisPool=entry.getKey(); //得到本链接池对应的key集合 realKeys=entry.getValue(); Jedis jedis =null; try { //获取pipeline jedis = currentJedisPool.getResource(); currentPipeline = jedis.pipelined(); for (String key : realKeys) { Set<String> value = smap.get(key); String[] values=new String[value.size()]; value.toArray(values); currentPipeline.sadd(key, values); } //pipeline执行 currentPipeline.sync(); } finally { realKeys=null; if(currentPipeline!=null){ try { currentPipeline.close(); } catch (IOException e) { e.printStackTrace(); }finally{ currentPipeline=null;//help GC } } if(jedis!=null){ jedis.close();//归还链接 } } } } /** * * * @Title: getPoolMap * @Description: 获取链接池对应的操做key的集合 * @param @param keys * @param @return 设定文件 * @return Map<JedisPool,List<String>> 返回类型 * @throws */ public Map<JedisPool, List<String>> getPoolMap(List<String> keys){ //缓存线程池对应执行的key集合 Map<JedisPool, List<String>> jedisPoolMap = new HashMap<JedisPool, List<String>>(); JedisPool currentJedisPool = null; List<String> keyList; for (String key : keys) { //计算哈希槽 int crc = JedisClusterCRC16.getSlot(key); //经过哈希槽获取节点的链接 currentJedisPool = slots.get(crc); /** * 因为JedisPool做为value保存在JedisClusterInfoCache中的一个map对象中,每一个节点的 * JedisPool在map的初始化阶段就是肯定的和惟一的,因此获取到的每一个节点的JedisPool都是同样 * 的,能够做为map的key * */ if (jedisPoolMap.containsKey(currentJedisPool)) { jedisPoolMap.get(currentJedisPool).add(key); } else { keyList = new ArrayList<String>(); keyList.add(key); jedisPoolMap.put(currentJedisPool, keyList); } } return jedisPoolMap; } private List<Object> sortList(List<String> keys, Map<String, Object> params) { List<Object> resultList = new ArrayList<>(); Iterator<String> it = keys.iterator(); while (it.hasNext()) { String key = it.next(); resultList.add(params.get(key)); } return resultList; } public Map<String, JedisPool> getNodes() { return nodes; } public Map<Integer, JedisPool> getSlots() { return slots; } }
定义一个spring组件,实现InitializingBean接口,重写afterPropertiesSet()方法,这样作的目的就是,在bean初始化后,属性赋值以后执行slots的赋值,这样能够全局使用此集合。相信看代码能够明白细节。
留下两个问题:
一、若是redis集群节点出现增长或者删除,或者主从节点的变更该如何处理呢?
二、spring-data-redis的RedisTemplate在集群下操做单个key时能够直接使用,可是若是用RedisTemplate添加一条数据,用jedis的JedisCluster读取一条数据,肯能会存在序列化和反序列化问题,这个该如何处理呢?
能够思考下。
redis集群操做:https://www.cnblogs.com/tony-zt/p/10185660.html
jedis cluster源码学习:https://blog.csdn.net/sinat_36553913/article/details/90342053
https://blog.csdn.net/sinat_36553913/article/details/90551403
https://segmentfault.com/a/1190000013535955
https://www.jianshu.com/p/5ca98b5a336b