摘要: 引言 了解Jedis的童鞋可能清楚,Jedis中JedisCluster是不支持pipeline操做的,若是使用了redis集群,在spring-boot-starter-data-redis中又正好用到的pipeline,那么会接收到Pipeline is currently not supported for JedisClusterConnection.这样的报错。html
了解Jedis的童鞋可能清楚,Jedis中JedisCluster
是不支持pipeline操做的,若是使用了redis集群,在spring-boot-starter-data-redis
中又正好用到的pipeline,那么会接收到Pipeline is currently not supported for JedisClusterConnection.
这样的报错。错误来自于org.springframework.data.redis.connection.jedis.JedisClusterConnection
:git
/* * (non-Javadoc) * @see org.springframework.data.redis.connection.RedisConnection#openPipeline() */ @Override public void openPipeline() { throw new UnsupportedOperationException("Pipeline is currently not supported for JedisClusterConnection."); }
org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration
会帮咱们自动配置,不管你redis使用的是standalone、sentinel、cluster配置。这个源码很容易理解,读者可自行阅读,不理解的能够一块儿讨论。github
spring boot 2.0开始,配置spring-boot-starter-data-redis
将不依赖Jedis,而是依赖Lettuce,在Lettuce中,redis cluster使用pipeline不会有问题。redis
再往下看可能须要读者具有以下的能力:spring
redis cluster一共有16384个桶(hash slot),用来装数据,创建集群的时候每一个集群节点会负责一些slot的数据存储,好比我负责0-1000,你负责1001-2000,他负责2001-3000……
数据存储时,每一个key在存入redis cluster前,会利用CRC16计算出一个值,这个值就是对应redis cluster的hash slot,就知道这个key会被放到哪一个服务器上了。服务器
JedisCluster本质上是使用Jedis来和redis集群进行打交道的,具体过程是:spring-boot
JedisClusterCRC16.getSlot(key)
JedisClusterConnectionHandler
实例中获取到该slot对应的Jedis
实例:Jedis connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));
redis提供了mset,hmset之类的命令,或者说集合操做可使用sadd key 1 2 3 4 5 6 ..... 10000000000这种一口气传一堆数据的命令。
有时候你甚至会发现*mset这种一口气操做一堆数据的速度更快。那么这种使用场景会有什么弊端呢?答案是:阻塞。
操做这一堆数据须要多久,就会阻塞多久。spa
因为JedisCluster中的全部操做本质上是使用Jedis,而Jedis是支持pipeline操做的,全部,要在redis cluster中使用pipeline是有可能的,只要你操做同一个键便可,准确的说,应该是你操做的键位于同一台服务器,更直白的,你操做的键是同一个Jedis实例。ok,若是你已经晕了,那你须要回看一下“知识储备”。
说说笔者的使用场景吧,咱们是把csv文件的一批数据读到内存中,同一批数据是存储到同一个key中的,最后的操做会相似于:code
set key member1 set key member2 set key member3 ... set key member100000
操做的是同一个key,能够利用JedisCluster获取到该key的Jedis实例,而后利用pipeline操做。
提供一下代码思路。
RedisConnectionFactory factory = redisTemplate.getConnectionFactory(); RedisConnection redisConnection = factory.getConnection(); JedisClusterConnection jedisClusterConnection = (JedisClusterConnection) redisConnection; // 获取到原始到JedisCluster链接 JedisCluster jedisCluster = jedisClusterConnection.getNativeConnection(); // 经过key获取到具体的Jedis实例 // 计算hash slot,根据特定的slot能够获取到特定的Jedis实例 int slot = JedisClusterCRC16.getSlot(key); /** * 不建议这么使用,官方在2.10版本已经修复<a href="https://github.com/xetorthio/jedis/pull/1532">此问题</a><br> * 2.10版本中,官方会直接提供JedisCluster#getConnectionFromSlot */ Field field = ReflectionUtils.findField(BinaryJedisCluster.class, null, JedisClusterConnectionHandler.class); field.setAccessible(true); JedisSlotBasedConnectionHandler jedisClusterConnectionHandler = (JedisSlotBasedConnectionHandler) field.get(jedisCluster); Jedis jedis = jedisClusterConnectionHandler.getConnectionFromSlot(slot); // 接下来就是pipeline操做了 Pipeline pipeline = jedis.pipelined(); ... pipeline.syncAndReturnAll();
以上代码彻底能够模仿spring-data-redis中RedisTemplate#executePipelined
方法写成一个通用的方法,供使用者调用。