答案是确定的,节约的时间是客户端client和服务器redis server之间往返网络延迟的时间。这个时间能够用ping命令查看。html
网络延迟高:批量执行,性能提高明显java
网络延迟低(本机):批量执行,性能提高不明显python
某些客户端(java和python)提供了一种叫作pipeline的编程模式用来解决批量提交请求的方式。redis
这里咱们用python客户端来举例说明一下。编程
一、pipeline服务器
网络延迟网络
client与server机器之间网络延迟以下,大约是30ms。函数
测试用例post
分别执行其中的try_pipeline和without_pipeline统计处理时间。 性能
# -*- coding:utf-8 -*- import redis import time from concurrent.futures import ProcessPoolExecutor r = redis.Redis(host='10.93.84.53', port=6379, password='bigdata123') def try_pipeline(): start = time.time() with r.pipeline(transaction=False) as p: p.sadd('seta', 1).sadd('seta', 2).srem('seta', 2).lpush('lista', 1).lrange('lista', 0, -1) p.execute() print time.time() - start def without_pipeline(): start = time.time() r.sadd('seta', 1) r.sadd('seta', 2) r.srem('seta', 2) r.lpush('lista', 1) r.lrange('lista', 0, -1) print time.time() - start def worker(): while True: try_pipeline() with ProcessPoolExecutor(max_workers=12) as pool: for _ in range(10): pool.submit(worker)
结果分析
try_pipeline平均处理时间:0.04659
without_pipeline平均处理时间:0.16672
咱们的批量里有5个操做,在处理时间维度上性能提高了4倍!
网络延迟大约是30ms,不使用批量的状况下,网络上的时间损耗就有0.15s(30ms*5)以上。而pipeline批量操做只进行一次网络往返,因此延迟只有0.03s。能够看到节省的时间基本都是网路延迟。
二、pipeline与transation
pipeline不单单用来批量的提交命令,还用来实现事务transation。
这里对redis事务的讨论不会太多,只是给出一个demo。详细的描述你能够参见这篇博客。redis事务
细心的你可能发现了,使用transaction与否不一样之处在与建立pipeline实例的时候,transaction是否打开,默认是打开的。
# -*- coding:utf-8 -*- import redis from redis import WatchError from concurrent.futures import ProcessPoolExecutor r = redis.Redis(host='127.0.0.1', port=6379) # 减库存函数, 循环直到减库存完成 # 库存充足, 减库存成功, 返回True # 库存不足, 减库存失败, 返回False def decr_stock(): # python中redis事务是经过pipeline的封装实现的 with r.pipeline() as pipe: while True: try: # watch库存键, multi后若是该key被其余客户端改变, 事务操做会抛出WatchError异常 pipe.watch('stock:count') count = int(pipe.get('stock:count')) if count > 0: # 有库存 # 事务开始 pipe.multi() pipe.decr('stock:count') # 把命令推送过去 # execute返回命令执行结果列表, 这里只有一个decr返回当前值 print pipe.execute()[0] return True else: return False except WatchError, ex: # 打印WatchError异常, 观察被watch锁住的状况 print ex pipe.unwatch() def worker(): while True: # 没有库存就退出 if not decr_stock(): break # 实验开始 # 设置库存为100 r.set("stock:count", 100) # 多进程模拟多个客户端提交 with ProcessPoolExecutor(max_workers=2) as pool: for _ in range(10): pool.submit(worker)