上一篇文章《redis pipeline批量处理提升性能》中咱们讲到redis pipeline模式在批量数据处理上带来了很大的性能提高,咱们先来回顾一下pipeline的原理,redis client与server之间采用的是请求应答的模式,以下所示:html
Client: command1 Server: response1 Client: command2 Server: response2 …
在这种状况下,若是要完成10个命令,则须要20次交互才能完成。所以,即便redis处理能力很强,仍然会受到网络传输影响,致使吞吐量上不去。而在管道(pipeline)模式下,多个请求能够变成这样:java
Client: command1,command2… Server: response1,response2…
在这种状况下,完成命令只须要2次交互。这样网络传输上可以更加高效,加上redis自己强劲的处理能力,给数据处理带来极大的性能提高。但实际上遇到的问题是,项目上所用到的是Redis集群,初始化的时候使用的类是JedisCluster而不是Jedis。去查了JedisCluster的文档,并无发现提供有像Jedis同样的获取Pipeline对象的 pipelined()方法。node
咱们知道,Redis 集群的键空间被分割为 16384 个槽(slot),集群的最大节点数量也是 16384 个。每一个主节点都负责处理 16384 个哈希槽的其中一部分。具体的redis命令,会根据key计算出一个槽位(slot),而后根据槽位去特定的节点redis上执行操做。以下所示:redis
master1(slave1): 0~5460 master2(slave2):5461~10922 master3(slave3):10923~16383
集群有三个master节点组成,其中master1分配了 0~5460的槽位,master2分配了 5461~10922的槽位,master3分配了 10923~16383的槽位。算法
一次pipeline会批量执行多个命令,那么每一个命令都须要根据“key”运算一个槽位(JedisClusterCRC16.getSlot(key)),而后根据槽位去特定的机器执行命令,也就是说一次pipeline操做会使用多个节点的redis链接,而目前JedisCluster是没法支持的。shell
1.首先要根据key计算出这次pipeline会使用到的节点对应的链接(也就是jedis对象,一般每一个节点对应一个Pool)。
2.相同槽位的key,使用同一个jedis.pipeline去执行命令。
3.合并这次pipeline全部的response返回。
4.链接释放返回到池中。apache
也就是将一个JedisCluster下的pipeline分解为每一个单节点下独立的jedisPipeline操做,最后合并response返回。具体实现就是经过JedisClusterCRC16.getSlot(key)计算key的slot值,经过每一个节点的slot分布,就知道了哪些key应该在哪些节点上。再获取这个节点的JedisPool就可使用pipeline进行读写了。
实现上面的过程能够有不少种方式,本文将介绍一种也许是代码量最少的一种解决方案。网络
上面提到的过程,其实在JedisClusterInfoCache对象中都已经帮助开发人员实现了,可是这个对象在JedisClusterConnectionHandler中为protected并无对外开放,并且经过JedisCluster的API也没法拿到JedisClusterConnectionHandler对象。因此经过下面两个类将这些对象暴露出来,这样使用getJedisPoolFromSlot就能够知道每一个key对应的JedisPool了。性能
Maven依赖测试
<dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.9.0</version> </dependency>
JedisClusterPipeline
import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import redis.clients.jedis.HostAndPort; import redis.clients.jedis.JedisCluster; import java.util.Set; public class JedisClusterPipeline extends JedisCluster { public JedisClusterPipeline(Set<HostAndPort> jedisClusterNode, int connectionTimeout, int soTimeout, int maxAttempts, String password, final GenericObjectPoolConfig poolConfig) { super(jedisClusterNode,connectionTimeout, soTimeout, maxAttempts, password, poolConfig); super.connectionHandler = new JedisSlotAdvancedConnectionHandler(jedisClusterNode, poolConfig, connectionTimeout, soTimeout ,password); } public JedisSlotAdvancedConnectionHandler getConnectionHandler() { return (JedisSlotAdvancedConnectionHandler)this.connectionHandler; } /** * 刷新集群信息,当集群信息发生变动时调用 * @param * @return */ public void refreshCluster() { connectionHandler.renewSlotCache(); } }
JedisSlotAdvancedConnectionHandler
import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import redis.clients.jedis.HostAndPort; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisSlotBasedConnectionHandler; import redis.clients.jedis.exceptions.JedisNoReachableClusterNodeException; import java.util.Set; public class JedisSlotAdvancedConnectionHandler extends JedisSlotBasedConnectionHandler { public JedisSlotAdvancedConnectionHandler(Set<HostAndPort> nodes, GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout,String password) { super(nodes, poolConfig, connectionTimeout, soTimeout, password); } public JedisPool getJedisPoolFromSlot(int slot) { JedisPool connectionPool = cache.getSlotPool(slot); if (connectionPool != null) { // It can't guaranteed to get valid connection because of node // assignment return connectionPool; } else { renewSlotCache(); //It's abnormal situation for cluster mode, that we have just nothing for slot, try to rediscover state connectionPool = cache.getSlotPool(slot); if (connectionPool != null) { return connectionPool; } else { throw new JedisNoReachableClusterNodeException("No reachable node in cluster for slot " + slot); } } } }
编写测试类,向redis集群写入10000条数据,分别测试调用普通JedisCluster模式和调用上面实现的JedisCluster Pipeline模式的性能对比,测试类以下:
import redis.clients.jedis.*; import redis.clients.util.JedisClusterCRC16; import java.io.UnsupportedEncodingException; import java.util.*; public class PipelineTest { public static void main(String[] args) throws UnsupportedEncodingException { PipelineTest client = new PipelineTest(); Set<HostAndPort> nodes = new HashSet<>(); nodes.add(new HostAndPort("node1",20249)); nodes.add(new HostAndPort("node2",20508)); nodes.add(new HostAndPort("node3",20484)); String redisPassword = "123456"; //测试 client.jedisCluster(nodes,redisPassword); client.clusterPipeline(nodes,redisPassword); } //普通JedisCluster 批量写入测试 public void jedisCluster(Set<HostAndPort> nodes,String redisPassword) throws UnsupportedEncodingException { JedisCluster jc = new JedisCluster(nodes, 2000, 2000,100,redisPassword, new JedisPoolConfig()); List<String> setKyes = new ArrayList<>(); for (int i = 0; i < 10000; i++) { setKyes.add("single"+i); } long start = System.currentTimeMillis(); for(int j = 0;j < setKyes.size();j++){ jc.setex(setKyes.get(j),100,"value"+j); } System.out.println("JedisCluster total time:"+(System.currentTimeMillis() - start)); } //JedisCluster Pipeline 批量写入测试 public void clusterPipeline(Set<HostAndPort> nodes,String redisPassword) { JedisClusterPipeline jedisClusterPipeline = new JedisClusterPipeline(nodes, 2000, 2000,10,redisPassword, new JedisPoolConfig()); JedisSlotAdvancedConnectionHandler jedisSlotAdvancedConnectionHandler = jedisClusterPipeline.getConnectionHandler(); Map<JedisPool, List<String>> poolKeys = new HashMap<>(); List<String> setKyes = new ArrayList<>(); for (int i = 0; i < 10000; i++) { setKyes.add("pipeline"+i); } long start = System.currentTimeMillis(); //查询出 key 所在slot ,经过 slot 获取 JedisPool ,将key 按 JedisPool 分组 jedisClusterPipeline.refreshCluster(); for(int j = 0;j < setKyes.size();j++){ String key = setKyes.get(j); int slot = JedisClusterCRC16.getSlot(key); JedisPool jedisPool = jedisSlotAdvancedConnectionHandler.getJedisPoolFromSlot(slot); if (poolKeys.keySet().contains(jedisPool)){ List<String> keys = poolKeys.get(jedisPool); keys.add(key); }else { List<String> keys = new ArrayList<>(); keys.add(key); poolKeys.put(jedisPool, keys); } } //调用Jedis pipeline进行单点批量写入 for (JedisPool jedisPool : poolKeys.keySet()) { Jedis jedis = jedisPool.getResource(); Pipeline pipeline = jedis.pipelined(); List<String> keys = poolKeys.get(jedisPool); for(int i=0;i<keys.size();i++){ pipeline.setex(keys.get(i),100, "value" + i); } pipeline.sync();//同步提交 jedis.close(); } System.out.println("JedisCluster Pipeline total time:"+(System.currentTimeMillis() - start)); } }
测试结果以下:
JedisCluster total time:29147 JedisCluster Pipeline total time:190
结论:对于批量操做,JedisCluster Pipeline有明显的性能提高。
本文旨在介绍一种在Redis集群模式下提供Pipeline批量操做的功能。基本思路就是根据redis cluster对数据哈希取模的算法,先计算数据存放的slot位置, 而后根据不一样的节点将数据分红多批,对不一样批的数据进行单点pipeline处理。 可是须要注意的是,因为集群模式存在节点的动态添加删除,且client不能实时感知(只有在执行命令时才可能知道集群发生变动),所以,该实现不保证必定成功,建议在批量操做以前调用 refreshCluster() 方法从新获取集群信息。应用须要保证不论成功仍是失败都会调用close() 方法,不然可能会形成泄露。若是失败须要应用本身去重试,所以每一个批次执行的命令数量须要控制,防止失败后重试的数量过多。 基于以上说明,建议在集群环境较稳定(增减节点不会过于频繁)的状况下使用,且容许失败或有对应的重试策略。