一种简单实现Redis集群Pipeline功能的方法及性能测试

上一篇文章《redis pipeline批量处理提升性能》中咱们讲到redis pipeline模式在批量数据处理上带来了很大的性能提高,咱们先来回顾一下pipeline的原理,redis client与server之间采用的是请求应答的模式,以下所示:html

Client: command1 
Server: response1 
Client: command2 
Server: response2 
…

在这种状况下,若是要完成10个命令,则须要20次交互才能完成。所以,即便redis处理能力很强,仍然会受到网络传输影响,致使吞吐量上不去。而在管道(pipeline)模式下,多个请求能够变成这样:java

Client: command1,command2… 
Server: response1,response2…

在这种状况下,完成命令只须要2次交互。这样网络传输上可以更加高效,加上redis自己强劲的处理能力,给数据处理带来极大的性能提高。但实际上遇到的问题是,项目上所用到的是Redis集群,初始化的时候使用的类是JedisCluster而不是Jedis。去查了JedisCluster的文档,并无发现提供有像Jedis同样的获取Pipeline对象的 pipelined()方法。node

为何RedisCluster没法使用pipeline?

咱们知道,Redis 集群的键空间被分割为 16384 个槽(slot),集群的最大节点数量也是 16384 个。每一个主节点都负责处理 16384 个哈希槽的其中一部分。具体的redis命令,会根据key计算出一个槽位(slot),而后根据槽位去特定的节点redis上执行操做。以下所示:redis

master1(slave1): 0~5460
master2(slave2):5461~10922
master3(slave3):10923~16383

集群有三个master节点组成,其中master1分配了 0~5460的槽位,master2分配了 5461~10922的槽位,master3分配了 10923~16383的槽位。算法

一次pipeline会批量执行多个命令,那么每一个命令都须要根据“key”运算一个槽位(JedisClusterCRC16.getSlot(key)),而后根据槽位去特定的机器执行命令,也就是说一次pipeline操做会使用多个节点的redis链接,而目前JedisCluster是没法支持的。shell

如何基于JedisCluster扩展pipeline?

设计思路

1.首先要根据key计算出这次pipeline会使用到的节点对应的链接(也就是jedis对象,一般每一个节点对应一个Pool)。
2.相同槽位的key,使用同一个jedis.pipeline去执行命令。
3.合并这次pipeline全部的response返回。
4.链接释放返回到池中。apache

也就是将一个JedisCluster下的pipeline分解为每一个单节点下独立的jedisPipeline操做,最后合并response返回。具体实现就是经过JedisClusterCRC16.getSlot(key)计算key的slot值,经过每一个节点的slot分布,就知道了哪些key应该在哪些节点上。再获取这个节点的JedisPool就可使用pipeline进行读写了。
实现上面的过程能够有不少种方式,本文将介绍一种也许是代码量最少的一种解决方案。网络

解决方案

上面提到的过程,其实在JedisClusterInfoCache对象中都已经帮助开发人员实现了,可是这个对象在JedisClusterConnectionHandler中为protected并无对外开放,并且经过JedisCluster的API也没法拿到JedisClusterConnectionHandler对象。因此经过下面两个类将这些对象暴露出来,这样使用getJedisPoolFromSlot就能够知道每一个key对应的JedisPool了。性能

Maven依赖测试

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>

JedisClusterPipeline

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;

import java.util.Set;

public class JedisClusterPipeline extends JedisCluster {
    public JedisClusterPipeline(Set<HostAndPort> jedisClusterNode, int connectionTimeout, int soTimeout, int maxAttempts, String password, final GenericObjectPoolConfig poolConfig) {
        super(jedisClusterNode,connectionTimeout, soTimeout, maxAttempts, password, poolConfig);
        super.connectionHandler = new JedisSlotAdvancedConnectionHandler(jedisClusterNode, poolConfig,
                connectionTimeout, soTimeout ,password);
    }

    public JedisSlotAdvancedConnectionHandler getConnectionHandler() {
        return (JedisSlotAdvancedConnectionHandler)this.connectionHandler;
    }

    /**
     * 刷新集群信息,当集群信息发生变动时调用
     * @param
     * @return
     */
    public void refreshCluster() {
        connectionHandler.renewSlotCache();
    }
}

JedisSlotAdvancedConnectionHandler

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisSlotBasedConnectionHandler;
import redis.clients.jedis.exceptions.JedisNoReachableClusterNodeException;

import java.util.Set;

public class JedisSlotAdvancedConnectionHandler extends JedisSlotBasedConnectionHandler {

    public JedisSlotAdvancedConnectionHandler(Set<HostAndPort> nodes, GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout,String password) {
        super(nodes, poolConfig, connectionTimeout, soTimeout, password);
    }

    public JedisPool getJedisPoolFromSlot(int slot) {
        JedisPool connectionPool = cache.getSlotPool(slot);
        if (connectionPool != null) {
            // It can't guaranteed to get valid connection because of node
            // assignment
            return connectionPool;
        } else {
            renewSlotCache(); //It's abnormal situation for cluster mode, that we have just nothing for slot, try to rediscover state
            connectionPool = cache.getSlotPool(slot);
            if (connectionPool != null) {
                return connectionPool;
            } else {
                throw new JedisNoReachableClusterNodeException("No reachable node in cluster for slot " + slot);
            }
        }
    }
}

编写测试类,向redis集群写入10000条数据,分别测试调用普通JedisCluster模式和调用上面实现的JedisCluster Pipeline模式的性能对比,测试类以下:

import redis.clients.jedis.*;
import redis.clients.util.JedisClusterCRC16;
import java.io.UnsupportedEncodingException;
import java.util.*;

public class PipelineTest {
    public static void main(String[] args) throws UnsupportedEncodingException {
        PipelineTest client = new PipelineTest();
        Set<HostAndPort> nodes = new HashSet<>();
        nodes.add(new HostAndPort("node1",20249));
        nodes.add(new HostAndPort("node2",20508));
        nodes.add(new HostAndPort("node3",20484));
        String redisPassword = "123456";
        //测试
        client.jedisCluster(nodes,redisPassword);
        client.clusterPipeline(nodes,redisPassword);
    }
    //普通JedisCluster 批量写入测试
    public void jedisCluster(Set<HostAndPort> nodes,String redisPassword) throws UnsupportedEncodingException {
        JedisCluster jc = new JedisCluster(nodes, 2000, 2000,100,redisPassword, new JedisPoolConfig());
        List<String> setKyes = new ArrayList<>();
        for (int i = 0; i < 10000; i++) {
            setKyes.add("single"+i);
        }
        long start = System.currentTimeMillis();
        for(int j = 0;j < setKyes.size();j++){
            jc.setex(setKyes.get(j),100,"value"+j);
        }
        System.out.println("JedisCluster total time:"+(System.currentTimeMillis() - start));
    }
    //JedisCluster Pipeline 批量写入测试
    public void clusterPipeline(Set<HostAndPort> nodes,String redisPassword) {
        JedisClusterPipeline jedisClusterPipeline = new JedisClusterPipeline(nodes, 2000, 2000,10,redisPassword, new JedisPoolConfig());
        JedisSlotAdvancedConnectionHandler jedisSlotAdvancedConnectionHandler = jedisClusterPipeline.getConnectionHandler();
        Map<JedisPool, List<String>> poolKeys = new HashMap<>();
        List<String> setKyes = new ArrayList<>();
        for (int i = 0; i < 10000; i++) {
                setKyes.add("pipeline"+i);
        }
        long start = System.currentTimeMillis();
        //查询出 key 所在slot ,经过 slot 获取 JedisPool ,将key 按 JedisPool 分组
        jedisClusterPipeline.refreshCluster();
        for(int j = 0;j < setKyes.size();j++){
            String key = setKyes.get(j);
            int slot = JedisClusterCRC16.getSlot(key);
            JedisPool jedisPool = jedisSlotAdvancedConnectionHandler.getJedisPoolFromSlot(slot);
            if (poolKeys.keySet().contains(jedisPool)){
                List<String> keys = poolKeys.get(jedisPool);
                keys.add(key);
            }else {
                List<String> keys = new ArrayList<>();
                keys.add(key);
                poolKeys.put(jedisPool, keys);
            }
        }
        //调用Jedis pipeline进行单点批量写入
        for (JedisPool jedisPool : poolKeys.keySet()) {
            Jedis jedis = jedisPool.getResource();
            Pipeline pipeline = jedis.pipelined();
            List<String> keys = poolKeys.get(jedisPool);
            for(int i=0;i<keys.size();i++){
                pipeline.setex(keys.get(i),100, "value" + i);
            }
            pipeline.sync();//同步提交
            jedis.close();
        }
        System.out.println("JedisCluster Pipeline total time:"+(System.currentTimeMillis() - start));
    }
}

测试结果以下:

JedisCluster total time:29147
JedisCluster Pipeline total time:190

结论:对于批量操做,JedisCluster Pipeline有明显的性能提高。

总结

本文旨在介绍一种在Redis集群模式下提供Pipeline批量操做的功能。基本思路就是根据redis cluster对数据哈希取模的算法,先计算数据存放的slot位置, 而后根据不一样的节点将数据分红多批,对不一样批的数据进行单点pipeline处理。 可是须要注意的是,因为集群模式存在节点的动态添加删除,且client不能实时感知(只有在执行命令时才可能知道集群发生变动),所以,该实现不保证必定成功,建议在批量操做以前调用 refreshCluster() 方法从新获取集群信息。应用须要保证不论成功仍是失败都会调用close() 方法,不然可能会形成泄露。若是失败须要应用本身去重试,所以每一个批次执行的命令数量须要控制,防止失败后重试的数量过多。 基于以上说明,建议在集群环境较稳定(增减节点不会过于频繁)的状况下使用,且容许失败或有对应的重试策略。

相关文章
相关标签/搜索