目录php
参考: http://redis.cn/topics/transactions.htmlhtml
事务是一个单独的隔离操做:事务中的全部命令都会序列化、按顺序地执行。事务在执行的过程当中,不会被其余客户端发送来的命令请求所打断。git
事务是一个原子操做:事务中的命令要么所有被执行,要么所有都不执行。github
redis事务是一组命令的集合。多组命令进入到等待执行的事务队列中,执行exec命令告诉redis将等待执行的事务队列中的全部命令,按顺序执行,返回值就是这些命令组成的列表。web
Redis 事务能够一次执行多个命令, 具备下列保证:redis
一个事务从开始到执行会经历如下三个阶段:算法
事务中的错误:数据库
从 Redis 2.6.5 开始,服务器会对命令入队失败的状况进行记录,并在客户端调用 EXEC 命令时,拒绝执行并自动放弃这个事务缓存
在 EXEC 命令执行以后所产生的错误, 并无对它们进行特别处理: 即便事务中有某个/某些命令在执行时产生了错误, 事务中的其余命令仍然会继续执行安全
如:
127.0.0.1:6379> multi OK 127.0.0.1:6379> set a 3 QUEUED 127.0.0.1:6379> lpop a QUEUED 127.0.0.1:6379> exec 1) OK 2) (error) WRONGTYPE Operation against a key holding the wrong kind of value
redis 事务入队只会检查语法错误,对于exec后执行错误,没有回滚措施。并且在事务中没法在客户端作查询判断,只会获得queued,没法进行业务数据判断,也是很坑。
原子性
一个事务是一个不可分割的最小工做单位,要么都成功要么都失败。
原子操做是指你的一个业务逻辑必须是不可拆分的.好比你给别人转钱,你的帐号扣钱,别人的帐号增长钱。
单个 Redis 命令的执行是原子性的,但 Redis 没有在事务上增长任何维持原子性的机制,因此 Redis 事务的执行并非原子性的。
包含5个命令 MULTI、EXEC、DISCARD、WATCH、UNWATCH。
DISCARD 取消事务,放弃执行事务块内的全部命令。
EXEC 执行全部事务块内的命令。
MULTI 标记一个事务块的开始。
UNWATCH 取消 WATCH 命令对全部 key 的监视。
WATCH key [key ...] 监视一个(或多个) key ,若是在事务执行以前这个(或这些) key 被其余命令所改动,那么事务将被打断。
乐观的认为数据不会出现冲突,使用version或timestamp来记录判断。乐观锁的优势开销小,不会出现锁冲突。
可利用watch命令监听key,实现乐观锁,来保证不会出现冲突,应用场景好比秒杀来防止超卖。
秒杀伪代码以下:
WATCH 锁定量 MULTI incr 锁定量 if 锁定量 <= 库存量 减库存 EXEC
了解下相关命令
了解下抢购模拟代码:
<?php namespace app\controllers; use Yii; use yii\web\Controller; use app\modules\Common; /** * 模拟抢购处理 * Class ShopController * @package app\controllers */ class ShopController extends Controller { public $goods = 'huawei P20'; //初始化数据 public function actionInit(){ $redis = Yii::$app->redis; $redis->set('goodNums',100); //设置库存 $redis->del('order'); //清空抢购订单 die('success'); } //悲观锁 //setnx 实现,有个问题 expire失败(1.人为错误;2.redis崩了)了,这个锁就持久化,一直被锁了 public function actionBuy(){ $userId = mt_rand(1,99999999); $goods = $this->goods; $redis = Yii::$app->redis; $lock = $goods; try { $inventory['num'] = $redis->get('goodNums'); if($inventory['num']<=0){ self::removeLock($lock); throw new \Exception('活动结束'); } if( $redis->setnx($lock,1) ){ $redis->expire($lock,60);//设置过时时间,防止死锁 //业务处理 减库存,建立订单 $redis->decr('goodNums'); $redis->sadd('order',$userId); //todo 实际业务处理时间不可控,因此须要调整过时时间,在业务处理完进行剩余生命时间的判断,没找到回滚业务 $this->removeLock($lock); }else{ throw new \Exception($userId.' 抢购失败'); } Common::addLog('shop.log',$userId.' 抢购成功'); }catch (\Exception $e){ $this->removeLock($lock); Common::addLog('shop.log',$e->getMessage()); } die('success'); } //删除锁 protected function removeLock( $lock ){ $redis = Yii::$app->redis; return $redis->del($lock); } //悲观锁 //incr 解决expire失效,解锁 public function actionBuy2(){ $userId = mt_rand(1,99999999); $goods = $this->goods; $redis = Yii::$app->redis; $lock = $goods; try { $inventory['num'] = $redis->get('goodNums'); if($inventory['num']<=0){ $this->removeLock($lock); throw new \Exception('活动结束'); } $lockset = $redis->incr($lock); if( !$lockset ){ throw new \Exception($userId.' 抢购失败'); } if($lockset==1){ $redis->expire($lock,60);//设置过时时间,防止死锁 //业务处理 减库存,建立订单 $redis->decr('goodNums'); $redis->sadd('order',$userId); $this->removeLock($lock); } //锁的数量大于1而且没有设置过时时间,失败处理 if( $lockset>1 && $redis->ttl($lock)===-1 ){ $this->removeLock($lock); throw new \Exception($userId.' 抢购失败'); } Common::addLog('shop.log',$userId.' 抢购成功'); }catch (\Exception $e){ $this->removeLock($lock); Common::addLog('shop.log',$e->getMessage()); } die('success'); } //悲观锁 //set key value [expiration EX seconds|PX milliseconds] [NX|XX] 原子命令(redis必须大于2.6版本) public function actionBuy3(){ $userId = mt_rand(1,99999999); $goods = $this->goods; $redis = Yii::$app->redis; $lock = $goods; try { $inventory['num'] = $redis->get('goodNums'); if($inventory['num']<=0){ $this->removeLock($lock); throw new \Exception('活动结束'); } $lockset = $redis->set($lock,1,'EX',60,'NX'); if( !$lockset ){ throw new \Exception($userId.' 抢购失败'); } if($lockset==1){ //业务处理 减库存,建立订单 $redis->decr('goodNums'); $redis->sadd('order',$userId); $this->removeLock($lock); } Common::addLog('shop.log',$userId.' 抢购成功'); }catch (\Exception $e){ $this->removeLock($lock); Common::addLog('shop.log',$e->getMessage()); } die('success'); } # 乐观锁 public function actionBuy4(){ $userId = mt_rand(1,99999999); $goods = $this->goods; $redis = Yii::$app->redis; $lock = $goods; try { $inventory['num'] = $redis->get('goodNums'); if($inventory['num']<=0){ throw new \Exception('活动结束'); } $redis->watch($lock); $redis->multi(); //todo:这里还须要从新判断下库存,不然会出现超发,高并发状况下$inventory['num']确定会出现同时读取一个值;为了方便测试,没写db操做 //redis事务是将命令放入队列中,没法取goodNums来判断库存是否结束,此处使用数据库来判断库存合理 //业务处理 减库存,建立订单 $redis->decr('goodNums'); $redis->sadd('order',$userId); $redis->exec(); Common::addLog('shop.log',$userId.' 抢购成功'); }catch (\Exception $e){ Common::addLog('shop.log',$e->getMessage()); } die('success'); } # 队列实现,不作详述 }
服务器访问并发比较大,无效访问频繁,好比说频繁请求接口,爬虫频繁访问服务器,抢购瞬时请求过大,咱们须要限流处理。
限流:对访问来源计数,超过设定次数,设置过时时间,提醒访问频繁,稍后再试
limits=500 #设置1秒内限制次数50 if EXISTS userid return '访问频繁,锁定时间剩余(ttl userid)秒' if userid_count_time > limits exprice userid,3600 return '访问频繁,稍后再试' else MUlTI incr userid_count_time # 对用户每秒的请求进行原子递增计数 exprice userid_count_time , 60 EXEC //使用事务的目的是避免执行错误中断,userid_count_time持久化到磁盘,高并发下这个颇有必要
计数器限流,缺点也很大,可能会超过限制数。相比下,高并发 漏桶算法、令牌桶算法更适合作限流,此处不作深究。
运用数据格式list,lpush、rpop就能够入队、出队,可是会有个问题 假设出队的业务执行发生错误,数据会不会所以丢失,因此须要确保出队时确实被消费了,能够参考下面伪代码处理:
while(val = lrange(list,0,-1)) try{ //对val这条数据的业务代码处理 rpop(list) }catch(Exception e){ //记录错误,通知programmer处理 break; }
参考下lrange语法
服务器中的非空数据库以及数据库中的健值对统称数据库状态。
redis是内存数据库,数据库状态存在内存中,一旦服务器崩掉,服务器状态就会消失不见,因此须要将数据库状态存与磁盘文件中。
按期的将数据库状态保存在一个RDB快照文件中,RDB文件是一个通过压缩的二进制文件,经过该文件可还原生成RDB文件时的数据库状态。
触发方式:手动和自动
RDB 文件的建立和载入
redis命令:SAVE、BGSAVE
SAVE会阻塞Redis服务器进程,直到RDB文件建立完毕为止,在服务器进程阻塞期间,服务器不能处理任何命令请求。
BGSAVE命令会派生出一个子进程,而后由子进程负责建立RDB文件,服务器进程(父进程)继续处理命令请求。
自动触发
redis.conf 中配置
save 900 1 # 表示900 秒内若是至少有 1 个 key 的值变化,则保存 save 300 10 # 表示300 秒内若是至少有 10 个 key 的值变化,则保存 save 60 10000 # 表示60 秒内若是至少有 10000 个 key 的值变化,则保存
“save m n”。表示m秒内数据集存在n次修改时,自动触发BGSAVE。
伪代码
def SAVE(): #建立RDB文件 rdbSave() def BGSAVE(): #建立子进程 pid = fork() if pid == 0: #子进程负责建立RDB文件 rdbSave() #完成以后向父进程发送信号 signal_parent() elif pid > 0: #父进程继续处理命令请求,并经过轮询等待子进程的信号 handle_request_and_wait_signal() else: #处理出错状况 handle_fork_error()
AOF持久化功能实现分为命令追加(append)、文件写入(wirte)、文件同步(sync)三个步骤。
每个写命令都经过write函数追加到 appendonly.aof 中,配置方式:启动 AOF 持久化的方式
伪代码
def eventLoop(): while True: #处理文件事件,接收命令请求以及发送命令回复 #处理命令请求时可能会有新内容被追加到 aof_buf缓冲区中 processFileEvents() #处理时间事件 processTimeEvents() #考虑是否要将 aof_buf中的内容写入和保存到 AOF文件里面 flushAppendOnlyFile()
服务器在执行一个写命令以后,会以协议格式将执行的写命令追加到服务器状态的aof_buf缓冲区的末尾。
操做系统中,用户调用write函数写入,将一些数据写入到文件时,为了提升存储的效率,操做系统一般会将数据暂时保存在一个内存缓冲区里面,缓冲区满了或者超过指定时间,真正将缓冲区数据存储到磁盘,提升了效率,可是若是停机,也会形成缓冲区内的数据丢失,
系统提供了fsync、fdatasync两个同步函数,会强制让操做系统当即将缓冲区的数据写入硬盘,确保数据的安全性。
AOF持久化配置 redis.conf :
appendonly yes #开启AOF appendfilename "appendonly.aof" #默认存储路径 # appendfsync 设置持久化策略,三种: #appendfsync always # 每次有数据修改发生时AOF缓冲区数据都会写入AOF文件并同步 (效率最慢但安全性最高) appendfsync everysec # 每秒钟写入AOF文件并同步一次,该策略为AOF的缺省策略。(效率高,即使丢失数据只会丢失1秒的数据) #appendfsync no # 缓冲区的内容写入到AOF文件,但并不会对AOF文件进行同步,什么时候同步由操做系统来决定(效率高,丢失上一次同步到这一次的所有AOF数据)
appendonly yes开启 AOF 以后,Redis 每执行一个修改数据的命令,都会把它添加到 AOF 文件中,当 Redis 重启时,将会读取 AOF 文件进行“重放”以恢复到 Redis 关闭前的最后时刻。
使用 AOF 持久化会让 Redis 变得很是耐久(much more durable):你能够设置不一样的 fsync 策略,好比无 fsync ,每秒钟一次 fsync ,或者每次执行写入命令时 fsync 。 AOF 的默认策略为每秒钟 fsync 一次,在这种配置下,
Redis 仍然能够保持良好的性能,而且就算发生故障停机,也最多只会丢失一秒钟的数据( fsync 会在后台线程执行,因此主线程能够继续努力地处理命令请求)。
对于相同的数据集来讲,AOF 文件的体积一般要大于 RDB 文件的体积。根据所使用的 fsync 策略,AOF 的速度可能会慢于 RDB。 在通常状况下, 每秒 fsync 的性能依然很是高, 而关闭 fsync 可让 AOF 的速度和 RDB 同样快,
即便在高负荷之下也是如此。 不过在处理巨大的写入载入时,RDB 能够提供更有保证的最大延迟时间(latency)。
随着服务器时间的流逝,AOF文件的体积会愈来愈大。
redis能够看成数据库来存贮数据,如何解决排序查询呢?
SORT命令:
keys *
虽然其模糊匹配功能使用很是方便也很强大,在小数据量状况下使用没什么问题,数据量大会致使 Redis 锁住及 CPU 飙升,在生产环境建议禁用或者重命名!
flushdb
删除 Redis 中当前所在数据库中的全部记录,而且此命令从不会执行失败
flushall
删除 Redis 中全部数据库中的全部记录,不仅是当前所在数据库,而且此命令从不会执行失败。
config
客户端可修改 Redis 配置。
参考:
https://blog.csdn.net/a169388842/article/details/82838818
# 绑定ip,指定地址域链接 bind 192.168.1.100 10.0.0.1