简单介绍 redis pipeline 的机制,结合一段实例说明pipeline 在提高吞吐量方面发生的效用。redis
应用系统在数据推送或事件处理过程当中,每每出现数据流通过多个网元;
然而在某些服务中,数据操做对redis 是强依赖的,在最近的一次分析中发现:
一次数据推送会对 redis 产生近30次读写操做!服务器
在数据推送业务中的性能压测中,以数据上报 -> 下发应答为一次事务;
而对于这样的读写模型,redis 的操做过于频繁,很快便致使系统延时太高,吞吐量低下,没法知足目标;并发
优化过程 主要针对业务代码作的优化,其中redis 操做通过大量合并,最终下降到原来的1/5,而系统吞吐量也提高明显。
其中,redis pipeline(管道机制) 的应用是一个关键手段。dom
Pipeline指的是管道技术,指的是客户端容许将多个请求依次发给服务器,过程当中而不须要等待请求的回复,在最后再一并读取结果便可。
管道技术使用普遍,例如许多POP3协议已经实现支持这个功能,大大加快了从服务器下载新邮件的过程。
Redis很早就支持管道(pipeline)技术。(所以不管你运行的是什么版本,你均可以使用管道(pipelining)操做Redis)性能
普通请求模型
[图-pipeline1]优化
Pipeline请求模型
[图-pipeline2]线程
从两个图的对比中可看出,普通的请求模型是同步的,每次请求对应一次IO操做等待;
而Pipeline 化以后全部的请求合并为一次IO,除了时延能够下降以外,还能大幅度提高系统吞吐量。code
说明
本地开启50个线程,每一个线程完成1000个key的写入,对比pipeline开启及不开启两种场景下的性能表现。blog
相关常量事件
// 并发任务 private static final int taskCount = 50; // pipeline大小 private static final int batchSize = 10; // 每一个任务处理命令数 private static final int cmdCount = 1000; private static final boolean usePipeline = true;
初始化链接
JedisPoolConfig poolConfig = new JedisPoolConfig(); poolConfig.setMaxActive(200); poolConfig.setMaxIdle(100); poolConfig.setMaxWait(2000); poolConfig.setTestOnBorrow(false); poolConfig.setTestOnReturn(false); jedisPool = new JedisPool(poolConfig, host, port);
并发启动任务,统计执行时间
public static void main(String[] args) throws InterruptedException { init(); flushDB(); long t1 = System.currentTimeMillis(); ExecutorService executor = Executors.newCachedThreadPool(); CountDownLatch latch = new CountDownLatch(taskCount); for (int i = 0; i < taskCount; i++) { executor.submit(new DemoTask(i, latch)); } latch.await(); executor.shutdownNow(); long t2 = System.currentTimeMillis(); System.out.println("execution finish time(s):" + (t2 - t1) / 1000.0); }
DemoTask 封装了执行key写入的细节,区分不一样场景
public void run() { logger.info("Task[{}] start.", id); try { if (usePipeline) { runWithPipeline(); } else { runWithNonPipeline(); } } finally { latch.countDown(); } logger.info("Task[{}] end.", id); }
不使用Pipeline的场景比较简单,循环执行set操做
for (int i = 0; i < cmdCount; i++) { Jedis jedis = get(); try { jedis.set(key(i), UUID.randomUUID().toString()); } finally { if (jedis != null) { jedisPool.returnResource(jedis); } } if (i % batchSize == 0) { logger.info("Task[{}] process -- {}", id, i); } }
使用Pipeline,须要处理分段,如10个做为一批命令执行
for (int i = 0; i < cmdCount;) { Jedis jedis = get(); try { Pipeline pipeline = jedis.pipelined(); int j; for (j = 0; j < batchSize; j++) { if (i + j < cmdCount) { pipeline.set(key(i + j), UUID.randomUUID().toString()); } else { break; } } pipeline.sync(); logger.info("Task[{}] pipeline -- {}", id, i + j); i += j; } finally { if (jedis != null) { jedisPool.returnResource(jedis); } } }
不使用Pipeline,总体执行26s;而使用Pipeline优化后的代码,执行时间仅须要3s!
NoPipeline-stat
[图-nopipeline]
Pipeline-stat
[图-pipeline]