Redis-探究-集群扩容致使的Jedis客户端报JedisMovedDataException异常

0 问题的产生

因为线上Redis集群内存使用量已经接近达到预警阈值,须要对Redis集群扩容。(使用的是Redis自带的Redis-Cluster)java

目前有6台主节点,6台从节点。node

暂时称为:redis

  • redis-master001 ~ redis-master006
  • redis-slave001 ~ redis-slave006

须要增长3主3从。缓存

  • redis-master007 ~ redis-master009
  • redis-slave007 ~ redis-master009

以前Redis集群的16384个槽均匀分配在6台主节点中,每一个节点2730个槽。微信

为保证扩容后,槽依然均匀分布,须要将以前6台的每台机器上迁移出910个槽,方案以下:dom

  • redis-master001的910个slot迁移到redis-master007
  • redis-master002的910个slot迁移到redis-master007
  • redis-master003的910个slot迁移到redis-master008
  • redis-master004的910个slot迁移到redis-master008
  • redis-master005的910个slot迁移到redis-master009
  • redis-master006的910个slot迁移到redis-master009

分配完以后,每台节点1820个slot。ide

当将redis-master001的910个slot迁移到redis-master007后,业务上开始报下面的异常源码分析

JedisMovedDataException

在马赛克的上一行,能够看到是调Jedis的get方法出的问题。ui

1 缘由及解决方案

问题的缘由在于使用了Jedis客户端,改成使用JedisCluster客户端便可解决问题。this

出问题的get方法是这样写的(在Jedis原生基础上包装了一层)

// 本身封装的get方法
public String get(String key) {
    String result = “";
    // 为了打印获取链接耗时的日志,这里单独获取了一下Jedis链接
    try (Jedis jedis = this.getResourceLog(key)) {
        TimeCost timeCost = new TimeCost();
        result = jedis.get(key); // 这里报错
        debugLogger.debug("redis cluster get TimeCost={}", timeCost.getCostMillSeconds());
    }
    // 其实改成下面这样get就能够解决一直报JedisMovedDataException问题
    // return jedisCluster.get(key);
    return result;
}
复制代码

getResourceLog方法的做用是根据key计算出这个key所在的slot,再经过slot获取Redis链接。代码以下

private Jedis getResourceLog(String key) {
    TimeCost tc = new TimeCost();
    int slot = JedisClusterCRC16.getSlot(key); // CRC计算slot
    debugLogger.debug("calc slot TimeCost={}", tc.getCostMillSeconds());
    tc.reset();
    Jedis jedis = connectionHandler.getConnectionFromSlot(slot); // 经过slot获取链接
    debugLogger.debug("get connection TimeCost={}", tc.getCostMillSeconds());
    return jedis;
}
复制代码

上面的get方法能够直接改成JedisCluster的get方法解决。

再考虑另一种状况,若是必须经过Jedis操做呢?好比watch方法,JedisCluster是不提供watch的,那么只能经过上述方法在Redis集群中根据key获取到slot,再经过slot获取到jedis连接,而后调用watch。这样一来,在调watch的地方也会报JedisMovedDataException。

例以下面的代码,在业务上须要保证事务的状况下(或乐观锁),可能会这样实现:

Jedis jedis = null;
String key = ...; // redis key
try {
    // 经过上面的getResource方法获取jedis连接
    jedis = getResource(userId);
    // 经过jedis watch key
    if (RedisConstants.SAVE_TO_REDIS_OK.equals(jedis.watch(key))) {

        // .... 业务逻辑 ....
        // ....
        // 经过jedis连接开始事务
        Transaction transaction = jedis.multi();
        // ...
        // ... 执行一些transaction操做...
        // ...
        // 提交事务
        List<Object> execResult = transaction.exec();
        
        return ...;
    }
} catch (Exception ex) {
    // do something ...
} finally {
    if (jedis != null) {
        try {
            if (!flag) {
                jedis.unwatch();
            }
        } finally {
            jedis.close();
        }
    }
}
复制代码

此时若是发生slot迁移,就会报JedisMovedDataException。

那这种状况下的解决方案是什么呢?

其实,优先catch住JedisMovedDataException,而后经过JedisCluster.get(key);一下就行,以下:

Jedis jedis = null;
String key = ...; // redis key
try {
    // 经过上面的getResource方法获取jedis连接
    jedis = getResource(userId);
    // 经过jedis watch key
    if (RedisConstants.SAVE_TO_REDIS_OK.equals(jedis.watch(key))) {

        // .... 业务逻辑 ....
        // ....
        // 经过jedis连接开始事务
        Transaction transaction = jedis.multi();
        // ...
        // ... 执行一些transaction操做...
        // ...
        // 提交事务
        List<Object> execResult = transaction.exec();
        
        return ...;
    }
} catch (JedisMovedDataException jmde) {
    jmde.printStackTrace();
    // redisClusterService中维护着jedisCluster实例,这个get实际上调用的是jedisCluster的get
    redisClusterService.get(key);
    return ...;
} catch (Exception ex) {
    // do something ...
} finally {
    if (jedis != null) {
        try {
            if (!flag) {
                jedis.unwatch();
            }
        } finally {
            jedis.close();
        }
    }
}
复制代码

须要注意的是,用Jedis的get是不能解决的。

2 JedisCluster类图

JedisCluster总体的UML关系以下,先有个总体的印象,在后面的源码分析中,能够再回来看。

Jedis类图

3 为何经过RedisCluster.get一下能够解决?

下面经过JedisCluster源码解释为何这么作能够解决问题,注释中会有详细说明。

JedisCluster.get源码以下:

@Override
public String get(final String key) {
  return new JedisClusterCommand<String>(connectionHandler, maxAttempts) {
    @Override
    public String execute(Jedis connection) {
      return connection.get(key);
    }
  }.run(key);
}
复制代码

发现他是委托给JedisClusterCommand来完成get操做的,也能够发现execute方法其实是使用Jedis来执行的get。这个Jedis实际上就是经过上述方法,先计算出slot,再经过slot获取到Jedis连接的。关键在于最下面run方法的执行,下面具体看一下。

Run方法源码以下:

public T run(String key) {
  // JedisClusterCRC16.getSlot(key) 计算出slot
  return runWithRetries(JedisClusterCRC16.getSlot(key), this.maxAttempts, false, null);
}
复制代码

runWithRetries源码以下

private T runWithRetries(final int slot, int attempts, boolean tryRandomNode, JedisRedirectionException redirect) {
  // 这里是一个重试机制,报异常时触发
  if (attempts <= 0) {
    throw new JedisClusterMaxAttemptsException("No more cluster attempts left.");
  }

  Jedis connection = null;
  try {

    if (redirect != null) {
      connection = this.connectionHandler.getConnectionFromNode(redirect.getTargetNode());
      if (redirect instanceof JedisAskDataException) {
        // TODO: Pipeline asking with the original command to make it faster....
        connection.asking();
      }
    } else {
      if (tryRandomNode) {
        connection = connectionHandler.getConnection();
      } else {
        // 执行到这里,经过slot获取到Jedis connection
        // 内部是经过一个map维护的slot到JedisPool的映射关系
        connection = connectionHandler.getConnectionFromSlot(slot);
      }
    }

    // 执行上面JedisClusterCommand定义的execute方法。
    return execute(connection);

  } catch (JedisNoReachableClusterNodeException jnrcne) {
    throw jnrcne;
  } catch (JedisConnectionException jce) {
    // release current connection before recursion
    releaseConnection(connection);
    connection = null;

    if (attempts <= 1) {
      //We need this because if node is not reachable anymore - we need to finally initiate slots
      //renewing, or we can stuck with cluster state without one node in opposite case.
      //But now if maxAttempts = [1 or 2] we will do it too often.
      //TODO make tracking of successful/unsuccessful operations for node - do renewing only
      //if there were no successful responses from this node last few seconds
      this.connectionHandler.renewSlotCache();
    }

    return runWithRetries(slot, attempts - 1, tryRandomNode, redirect);
  } catch (JedisRedirectionException are) { // *** 关键在这 ***
    // if MOVED redirection occurred,
    // JedisMovedDataException是JedisRedirectionException的子类,因此会执行下面if中的代码
    if (jre instanceof JedisMovedDataException) {
      // it rebuilds cluster's slot cache recommended by Redis cluster specification
      // 从新经过这个jedis连接获取RedisCluster中的Node信息以及slot信息
      this.connectionHandler.renewSlotCache(connection);
    }

    // release current connection before recursion
    releaseConnection(connection);
    connection = null;

    return runWithRetries(slot, attempts - 1, false, jre);
  } finally {
    releaseConnection(connection);
  }
}
复制代码

注释中说到了最终会经过this.connectionHandler.renewSlotCache(connection);来从新获取slot信息。下面来看下这个方法。

public void renewSlotCache(Jedis jedis) {
  cache.renewClusterSlots(jedis);
}
复制代码

调用了cache的renewClusterSlots方法来从新获取slot信息,这个cache是JedisClusterInfoCache类的实例,他里面维护这Node和Slot信息,以下:

public class JedisClusterInfoCache {
  private final Map<String, JedisPool> nodes = new HashMap<String, JedisPool>();
  private final Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>();

  // ..
}
复制代码

renewClusterSlots方法以下

public void renewClusterSlots(Jedis jedis) {
  //If rediscovering is already in process - no need to start one more same rediscovering, just return
  if (!rediscovering) {
    try {
      w.lock();
      if (!rediscovering) {
        rediscovering = true;

        try {
          if (jedis != null) {
            try {
              // 关键在于这一步,这个方法会从新从远程集群中获取最新的slot信息
              discoverClusterSlots(jedis);
              return;
            } catch (JedisException e) {
              //try nodes from all pools
            }
          }

          for (JedisPool jp : getShuffledNodesPool()) {
            Jedis j = null;
            try {
              j = jp.getResource();
              discoverClusterSlots(j);
              return;
            } catch (JedisConnectionException e) {
              // try next nodes
            } finally {
              if (j != null) {
                j.close();
              }
            }
          }
        } finally {
          rediscovering = false;      
        }
      }
    } finally {
      w.unlock();
    }
  }
}
复制代码

关键在于discoverClusterSlots方法,这个方法的实现以下:

private void discoverClusterSlots(Jedis jedis) {
  // 经过slots命令从远程获取slot信息
  List<Object> slots = jedis.clusterSlots(); 
  this.slots.clear(); // 清除本地缓存slot信息

  // 每一个slotInfoObj包含集群中某一节点的slot信息
  for (Object slotInfoObj : slots) {
    List<Object> slotInfo = (List<Object>) slotInfoObj;

    if (slotInfo.size() <= MASTER_NODE_INDEX) {
      continue;
    }
    // 计算当前节点的slot信息
    List<Integer> slotNums = getAssignedSlotArray(slotInfo);

    // hostInfos
    // 获取这组slot所在的节点信息
    List<Object> hostInfos = (List<Object>) slotInfo.get(MASTER_NODE_INDEX);
    if (hostInfos.isEmpty()) {
      continue;
    }

    // at this time, we just use master, discard slave information
    HostAndPort targetNode = generateHostAndPort(hostInfos);
    // 从新关联这组slot到远程节点的映射,至此,完成slot信息的刷新
    assignSlotsToNode(slotNums, targetNode);
  }
}
复制代码

4 为何Jedis的get不行?

首先咱们来对比一下JedisCluster的get和Jedis的get

JedisCluster.get

@Override
public String get(final String key) {
  return new JedisClusterCommand<String>(connectionHandler, maxAttempts) {
    @Override
    public String execute(Jedis connection) {
      return connection.get(key); // 这里追踪进去,就是Jedis.get
    }
  }.run(key);
}
复制代码

Jedis.get

@Override
public String get(final String key) {
  checkIsInMultiOrPipeline();
  client.get(key);
  return client.getBulkReply();
}
复制代码

由此可知,Jedis.get没有了run方法中的异常重试和从新发现机制,因此Jedis.get不行。

5 总结

本文从一次线上扩容引起问题的讨论,由扩容引出了slot的迁移,由slot的迁移引出线上报错-JedisMovedDataException,而后说明了引起这个异常的缘由,是由于咱们使用了Jedis客户端,致使没法自动发现远程集群slot的变化。

而后提出了解决方案,经过使用JedisCluster来解决没法自动发现slot变化的问题。并从源码的角度说明了为何JedisCluster的get方法能够自动发现远程slot的变化。


欢迎关注个人微信公众号

公众号
相关文章
相关标签/搜索