Lettuce之RedisClusterClient使用以及源码分析

  

  Redis Cluster模式简介

    redis集群并无使用一致性hash算法而引入了哈希槽概念,Redis 集群有16384个哈希槽,每一个key经过CRC16校验后对16384取模来决定放置哪一个槽.集群的每一个节点负责一部分hash槽.也就是说若是key是不变的对应的slot也是不变的html

  Redis 服务器命令    

  • cluster info

能够经过cluster info 命令查看集群信息java

cluster info
cluster_state:ok
cluster_slots_assigned:16384
cluster_slots_ok:16384
cluster_slots_pfail:0
cluster_slots_fail:0
cluster_known_nodes:12
  • cluster nodes 

经过cluster nodes命令查看当前节点以及该节点分配的slot,以下图能够发现当前redis集群有12个节点,每一个节点大约管理1365个slotnode

xx.xxx.xxx.xx:6959> cluster nodes 45abb8663c0cdb25ed17c29521bf6fda98e913ea xx.xxx.xxx.xx:6961 master - 0 1529229636724 11 connected 13653-15018 e40080f32a3fb89e34b7622038ce490682428fdf xx.xxx.xxx.xx:6960 master - 0 1529229633723 10 connected 12288-13652 a749bba5614680dea9f47e3c8fe595aa8be71a2c xx.xxx.xxx.xx:6954 master - 0 1529229639230 4 connected 4096-5460 1096e2a8737401b66c7d4ee0addcb10d7ff14088 xx.xxx.xxx.xx:6952 master - 0 1529229636224 2 connected 1365-2730 fbc76f3481271241c1a89fabeb5139905e1ec2a6 xx.xxx.xxx.xx:6962 master - 0 1529229638230 12 connected 15019-16383 85601fa67820a5af0de0cc21d102d72575709ec6 xx.xxx.xxx.xx:6959 myself,master - 0 0 9 connected 10923-12287 c00d86999c98f97d697f3a2b33ba26fbf50e46eb xx.xxx.xxx.xx:6955 master - 0 1529229634724 5 connected 5461-6826 0b09a5c4c9e9158520389dd2672bd711d55085c6 xx.xxx.xxx.xx:6953 master - 0 1529229637227 3 connected 2731-4095 9f26d208fa8772449d5c322eb63786a1cf9937e0 xx.xxx.xxx.xx:6958 master - 0 1529229635224 8 connected 9557-10922 274294a88758fcb674e1a0292db0e36a66a0bf48 xx.xxx.xxx.xx:6951 master - 0 1529229634223 1 connected 0-1364 369780bdf56d483a0f0a92cb2baab786844051f3 xx.xxx.xxx.xx:6957 master - 0 1529229640232 7 connected 8192-9556 71ed0215356c664cc56d4579684e86a83dba3a92 xx.xxx.xxx.xx:6956 master - 0 1529229635724 6 connected 6827-8191

  

 

  • client list

Redis Client List 命令用于返回全部链接到服务器的客户端信息和统计数据。redis

redis 127.0.0.1:6379> CLIENT LIST 
addr=127.0.0.1:43143 fd=6 age=183 idle=0 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=32768 obl=0 oll=0 omem=0 events=r cmd=client 
addr=127.0.0.1:43163 fd=5 age=35 idle=15 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=ping
addr=127.0.0.1:43167 fd=7 age=24 idle=6 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=get

  

 

  •  cluster slots 

  Redis Client Slots 命令用于当前的集群状态算法

redis 127.0.0.1:6379> cluster slots
1) 1) (integer) 0
   2) (integer) 4095
   3) 1) "127.0.0.1"
      2) (integer) 7000
   4) 1) "127.0.0.1"
      2) (integer) 7004
2) 1) (integer) 12288
   2) (integer) 16383
   3) 1) "127.0.0.1"
      2) (integer) 7003
   4) 1) "127.0.0.1"
      2) (integer) 7007
3) 1) (integer) 4096
   2) (integer) 8191
   3) 1) "127.0.0.1"
      2) (integer) 7001
   4) 1) "127.0.0.1"
      2) (integer) 7005
4) 1) (integer) 8192
   2) (integer) 12287
   3) 1) "127.0.0.1"
      2) (integer) 7002
   4) 1) "127.0.0.1"
      2) (integer) 7006

  

  • cluster keyslot

cluster keyslot key  返回一个整数,用于标识指定键所散列到的哈希槽缓存

cluster keyslot test
(integer) 6918

  

请求重定向

因为每一个节点只负责部分slot,以及slot可能从一个节点迁移到另外一节点,形成客户端有可能会向错误的节点发起请求。所以须要有一种机制来对其进行发现和修正,这就是请求重定向。有两种不一样的重定向场景:服务器

  • MOVED

         声明的是slot全部权的转移,收到的客户端须要更新其key-node映射关系异步

  • ASK

         申明的是一种临时的状态.在从新进行分片期间,源节点向目标节点迁移一个slot过程当中,可能会出现这样一种状况:属于被迁移slot的一部分键值对保存在源节点里面,一部分保存在目标节点里面.当客户端向源节点发送一个与键有关的命令,而且这个键企刚好被迁移到目标节点,则向客户端返回一个ASK错误.由于这个节点还在处于迁移过程当中,全部权尚未转移,因此客户端在接收到ASK错误后,须要在目标节点执行命令前,先发送一个ASKING命令,若是不发放该命令到话,则会返回MOVED错误,ASKING表示已经知道迁移状态,则会执行该命令.async

 

 

经过集群查询数据key为test的值 redis-cli为单机模式;若是为集群模式时(redis-cli -c) 接收到MOVED 错误时是不会打印MOVED错误,而是根据MOVED信息自动重定向到正确节点,并打印出重定向信息ide

xx.xxx.xxx.xx:6959> get test
(error) MOVED 6918 xx.xxx.xx.xxx:6956  

  此时返回的结果表示该key在6956这个实例上,经过这个实例能够获取到缓存值

xx.xxx.xx.xxx:6956> get test
"cluster"

  经过上文的示例能够发现获取缓存值的过程须要访问cluster两次,既然key到slot值的算法是已知的,若是能够经过key直接计算slot,在经过每一个节点的管理的slot范围就能够知道这个key对应哪一个节点了,这样不就能够一次获取到了吗?其实lettuce中就是这样处理的.下文会有详细介绍

    若是mget操做值跨slot时会怎样呢? 

mget test test1
(error) CROSSSLOT Keys in request don't hash to the same slot

 

Lettuce使用

    @Bean(name="clusterRedisURI")
    RedisURI clusterRedisURI(){
        return RedisURI.builder().withHost("xx.xx.xxx.xx").withPort(6954).build();
    }
  //配置集群选项,自动重连,最多重定型1次
    @Bean
    ClusterClientOptions clusterClientOptions(){
        return ClusterClientOptions.builder().autoReconnect(true).maxRedirects(1).build();
    }

//建立集群客户端 @Bean RedisClusterClient redisClusterClient(ClientResources clientResources, ClusterClientOptions clusterClientOptions, RedisURI clusterRedisURI){ RedisClusterClient redisClusterClient= RedisClusterClient.create(clientResources,clusterRedisURI); redisClusterClient.setOptions(clusterClientOptions); return redisClusterClient; } /** * 集群链接 */ @Bean(destroyMethod = "close") StatefulRedisClusterConnection<String,String> statefulRedisClusterConnection(RedisClusterClient redisClusterClient){ return redisClusterClient.connect(); }

  Lettuce在Spring 中的使用经过上文中的配置方式进行配置后就可使用了

  1. 经过StatefulRedisClusterConnection获取命令处理方式,同步,异步以及响应式
  2. 执行redis相关命令

  

Lettuce相关源码

     lettuce的使用方式仍是很简单的那么它的处理过程究竟是怎样的呢?下面将经过源码进行解析.

经过上文能够知道链接是经过RedisClusterClient建立的,它默认使用了StringCodec(LettuceCharsets.UTF8)做为编码器建立链接

 public StatefulRedisClusterConnection<String, String> connect() {
        return connect(newStringStringCodec());
    }

  

     在建立链接时就会主动发现集群拓扑信息,在第一次建立的时候partitions必定为null则此时须要初始化分区信息

  <K, V> StatefulRedisClusterConnectionImpl<K, V> connectClusterImpl(RedisCodec<K, V> codec) {
         //若是分区信息为null则初始化分区信息
        if (partitions == null) {
            initializePartitions();
        }
        //若是须要就激活拓扑刷新
        activateTopologyRefreshIfNeeded();

 初始化集群分片信息,就是将加载分片信息赋值给partitions属性 

 protected void initializePartitions() {
        this.partitions = loadPartitions();
    }

  具体加载分片信息处理过程以下:

  protected Partitions loadPartitions() {
        //获取拓扑刷新信息,
        Iterable<RedisURI> topologyRefreshSource = getTopologyRefreshSource();

        String message = "Cannot retrieve initial cluster partitions from initial URIs " + topologyRefreshSource;
        try {
            //加载拓扑信息
            Map<RedisURI, Partitions> partitions = refresh.loadViews(topologyRefreshSource, useDynamicRefreshSources());

第一次能够知道partitions为null则此时须要初始化种子节点的,那么它的种子节点又是什么呢?经过代码能够发现种子节点就是初始化的URI,那么它又是何时设置的呢?

protected Iterable<RedisURI> getTopologyRefreshSource() {

        //是否初始化种子节点
        boolean initialSeedNodes = !useDynamicRefreshSources();

        Iterable<RedisURI> seed;
        //若是须要初始化种子节点或分区信息为null或分区信息为空 则将初始URI赋值给种子
        if (initialSeedNodes || partitions == null || partitions.isEmpty()) {
            seed = RedisClusterClient.this.initialUris;
        } else {//不须要初始化种子节点
            List<RedisURI> uris = new ArrayList<>();
            for (RedisClusterNode partition : TopologyComparators.sortByUri(partitions)) {
                uris.add(partition.getUri());
            }
            seed = uris;
        }
        return seed;
    }

  经过以下代码能够发现种子节点是在建立redisClusterClient的时候指定的

 protected RedisClusterClient(ClientResources clientResources, Iterable<RedisURI> redisURIs) {

        super(clientResources);

        assertNotEmpty(redisURIs);
        assertSameOptions(redisURIs);
        //初始化节点
        this.initialUris = Collections.unmodifiableList(LettuceLists.newList(redisURIs));
         //根据第一个URI的超时时间做为默认超时时间
        setDefaultTimeout(getFirstUri().getTimeout());
        setOptions(ClusterClientOptions.builder().build());
    }

  默认使用动态刷新

 protected boolean useDynamicRefreshSources() {

        //若是集群客户端选项不为null
        if (getClusterClientOptions() != null) {
            //获取集群拓扑刷新选项
            ClusterTopologyRefreshOptions topologyRefreshOptions = getClusterClientOptions().getTopologyRefreshOptions();
            //返回集群拓扑刷新选项中配置到是否使用动态刷新
            return topologyRefreshOptions.useDynamicRefreshSources();
        }
        //默认动态刷新
        return true;
    }

  下面看看加载分区信息的处理过程,第一次则根据种子节点的链接获取整个集群的拓扑信息

 public Map<RedisURI, Partitions> loadViews(Iterable<RedisURI> seed, boolean discovery) {

        //获取超时时间,默认60秒
        long commandTimeoutNs = getCommandTimeoutNs(seed);

        Connections connections = null;
        try {
            //获取全部种子链接
            connections = getConnections(seed).get(commandTimeoutNs, TimeUnit.NANOSECONDS);
            //Requests将异步执行命令封装到多个节点
   //cluster nodes Requests requestedTopology = connections.requestTopology();
//client list Requests requestedClients = connections.requestClients(); //获取节点拓扑视图 NodeTopologyViews nodeSpecificViews = getNodeSpecificViews(requestedTopology, requestedClients, commandTimeoutNs); if (discovery) {//是否查找额外节点 //获取集群节点 Set<RedisURI> allKnownUris = nodeSpecificViews.getClusterNodes(); //排除种子节点,获得须要发现节点 Set<RedisURI> discoveredNodes = difference(allKnownUris, toSet(seed)); //若是须要发现节点不为空 if (!discoveredNodes.isEmpty()) { //须要发现节点链接 Connections discoveredConnections = getConnections(discoveredNodes).optionalGet(commandTimeoutNs, TimeUnit.NANOSECONDS); //合并链接 connections = connections.mergeWith(discoveredConnections); //合并请求 requestedTopology = requestedTopology.mergeWith(discoveredConnections.requestTopology()); requestedClients = requestedClients.mergeWith(discoveredConnections.requestClients()); //获取节点视图 nodeSpecificViews = getNodeSpecificViews(requestedTopology, requestedClients, commandTimeoutNs); //返回uri对应分区信息 return nodeSpecificViews.toMap(); } } return nodeSpecificViews.toMap(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RedisCommandInterruptedException(e); } finally { if (connections != null) { connections.close(); } } }

  

 

     这样在建立connection的时候就已经知道集群中的全部有效节点.根据以前的文章能够知道对于集群命令的处理是在ClusterDistributionChannelWriter中处理的.其中有一些信息在初始化writer的时候就初始化了

class ClusterDistributionChannelWriter implements RedisChannelWriter {
    //默认写入器
    private final RedisChannelWriter defaultWriter;
    //集群事件监听器
    private final ClusterEventListener clusterEventListener;
    private final int executionLimit;
    //集群链接提供器
    private ClusterConnectionProvider clusterConnectionProvider;
    //异步集群链接提供器
    private AsyncClusterConnectionProvider asyncClusterConnectionProvider;
    //是否关闭
    private boolean closed = false;
    //分区信息
    private volatile Partitions partitions;

  写命令的处理以下,会根据key计算出slot,进而找到这个slot对应的node,直接访问这个node,这样能够有效减小访问cluster次数

public <K, V, T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {

        LettuceAssert.notNull(command, "Command must not be null");
        //若是链接已经关闭则抛出异常
        if (closed) {
            throw new RedisException("Connection is closed");
        }
        //若是是集群命令且命令没有处理完毕
        if (command instanceof ClusterCommand && !command.isDone()) {
            //类型转换, 转换为ClusterCommand
            ClusterCommand<K, V, T> clusterCommand = (ClusterCommand<K, V, T>) command;
            if (clusterCommand.isMoved() || clusterCommand.isAsk()) {

                HostAndPort target;
                boolean asking;
                //若是集群命令已经迁移,此时经过ClusterCommand中到重试操做进行到此
                if (clusterCommand.isMoved()) {
                    //获取命令迁移目标节点
                    target = getMoveTarget(clusterCommand.getError());
                    //触发迁移事件
                    clusterEventListener.onMovedRedirection();
                    asking = false;
                } else {//若是是ask
                    target = getAskTarget(clusterCommand.getError());
                    asking = true;
                    clusterEventListener.onAskRedirection();
                }

                command.getOutput().setError((String) null);
                //链接迁移后的目标节点
                CompletableFuture<StatefulRedisConnection<K, V>> connectFuture = asyncClusterConnectionProvider
                        .getConnectionAsync(ClusterConnectionProvider.Intent.WRITE, target.getHostText(), target.getPort());
                //成功创建链接,则向该节点发送命令
                if (isSuccessfullyCompleted(connectFuture)) {
                    writeCommand(command, asking, connectFuture.join(), null);
                } else {
                    connectFuture.whenComplete((connection, throwable) -> writeCommand(command, asking, connection, throwable));
                }

                return command;
            }
        }
        //不是集群命令就是RedisCommand,第一个请求命令就是非ClusterCommand
         //将当前命令包装为集群命令
        ClusterCommand<K, V, T> commandToSend = getCommandToSend(command);
        //获取命令参数
        CommandArgs<K, V> args = command.getArgs();

        //排除集群路由的cluster命令
        if (args != null && !CommandType.CLIENT.equals(commandToSend.getType())) {
            //获取第一个编码后的key
            ByteBuffer encodedKey = args.getFirstEncodedKey();
            //若是encodedKey不为null
            if (encodedKey != null) {
                //获取slot值
                int hash = getSlot(encodedKey);
                //根据命令类型获取命令意图 是读仍是写
                ClusterConnectionProvider.Intent intent = getIntent(command.getType());
                //根据意图和slot获取链接
                CompletableFuture<StatefulRedisConnection<K, V>> connectFuture = ((AsyncClusterConnectionProvider) clusterConnectionProvider)
                        .getConnectionAsync(intent, hash);
                //若是成功获取链接
                if (isSuccessfullyCompleted(connectFuture)) {
                    writeCommand(commandToSend, false, connectFuture.join(), null);
                } else {//若是链接还没有处理完,或有异常,则添加完成处理器
                    connectFuture.whenComplete((connection, throwable) -> writeCommand(commandToSend, false, connection,
                            throwable));
                }

                return commandToSend;
            }
        }

        writeCommand(commandToSend, defaultWriter);

        return commandToSend;
    }

  可是若是计算出的slot由于集群扩展致使这个slot已经不在这个节点上lettuce是如何处理的呢?经过查阅ClusterCommand源码能够发如今complete方法中对于该问题进行了处理;若是响应是MOVED则会继续访问MOVED目标节点,这个重定向的此时能够指定的,默认为5次,经过上文的配置能够发现,在配置中只容许一次重定向

 @Override
    public void complete() {
        //若是响应是MOVED或ASK
        if (isMoved() || isAsk()) {
            //若是最大重定向次数大于当前重定向次数则能够进行重定向
            boolean retryCommand = maxRedirections > redirections;
            //重定向次数自增
            redirections++;

            if (retryCommand) {
                try {
                    //重定向
                    retry.write(this);
                } catch (Exception e) {
                    completeExceptionally(e);
                }
                return;
            }
        }
        super.complete();
        completed = true;
    }

  若是是ask向重定向目标发送命令前须要同步发送asking

 private static <K, V> void writeCommand(RedisCommand<K, V, ?> command, boolean asking,
            StatefulRedisConnection<K, V> connection, Throwable throwable) {

        if (throwable != null) {
            command.completeExceptionally(throwable);
            return;
        }

        try {
            //若是须要发送asking请求,即接收到ASK错误消息,则在重定向到目标主机后须要发送asking命令
            if (asking) {
                connection.async().asking();
            }
            //发送命令
            writeCommand(command, ((RedisChannelHandler<K, V>) connection).getChannelWriter());
        } catch (Exception e) {
            command.completeExceptionally(e);
        }
    }

  

  上文主要介绍了lettuce对于单个key的处理,若是存在多个key,如mget lettuce又是如何处理的呢?其主要思路是将key根据slot进行分组,将在同一个slot的命令一块儿发送到对应的节点,再将全部请求的返回值合并做为最终结果.源码以下:

  @Override
    public RedisFuture<List<KeyValue<K, V>>> mget(Iterable<K> keys) {
        //获取分区和key的映射关系
        Map<Integer, List<K>> partitioned = SlotHash.partition(codec, keys);
        //若是分区数小于2也就是只有一个分区即全部key都落在一个分区就直接获取
        if (partitioned.size() < 2) {
            return super.mget(keys);
        }
        //每一个key与slot映射关系
        Map<K, Integer> slots = SlotHash.getSlots(partitioned);

        Map<Integer, RedisFuture<List<KeyValue<K, V>>>> executions = new HashMap<>();
        //遍历分片信息,逐个发送
        for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {
            RedisFuture<List<KeyValue<K, V>>> mget = super.mget(entry.getValue());
            executions.put(entry.getKey(), mget);
        }

        //恢复key的顺序
        return new PipelinedRedisFuture<>(executions, objectPipelinedRedisFuture -> {
            List<KeyValue<K, V>> result = new ArrayList<>();
            for (K opKey : keys) {
                int slot = slots.get(opKey);

                int position = partitioned.get(slot).indexOf(opKey);
                RedisFuture<List<KeyValue<K, V>>> listRedisFuture = executions.get(slot);
                result.add(MultiNodeExecution.execute(() -> listRedisFuture.get().get(position)));
            }

            return result;
        });
    }
相关文章
相关标签/搜索