做者:莫那·鲁道 原文:http://thinkinjava.cn/2018/08/Jedis-%E5%A6%82%E4%BD%95%E6%94%AF%E6%8C%81-Cluster/
前言
前面说了 Jedis(2.9.0) 如何支持 Redis Sentinel 的,今天看看 Jedis 是如何支持 Redis Cluster 的。java
1 初始化
Jedis Cluster 构造方法:node
public JedisCluster(Set<HostAndPort> jedisClusterNode, int connectionTimeout, int soTimeout, int maxAttempts, final GenericObjectPoolConfig poolConfig) { super(jedisClusterNode, connectionTimeout, soTimeout, maxAttempts, poolConfig); }
注意: Set<HostAndPort> jedisClusterNode
中包含全部主从节点。web
经过层层跟踪,咱们来到了 initializeSlotsCache 方法。redis
private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig, String password) { for (HostAndPort hostAndPort : startNodes) { Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort()); if (password != null) { jedis.auth(password); } try { // cache.discoverClusterNodesAndSlots(jedis); break; } catch (JedisConnectionException e) { // try next nodes } finally { if (jedis != null) { jedis.close(); } } } }
这个 cache 设计上就是 Redis Cluster slot 的缓存,每一个 slot 都指向一个链接池。看看这个 cache 的内部结构:算法
public class JedisClusterInfoCache { private final Map<String, JedisPool> nodes = new HashMap<String, JedisPool>();// ip:port 对应的链接池 private final Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>();// slot 对应的链接池 private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); private final Lock r = rwl.readLock(); private final Lock w = rwl.writeLock(); private volatile boolean rediscovering; private final GenericObjectPoolConfig poolConfig; private int connectionTimeout; private int soTimeout; private String password; private static final int MASTER_NODE_INDEX = 2;// 主节点下标
其中,在 initializeSlotsCache
方法中,会遍历全部的节点信息,可是,只会执行一次 cache.discoverClusterNodesAndSlots(jedis)
,若是失败了,就继续执行这个方法。为何只须要执行一次呢?数据库
来看看 cache.discoverClusterNodesAndSlots
方法:缓存
public void discoverClusterNodesAndSlots(Jedis jedis) { w.lock(); try { reset(); List<Object> slots = jedis.clusterSlots();// 节点的槽位集合:[[10924, 16383, [[B@4ae82894, 6386], [[B@543788f3, 6387]], [5462, 10923, [[B@6d3af739, 6384], [[B@1da51a35, 6385]], [0, 5461, [[B@16022d9d, 6382], [[B@7e9a5fbe, 6383]]] for (Object slotInfoObj : slots) {// 遍历集合 List<Object> slotInfo = (List<Object>) slotInfoObj; if (slotInfo.size() <= MASTER_NODE_INDEX) {// 若是此节点信息少于3 个,跳过这次循环,通常是: slotIndex, slotIndex,{ip byte[], port},{ip byte[], port} continue; } List<Integer> slotNums = getAssignedSlotArray(slotInfo);// 获得全部的 slot 数字 // hostInfos int size = slotInfo.size(); // 从第三位开始循环,是主节点信息 for (int i = MASTER_NODE_INDEX; i < size; i++) { List<Object> hostInfos = (List<Object>) slotInfo.get(i);// 获得主节点信息 if (hostInfos.size() <= 0) { continue; } HostAndPort targetNode = generateHostAndPort(hostInfos); // 解析出 ip + port setupNodeIfNotExist(targetNode);// 建立链接池,并放入缓存 if (i == MASTER_NODE_INDEX) {// 若是是主节点,就将该全部槽位指向同一个链接池 assignSlotsToNode(slotNums, targetNode); } } } } finally { w.unlock(); } }
该方法做用以下:经过任意一个节点,获得全部主节点的信息。数据格式为:微信
获得这些信息后,根据 ip + port 建立链接池,并缓存全部的链接池,key 为 “ip:port”,value 则是对应的链接池,若是是主节点,则更进一步,将 solt 和链接池也所有缓存,便于查询。运维
该方法涉及的几个方法以下:dom
private List<Integer> getAssignedSlotArray(List<Object> slotInfo) { List<Integer> slotNums = new ArrayList<Integer>(); // 0位是起始 slot, 1 位是截止 slot, 这里是获得全部的 slot for (int slot = ((Long) slotInfo.get(0)).intValue(); slot <= ((Long) slotInfo.get(1)).intValue(); slot++) {// 初始是第一个, slot 不能大于第二个 slot slotNums.add(slot); } return slotNums; }
public JedisPool setupNodeIfNotExist(HostAndPort node) { w.lock(); try { String nodeKey = getNodeKey(node); // ip:port JedisPool existingPool = nodes.get(nodeKey);// 从 map 里获取缓存 if (existingPool != null) return existingPool;// 若是有,就再也不初始化 // 建立链接池 JedisPool nodePool = new JedisPool(poolConfig, node.getHost(), node.getPort(), connectionTimeout, soTimeout, password, 0, null, false, null, null, null); nodes.put(nodeKey, nodePool);// 缓存 return nodePool; } finally { w.unlock(); } }
public void assignSlotsToNode(List<Integer> targetSlots, HostAndPort targetNode) { w.lock(); try { JedisPool targetPool = setupNodeIfNotExist(targetNode);// 获取该节点的链接池 for (Integer slot : targetSlots) {// 将全部槽位指向该链接池 slots.put(slot, targetPool); } } finally { w.unlock(); } }
因此,当这个步骤成功之后,全部的 slot 和对应的链接池都初始化好了,后面就直接 break 了。若是途中失败了,则继续尝试。
2 发送命令和重试机制
好了,咱们已经知道,slot 和链接池是保存在 JedisClusterInfoCache 类中的,那么,咱们使用 API 的时候又是怎么操做的呢?
以 set 方法为例:
public String set(final String key, final String value) { return new JedisClusterCommand<String>(connectionHandler, maxAttempts) { @Override public String execute(Jedis connection) { return connection.set(key, value); } }.run(key); }
这里会建立一个 Redis 命令对象,而后执行 run 方法,run 方法里会回调命令对象的 execute 方法。run 方法内部调用的是 runWithRetries 方法,看名字,这是一个带有重试机制的方法. 该方法有个参数就是 int attempts,用户本身设置的重试次数。
看看 runWithRetries 方法实现(由于包含了失败重试逻辑,因此很长):
private T runWithRetries(byte[] key, int attempts, boolean tryRandomNode, boolean asking) { if (attempts <= 0) {// 重试次数 throw new JedisClusterMaxRedirectionsException("Too many Cluster redirections?"); } Jedis connection = null; try { if (asking) {// 第一次 false,若是节点 A 正在迁移槽 i 至节点 B , 那么当节点 A 没能在本身的数据库中找到命令指定的数据库键时, 节点 A 会向客户端返回一个 ASK 错误, 指引客户端到节点 B 继续查找指定的数据库键 connection = askConnection.get(); connection.asking();// 到目标节点打开客户端链接标识 // if asking success, reset asking flag asking = false; } else { if (tryRandomNode) {// 若是是随机的 connection = connectionHandler.getConnection(); } else {// 默认不是随机的,经过 CRC16 算法获取 slot 对应的节点的链接池中的链接 connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key)); } } // 执行 return execute(connection); } catch (JedisNoReachableClusterNodeException jnrcne) {// 集群不存在 throw jnrcne; } catch (JedisConnectionException jce) {// 链接异常 // release current connection before recursion releaseConnection(connection);//归还链接 connection = null; if (attempts <= 1) {// 若是重试次数只有一次,那就更新链接池,并抛出异常 this.connectionHandler.renewSlotCache(); throw jce; } return runWithRetries(key, attempts - 1, tryRandomNode, asking);// 不然递归重试,重试次数减一 } catch (JedisRedirectionException jre) {// 若是是重定向异常,例如 moved ,ASK // if MOVED redirection occurred, if (jre instanceof JedisMovedDataException) {// 节点在接到一个命令请求时, 会先检查这个命令请求要处理的键所在的槽是否由本身负责, 若是不是的话, 节点将向客户端返回一个 MOVED 错误, MOVED 错误携带的信息能够指引客户端转向至正在负责相关槽的节点 // 若是是 moved 错误,就更新链接池, ASK 就没必要更新缓存,只须要临时访问就行 this.connectionHandler.renewSlotCache(connection); } // 归还旧的链接 releaseConnection(connection); connection = null; // 若是是 ASK if (jre instanceof JedisAskDataException) { asking = true; // 设置 ThreadLocal,新的链接是 ASK 指定的节点 askConnection.set(this.connectionHandler.getConnectionFromNode(jre.getTargetNode())); } else if (jre instanceof JedisMovedDataException) {// 若是是 moved 错误,不处理错误,重试。 } else { throw new JedisClusterException(jre); } // 重试 return runWithRetries(key, attempts - 1, false, asking); } finally { releaseConnection(connection); } }
该方法主要步骤以下:
默认是使用 CRC16 算法经过 key 获得 slot ,而后,根据 slot 获得 Jedis 链接,也就是从咱们刚刚说的缓存里获取链接。
获得链接后,回调命令对象的 execute 方法。
若是发生了 JedisNoReachableClusterNodeException 异常,代表集群不存在,则直接抛出异常,结束方法。
若是发生了 JedisConnectionException 链接异常,则进行递归重试,若是重试次数只剩一次,则刷新链接池缓存。
若是发生了 JedisRedirectionException 重定向异常,若是返回的是 moved,则刷新链接池。若是是 ASK,则不刷新链接池,在下次递归中直接使用 ASK 返回的信息进行调用。下次递归时,先执行 asking 命令打开新的客户端链接,若是成功,则执行真正的命令。
最终,归还链接。
大体的流程图以下:
这里说一下 ASK 和 MOVED:
ASK:若是节点 A 正在迁移槽 i 至节点 B , 那么当节点 A 没能在本身的数据库中找到命令指定的数据库键时, 节点 A 会向客户端返回一个 ASK 错误, 指引客户端到节点 B 继续查找指定的数据库键。
MOVED:节点在接到一个命令请求时, 会先检查这个命令请求要处理的键所在的槽是否由本身负责, 若是不是的话, 节点将向客户端返回一个 MOVED 错误, MOVED 错误携带的信息能够指引客户端转向至正在负责相关槽的节点。
二者的共同点都是重定向,不一样点是:ASK 是迁移过程当中返回的,MOVED 是迁移结束后返回的。如返回 ASK ,那么就没必要更新客户端缓存,由于客户端没法知道何时迁移完成,所以只能是临时性的重定向。可是 MOVED 重定向说明键对应的 slot 已经成功的转移到了新的节点,那么就能够换成这些链接。
注意:当重试次数不够时,会抛出 throw new JedisClusterMaxRedirectionsException("Too many Cluster redirections?")
异常,缘由是节点宕机或请求超时触发了重试,而重试次数耗尽就会触发这个异常。
当 Cluster 进行故障发现到完成故障转移,须要必定的时间,节点宕机期间,全部指向这个节点的命令都会触发重试,当收到 moved 命令则会进行链接刷新 —— 也就是 renewSlotCache 方法。
注意:更新链接池的过程是串行加锁的!!
代码以下:
public void renewClusterSlots(Jedis jedis) { //If rediscovering is already in process - no need to start one more same rediscovering, just return if (!rediscovering) { try { w.lock(); rediscovering = true; if (jedis != null) { try { discoverClusterSlots(jedis); return; } catch (JedisException e) { //try nodes from all pools } } for (JedisPool jp : getShuffledNodesPool()) { try { jedis = jp.getResource(); discoverClusterSlots(jedis); return; } catch (JedisConnectionException e) { // try next nodes } finally { if (jedis != null) { jedis.close(); } } } } finally { rediscovering = false; w.unlock(); } } }
注意:代码中使用了写锁,而获取链接池时则使用了读锁,读写锁是互斥的,这时将致使全部访问集群的线程阻塞!!!
固然,只有出现 MOVED 错误或者 JedisConnectionException 异常且没法继续重试时,才会进行刷新链接池操做。
3 总结
本文旨在分析 Jedis 如何支持 Redis Cluster,由于 Redis Cluster 须要客户端来支持分片。Jedis 内部使用了一个 JedisClusterInfoCache 保存 slot 和 pool,ip:port 和 pool 的映射关系,ip:port 的缓存更可能是服务于 ask 时寻找节点。
在使用客户端是时候,Jedis 会有重试机制,用户能够设置重试次数,若是发生了 ask,客户端会自动根据返回值重定向,若是发生了 moved,则会刷新链接池中的 slot,由于集群发生了迁移。
须要注意的是,当集群进行迁移的时候,若是有客户端访问迁移的节点,那么将会致使刷新链接池,而这个链接池是有锁,当刷新的时候,使用的是写锁,将致使全部的读都会阻塞,因此,迁移尽可能在业务低谷进行。
了解客户端的原理,有助于咱们理解 Redis Cluster 的运行原理,也有助于咱们平时编写代码,运维缓存,排查故障。
本文分享自微信公众号 - 码农沉思录(code-thinker)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。