在项目中咱们常常使用spring-data-redis来操做Redis,它封装了Jedis客户端来与Redis服务器进行各类命令操做。因为最近用到了Redis Cluster集群功能,这里就分析总结一下Jedis cluster集群初始化主要过程及源码。node
jar版本: spring-data-redis-1.8.4-RELEASE.jar、jedis-2.9.0.jargit
测试环境: Redis 3.2.8,八个集群节点github
applicationContext-redis-cluster.xml 配置文件:redis
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd"> <!-- 链接池配置. --> <bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig"> <!-- 链接池中最大链接数。高版本:maxTotal,低版本:maxActive --> <property name="maxTotal" value="8" /> <!-- 链接池中最大空闲的链接数. --> <property name="maxIdle" value="4" /> <!-- 链接池中最少空闲的链接数. --> <property name="minIdle" value="1" /> <!-- 当链接池资源耗尽时,调用者最大阻塞的时间,超时将跑出异常。单位,毫秒数;默认为-1.表示永不超时。高版本:maxWaitMillis,低版本:maxWait --> <property name="maxWaitMillis" value="5000" /> <!-- 链接空闲的最小时间,达到此值后空闲链接将可能会被移除。负值(-1)表示不移除. --> <property name="minEvictableIdleTimeMillis" value="300000" /> <!-- 对于“空闲连接”检测线程而言,每次检测的连接资源的个数。默认为3 --> <property name="numTestsPerEvictionRun" value="3" /> <!-- “空闲连接”检测线程,检测的周期,毫秒数。若是为负值,表示不运行“检测线程”。默认为-1. --> <property name="timeBetweenEvictionRunsMillis" value="60000" /> <!-- testOnBorrow:向调用者输出“连接”资源时,是否检测是有有效,若是无效则从链接池中移除,并尝试获取继续获取。默认为false。建议保持默认值. --> <!-- testOnReturn:向链接池“归还”连接时,是否检测“连接”对象的有效性。默认为false。建议保持默认值. --> <!-- testWhileIdle:向调用者输出“连接”对象时,是否检测它的空闲超时;默认为false。若是“连接”空闲超时,将会被移除。建议保持默认值. --> <!-- whenExhaustedAction:当“链接池”中active数量达到阀值时,即“连接”资源耗尽时,链接池须要采起的手段, 默认为1(0:抛出异常。1:阻塞,直到有可用连接资源。2:强制建立新的连接资源) --> </bean> <bean id="n1" class="org.springframework.data.redis.connection.RedisNode"> <constructor-arg value="127.0.0.1" /> <constructor-arg value="6379" type="int" /> </bean> <bean id="n2" class="org.springframework.data.redis.connection.RedisNode"> <constructor-arg value="127.0.0.1" /> <constructor-arg value="6380" type="int" /> </bean> <bean id="n3" class="org.springframework.data.redis.connection.RedisNode"> <constructor-arg value="127.0.0.1" /> <constructor-arg value="6381" type="int" /> </bean> <bean id="redisClusterConfiguration" class="org.springframework.data.redis.connection.RedisClusterConfiguration"> <property name="clusterNodes"> <set> <ref bean="n1" /> <ref bean="n2" /> <ref bean="n3" /> </set> </property> <property name="maxRedirects" value="5" /> </bean> <bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"> <constructor-arg ref="redisClusterConfiguration" /> <constructor-arg ref="jedisPoolConfig" /> </bean> <!-- Spring提供的访问Redis类. --> <bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate"> <property name="connectionFactory" ref="jedisConnectionFactory" /> <property name="KeySerializer"> <bean class="org.springframework.data.redis.serializer.StringRedisSerializer" /> </property> <property name="ValueSerializer"> <bean class="org.springframework.data.redis.serializer.StringRedisSerializer" /> </property> <property name="hashKeySerializer"> <bean class="org.springframework.data.redis.serializer.StringRedisSerializer" /> </property> <property name="hashValueSerializer"> <bean class="org.springframework.data.redis.serializer.StringRedisSerializer" /> </property> </bean> <!-- Redis配置结束 --> </beans>
Jedis与Redis集群交互时,涉及的类能够分为两类,分别以下:spring
一、Redis集群信息配置类:缓存
类名 | 说明 |
---|---|
redis.clients.jedis.JedisPoolConfig | 保存Jedis链接池配置信息 |
org.springframework.data.redis.connection.RedisNode | 保存Redis集群节点信息 |
org.springframework.data.redis.connection.RedisClusterConfiguration | 保存Redis集群配置信息 |
org.springframework.data.redis.connection.jedis.JedisConnectionFactory | Jedis链接工厂,负责建立JedisCluster集群操做类,获取Redis链接对象 |
org.springframework.data.redis.connection.jedis.JedisClusterConnection | 在JedisCluster基础上实现,根据key类型使用具体的Jedis类与Redis进行交互 |
二、Redis集群信息操做类:安全
类名 | 说明 |
---|---|
redis.clients.jedis.JedisCluster | 扩展了BinaryJedisCluster类,负责与Redis集群进行String类型的key交互 |
redis.clients.jedis.BinaryJedisCluster | JedisCluster的父类,负责与Redis集群进行byte[]类型的key交互 |
redis.clients.jedis.JedisSlotBasedConnectionHandler | JedisClusterConnectionHandler类的子类,负责根据key的slot值获取Redis链接 |
redis.clients.jedis.JedisClusterConnectionHandler | 一个抽象类,负责初始化、重建、重置Redis slot槽缓存 |
redis.clients.jedis.JedisClusterInfoCache | Redis slot缓存类,负责保存、重建和自动发现Redis slot槽与集群节点的关系 |
从上面的配置文件applicationContext-redis-cluster.xml中咱们声明了JedisConnectionFactory这个类:服务器
<bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"> <constructor-arg ref="redisClusterConfiguration" /> <constructor-arg ref="jedisPoolConfig" /> </bean>
这个类是用来建立、管理和销毁Jedis与Redis集群的链接的。因为咱们在Spring配置文件中声明了这个类,所以当应用启动时,Spring会自动加载该类,Jedis集群信息初始化的动做也由此开始。该类初始化的方法代码以下:app
public class JedisConnectionFactory implements InitializingBean, DisposableBean, RedisConnectionFactory { private JedisPoolConfig poolConfig = new JedisPoolConfig(); private RedisClusterConfiguration clusterConfig; public JedisConnectionFactory(RedisClusterConfiguration clusterConfig, JedisPoolConfig poolConfig) { this.clusterConfig = clusterConfig; this.poolConfig = poolConfig; } /* * (non-Javadoc) * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet() */ public void afterPropertiesSet() { if (shardInfo == null) { shardInfo = new JedisShardInfo(hostName, port); if (StringUtils.hasLength(password)) { shardInfo.setPassword(password); } if (timeout > 0) { setTimeoutOn(shardInfo, timeout); } } if (usePool && clusterConfig == null) { this.pool = createPool(); } //若是集群配置信息不为空,则建立JedisCluster对象 if (clusterConfig != null) { this.cluster = createCluster(); } } }
在上面的配置文件中,咱们使用构造函数注入的方式初始化了JedisConnectionFactory,因为该类实现了InitializingBean接口,所以在它被初始化以后会调用afterPropertiesSet()方法,在该方法中会根据clusterConfig集群配置信息是否为空来建立JedisCluster对象。createCluster()代码定义以下:ide
private JedisCluster createCluster() { JedisCluster cluster = createCluster(this.clusterConfig, this.poolConfig); this.clusterCommandExecutor = new ClusterCommandExecutor( new JedisClusterConnection.JedisClusterTopologyProvider(cluster), new JedisClusterConnection.JedisClusterNodeResourceProvider(cluster), EXCEPTION_TRANSLATION); return cluster; } /** * Creates {@link JedisCluster} for given {@link RedisClusterConfiguration} and {@link GenericObjectPoolConfig}. * * @param clusterConfig must not be {@literal null}. * @param poolConfig can be {@literal null}. * @return * @since 1.7 */ protected JedisCluster createCluster(RedisClusterConfiguration clusterConfig, GenericObjectPoolConfig poolConfig) { Assert.notNull(clusterConfig, "Cluster configuration must not be null!"); Set<HostAndPort> hostAndPort = new HashSet<HostAndPort>(); for (RedisNode node : clusterConfig.getClusterNodes()) { hostAndPort.add(new HostAndPort(node.getHost(), node.getPort())); } int redirects = clusterConfig.getMaxRedirects() != null ? clusterConfig.getMaxRedirects().intValue() : 5; return StringUtils.hasText(getPassword()) ? new JedisCluster(hostAndPort, timeout, timeout, redirects, password, poolConfig) : new JedisCluster(hostAndPort, timeout, redirects, poolConfig); }
上面的代码调用了JedisCluster的构造函数来建立JedisCluster对象,JedisCluster使用super关键字调用父类的构造函数:
public JedisCluster(Set<HostAndPort> jedisClusterNode, int timeout, int maxAttempts, final GenericObjectPoolConfig poolConfig) { super(jedisClusterNode, timeout, maxAttempts, poolConfig); }
BinaryJedisCluster构造函数:
public BinaryJedisCluster(Set<HostAndPort> jedisClusterNode, int timeout, int maxAttempts, final GenericObjectPoolConfig poolConfig) { this.connectionHandler = new JedisSlotBasedConnectionHandler(jedisClusterNode, poolConfig, timeout); this.maxAttempts = maxAttempts; }
初始化流程到这里,主要的部分就要浮出水面了。在BinaryJedisCluster类的构造函数中初始化了JedisSlotBasedConnectionHandler类,该类的出现说明Jedis要开始获取Redis集群的slot槽和Redis集群节点信息了,该类也是使用super关键字调用父类构造函数来初始化的,它的父类JedisClusterConnectionHandler构造函数以下:
public JedisClusterConnectionHandler(Set<HostAndPort> nodes, final GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password) { this.cache = new JedisClusterInfoCache(poolConfig, connectionTimeout, soTimeout, password); //这里是关键 initializeSlotsCache(nodes, poolConfig, password); }
JedisClusterConnectionHandler类的构造函数中建立了JedisClusterInfoCache对象,并调用initializeSlotsCache()方法对Redis集群信息进行初始化。该类的主要方法以下:
public Jedis getConnectionFromNode(HostAndPort node) { return cache.setupNodeIfNotExist(node).getResource(); } public Map<String, JedisPool> getNodes() { return cache.getNodes(); } private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig, String password) { for (HostAndPort hostAndPort : startNodes) { Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort()); if (password != null) { jedis.auth(password); } try { cache.discoverClusterNodesAndSlots(jedis); break; } catch (JedisConnectionException e) { // try next nodes } finally { if (jedis != null) { jedis.close(); } } } } public void renewSlotCache() { cache.renewClusterSlots(null); } public void renewSlotCache(Jedis jedis) { cache.renewClusterSlots(jedis); } @Override public void close() { cache.reset(); }
能够看到,该类主要仍是调用JedisClusterInfoCache对象的方法来完成slot的相关操做。所以咱们重点看一下JedisClusterInfoCache类。
JedisClusterInfoCache类主要负责发送cluster slots命令来获取Redis集群节点的槽和Redis集群节点信信息,并将相应信息保存到Map缓存中。咱们使用redis-cli客户端工具链接上任意一个Redis中的集群节点,向Redis发送该命令以后,得到的结果以下:
127.0.0.1:6379> cluster slots 1) 1) (integer) 12288 2) (integer) 16383 3) 1) "127.0.0.1" 2) (integer) 6382 3) "65aea5fc4485bc7c0c3c4425fb3f500c562ee243" 4) 1) "127.0.0.1" 2) (integer) 6386 3) "4061e306b094e707b6f4a7c8cd8e82bd61155060" 2) 1) (integer) 4096 2) (integer) 8191 3) 1) "127.0.0.1" 2) (integer) 6380 3) "c6e1b3691b968b009357dcac3349afbcd557fd8c" 4) 1) "127.0.0.1" 2) (integer) 6384 3) "f915c7e6812a7d8fbe637c782ad261cd453022b2" 3) 1) (integer) 0 2) (integer) 4095 3) 1) "127.0.0.1" 2) (integer) 6379 3) "91bb43a956a04a9812e4d6950efebbb2e0f646fd" 4) 1) "127.0.0.1" 2) (integer) 6383 3) "c1d9d907f6905dd826dad774d127b75484ef8ea8" 4) 1) (integer) 8192 2) (integer) 12287 3) 1) "127.0.0.1" 2) (integer) 6381 3) "745936c1192bc1b136fd1f5df842bc1dd517ef36" 4) 1) "127.0.0.1" 2) (integer) 6385 3) "1c07bd8406156122eb4855d2e8b36e785e7901c7"
我如今本地的Redis集群有八个节点,四个主节点,四个从节点,经过cluster slots命令的结果均可以清楚地看到这些节点信息。这个命令的每一组结果由四个部分组成:起始槽节点、终止槽节点、主节点IP和端口加节点ID、从节点IP和端口加节点ID。
在JedisClusterInfoCache类中,相关的源码以下:
public class JedisClusterInfoCache { // 保存Redis集群节点和节点链接池信息:key为节点地址、value为链接池 private final Map<String, JedisPool> nodes = new HashMap<String, JedisPool>(); // 保存Redis集群节点槽和槽所在的主节点链接池信息:key为节点槽、value为链接池 private final Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>(); // 使用读写锁保证nodes和slots两个map的写安全 private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); private final Lock r = rwl.readLock(); private final Lock w = rwl.writeLock(); // 重建缓存的标识变量,false为未进行,true为正在进行 private volatile boolean rediscovering; private final GenericObjectPoolConfig poolConfig; private int connectionTimeout; private int soTimeout; private String password; // 主节点索引位置标识,遍历cluster slots结果时使用 private static final int MASTER_NODE_INDEX = 2; public JedisClusterInfoCache(final GenericObjectPoolConfig poolConfig, int timeout) { this(poolConfig, timeout, timeout, null); } public JedisClusterInfoCache(final GenericObjectPoolConfig poolConfig, final int connectionTimeout, final int soTimeout, final String password) { this.poolConfig = poolConfig; this.connectionTimeout = connectionTimeout; this.soTimeout = soTimeout; this.password = password; } /** * 在jedis封装的redis集群节点信息上发送cluster slots命令,获取全部集群节点信息和槽信息 * * @param jedis */ public void discoverClusterNodesAndSlots(Jedis jedis) { w.lock();// 由当前线程得到写锁,在当前线程操做未结束以前,其余线程只能等待 try { reset();// 重置nodes、slots两个Map,释放JedisPool链接池资源 List<Object> slots = jedis.clusterSlots();// 在redis集群节点信息上发送cluster slots命令,获取全部集群节点信息和槽信息 // 遍历slots集合,保存Redis集群节点和节点链接池信息到nodes Map中,保存Redis集群节点槽和槽所在的主节点链接池信息到slots Map中 for (Object slotInfoObj : slots) { List<Object> slotInfo = (List<Object>) slotInfoObj; if (slotInfo.size() <= MASTER_NODE_INDEX) { continue; } // 获取槽节点集合 List<Integer> slotNums = getAssignedSlotArray(slotInfo); // hostInfos int size = slotInfo.size(); // 遍历slots集合元素中的主从节点信息,保存Redis集群节点和节点链接池信息到nodes Map中,保存Redis集群节点槽和槽所在的主节点链接池信息到slots Map中 for (int i = MASTER_NODE_INDEX; i < size; i++) { List<Object> hostInfos = (List<Object>) slotInfo.get(i); if (hostInfos.size() <= 0) { continue; } // 获取集群节点的服务器地址和端口 HostAndPort targetNode = generateHostAndPort(hostInfos); // 保存Redis集群节点和节点链接池信息到nodes Map中 setupNodeIfNotExist(targetNode); // 若是当前遍历的是主节点信息,则保存Redis集群节点槽和槽所在的主节点链接池信息到slots Map中 if (i == MASTER_NODE_INDEX) { assignSlotsToNode(slotNums, targetNode); } } } } finally { w.unlock();// 释放写锁,使其余线程使用 } } /** * 重建Cluster集群节点和Slot槽缓存 * * @param jedis */ public void renewClusterSlots(Jedis jedis) { // 若是重建操做未进行,则开始重建缓存操做 if (!rediscovering) { try { w.lock(); rediscovering = true;// 设重建缓存标识变量的值为true,表示重建操做正在进行 // 若是封装redis链接信息的jedis对象不为空,则使用该节点进行重建缓存操做并返回 if (jedis != null) { try { discoverClusterSlots(jedis); return; } catch (JedisException e) { // try nodes from all pools } } // 若是封装redis链接信息的jedis对象为空,则打乱nodes Map中保存的jedis链接池信息,遍历链接池中的节点进行重建缓存操做并返回 for (JedisPool jp : getShuffledNodesPool()) { try { jedis = jp.getResource(); discoverClusterSlots(jedis); return; } catch (JedisConnectionException e) { // try next nodes } finally { if (jedis != null) { jedis.close(); } } } } finally { rediscovering = false;// 设重建缓存标识变量的值为false,表示重建操做未进行 w.unlock(); } } } /** * 逻辑相似discoverClusterNodesAndSlots方法 * * @param jedis */ private void discoverClusterSlots(Jedis jedis) { List<Object> slots = jedis.clusterSlots(); this.slots.clear(); for (Object slotInfoObj : slots) { List<Object> slotInfo = (List<Object>) slotInfoObj; if (slotInfo.size() <= MASTER_NODE_INDEX) { continue; } List<Integer> slotNums = getAssignedSlotArray(slotInfo); // hostInfos List<Object> hostInfos = (List<Object>) slotInfo.get(MASTER_NODE_INDEX); if (hostInfos.isEmpty()) { continue; } // at this time, we just use master, discard slave information HostAndPort targetNode = generateHostAndPort(hostInfos); assignSlotsToNode(slotNums, targetNode); } } private HostAndPort generateHostAndPort(List<Object> hostInfos) { return new HostAndPort(SafeEncoder.encode((byte[]) hostInfos.get(0)), ((Long) hostInfos.get(1)).intValue()); } /** * 保存Redis集群节点和节点链接池信息到nodes Map中 * * @param node * @return */ public JedisPool setupNodeIfNotExist(HostAndPort node) { w.lock(); try { // 获取节点key,形式为"服务器地址:端口" String nodeKey = getNodeKey(node); // 若是节点已存在nodes Map中,则直接返回 JedisPool existingPool = nodes.get(nodeKey); if (existingPool != null) return existingPool; // 建立节点相应的JedisPool链接池对象,并保存到nodes Map中,而后返回JedisPool链接池对象 JedisPool nodePool = new JedisPool(poolConfig, node.getHost(), node.getPort(), connectionTimeout, soTimeout, password, 0, null, false, null, null, null); nodes.put(nodeKey, nodePool); return nodePool; } finally { w.unlock(); } } /** * 遍历槽集合,保存Redis集群节点槽和槽所在的主节点链接池信息到slots Map中 * * @param targetSlots * @param targetNode */ public void assignSlotsToNode(List<Integer> targetSlots, HostAndPort targetNode) { w.lock(); try { JedisPool targetPool = setupNodeIfNotExist(targetNode); for (Integer slot : targetSlots) { slots.put(slot, targetPool); } } finally { w.unlock(); } } /** * 根据节点key获取JedisPool链接池对象 * * @param nodeKey * @return */ public JedisPool getNode(String nodeKey) { r.lock(); try { return nodes.get(nodeKey); } finally { r.unlock(); } } /** * 根据slot槽值获取JedisPool链接池对象 * * @param slot * @return */ public JedisPool getSlotPool(int slot) { r.lock(); try { return slots.get(slot); } finally { r.unlock(); } } /** * 获取节点信息和节点对象对应的链接池信息 * * @return */ public Map<String, JedisPool> getNodes() { r.lock(); try { return new HashMap<String, JedisPool>(nodes); } finally { r.unlock(); } } /** * 获取nodes Map打乱顺序后的Redis集群节点链接池信息 * * @return */ public List<JedisPool> getShuffledNodesPool() { r.lock(); try { List<JedisPool> pools = new ArrayList<JedisPool>(nodes.values()); Collections.shuffle(pools); return pools; } finally { r.unlock(); } } /** * 清空集群节点集合和槽集合,释放JedisPool资源 */ public void reset() { w.lock(); try { for (JedisPool pool : nodes.values()) { try { if (pool != null) { pool.destroy(); } } catch (Exception e) { // pass } } nodes.clear(); slots.clear(); } finally { w.unlock(); } } public static String getNodeKey(HostAndPort hnp) { return hnp.getHost() + ":" + hnp.getPort(); } /** * 遍历槽区间,获取槽节点集合 * * @param slotInfo * @return */ private List<Integer> getAssignedSlotArray(List<Object> slotInfo) { List<Integer> slotNums = new ArrayList<Integer>(); for (int slot = ((Long) slotInfo.get(0)).intValue(); slot <= ((Long) slotInfo.get(1)).intValue(); slot++) { slotNums.add(slot); } return slotNums; } }
Jedis初始化Redis集群信息时,先使用JedisConnectionFactory获取JedisCluster对象,再根据JedisCluster去逐步引出JedisClusterInfoCache对象完成Redis集群信息的获取。在这个类中,主要有如下几点:
下一篇文章剖析Jedis cluster命令执行流程。