接着上一篇,这篇文章分析一下redis事务操做中multi,exec,discard三个核心命令。c++
原文地址:www.jianshu.com/p/e22615586…redis
看本篇文章前须要先对上面文章有所了解:
redis源码分析之事务Transaction(上)数组
redis事务操做核心命令:bash
//用于开启事务
{"multi",multiCommand,1,"sF",0,NULL,0,0,0,0,0},
//用来执行事务中的命令
{"exec",execCommand,1,"sM",0,NULL,0,0,0,0,0},
//用来取消事务
{"discard",discardCommand,1,"sF",0,NULL,0,0,0,0,0},复制代码
在redis中,事务并不具备ACID的概念,换句话说,redis中的事务仅仅是保证一系列的命令按顺序一个一个执行,若是中间失败了,并不会进行回滚操做。数据结构
使用redis事务举例以下:框架
127.0.0.1:6379> multi
OK
127.0.0.1:6379> set a a
QUEUED
127.0.0.1:6379> set b b
QUEUED
127.0.0.1:6379> set c c
QUEUED
127.0.0.1:6379> exec
1) OK
2) OK
3) OK
127.0.0.1:6379>复制代码
关于事务的几个命令所对应的函数都放在multi.c文件中。函数
首先来看一下multi命令,该命令用于标记客户端开启事务状态,所以它作的就是修改客户端状态,代码很简单,以下:源码分析
void multiCommand(client *c) {
//若是客户端已是事务模式,则返回错误提示信息
if (c->flags & CLIENT_MULTI) {
addReplyError(c,"MULTI calls can not be nested");
return;
}
//设置客户端为事务模式
c->flags |= CLIENT_MULTI;
//返回结果
addReply(c,shared.ok);
}复制代码
接下来看下redis处理命令逻辑中的一段源码:
这段代码在server.c文件中的processCommand方法中:ui
//若是客户端处于事务状态且当前执行的命令不是exec,discard,multi跟watch命令中的一个
//则把当前命令加入一个队列
if (c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
{
//加入队列
queueMultiCommand(c);
//返回结果
addReply(c,shared.queued);
} else {
//执行当前命令
call(c,CMD_CALL_FULL);
c->woff = server.master_repl_offset;
if (listLength(server.ready_keys))
handleClientsBlockedOnLists();
}复制代码
看入队操做源码前,先来熟悉几个数据结构,redis会把每一个链接的客户端封装成一个client对象,该对象中含有大量字段用来保存须要的信息,发布订阅功能也使用对应的字段进行存储,事务固然也不例外,以下:spa
//每一个客户端对象中有一个mstate字段用来保存事务上下文
typedef struct client {
multiState mstate;
}
//事务包装类型
typedef struct multiState {
//当前事务中须要执行的命令数组
multiCmd *commands;
//须要执行的命令数量
int count;
//须要同步复制的最小数量
int minreplicas;
//同步复制超时时间
time_t minreplicas_timeout;
} multiState;
//事务中执行命令的封装类型
typedef struct multiCmd {
//参数
robj **argv;
//参数数量
int argc;
//命令自己
struct redisCommand *cmd;
} multiCmd;复制代码
了解了基本的数据结构之后,再来看下入队操做:
void queueMultiCommand(client *c) {
//类型前面有说明
multiCmd *mc;
int j;
//扩容,每次扩容一个命令的大小
c->mstate.commands = zrealloc(c->mstate.commands,
sizeof(multiCmd)*(c->mstate.count+1));
//c++中给数组最后一个元素赋值语法实在是有点难懂...
mc = c->mstate.commands+c->mstate.count;
//初始化mc各个字段
mc->cmd = c->cmd;
mc->argc = c->argc;
mc->argv = zmalloc(sizeof(robj*)*c->argc);
//把参数一个一个拷贝过来
memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);
for (j = 0; j < c->argc; j++)
incrRefCount(mc->argv[j]);
c->mstate.count++;
}复制代码
上面是把命令加入事务命令数组的中的逻辑,因为在执行事务过程当中也会执行删除事务的操做,所以在看执行事务逻辑以前咱们先看下删除事务的实现原理。
当事务执行完成,执行错误或者客户端想取消当前事务,都会跟discard命令有联系,一块儿看下源码:
void discardCommand(client *c) {
//若是当前客户端没有处于事务状态,则返回错误信息
if (!(c->flags & CLIENT_MULTI)) {
addReplyError(c,"DISCARD without MULTI");
return;
}
//删除事务
discardTransaction(c);
//返回结果
addReply(c,shared.ok);
}
//具体的删除逻辑
void discardTransaction(client *c) {
//释放客户端事务资源
freeClientMultiState(c);
//初始化客户端事务资源
initClientMultiState(c);
//状态位还原
c->flags &= ~(CLIENT_MULTI|CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC);
//取消已watch的key,该函数上面文章中已经进行过度析,不赘述
unwatchAllKeys(c);
}
//释放事务队列中的每一个命令
void freeClientMultiState(client *c) {
int j;
for (j = 0; j < c->mstate.count; j++) {
int i;
multiCmd *mc = c->mstate.commands+j;
//挨个释放命令的参数
for (i = 0; i < mc->argc; i++)
decrRefCount(mc->argv[i]);
zfree(mc->argv);
}
//最后释放命令自己
zfree(c->mstate.commands);
}
//事务相关字段设为初始值
void initClientMultiState(client *c) {
c->mstate.commands = NULL;
c->mstate.count = 0;
}复制代码
到这里,咱们已经了解了开启事务模式,把各个命令加入到事务命令执行数组中以及取消事务三个模块的执行原理,最后一块儿看下事务的执行过程,代码较长,须要慢慢看。
把一系列命令加入到事务命令数组中之后,客户端执行exec命令就能够把其中的全部命令挨个执行完成了,分析exec命令源码以前,咱们应该能够想到redis的逻辑应该就是从客户端的事务命令数组中取出全部命令一个一个执行,源码以下:
void execCommand(client *c) {
int j;
robj **orig_argv;
int orig_argc;
struct redisCommand *orig_cmd;
//标记是否须要把MULTI/EXEC传递到AOF或者slaves节点
int must_propagate = 0;
//标记当前redis节点是否为主节点
int was_master = server.masterhost == NULL;
//若是客户端没有处于事务状态,则返回错误提示信息
if (!(c->flags & CLIENT_MULTI)) {
addReplyError(c,"EXEC without MULTI");
return;
}
//首先对两个须要终止当前事务的条件进行判断
//1.当有WATCH的key被修改时则终止,返回一个nullmultibulk对象
//2.当以前有命令加入事务命令数组出错则终止,例如传入的命令参数数量不对,会返回execaborterr
if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC)) {
addReply(c, c->flags & CLIENT_DIRTY_EXEC ? shared.execaborterr :
shared.nullmultibulk);
//删除当前事务信息,前面已经分析过,不赘述
discardTransaction(c);
goto handle_monitor;
}
//把watch的key都删除,上面文章已经分析过,不赘述
unwatchAllKeys(c);
//保存当前命令上下文
orig_argv = c->argv;
orig_argc = c->argc;
orig_cmd = c->cmd;
addReplyMultiBulkLen(c,c->mstate.count);
//遍历事务命令数组
for (j = 0; j < c->mstate.count; j++) {
//把事务队列中的命令参数取出赋值给client,由于命令是在client维度执行的
c->argc = c->mstate.commands[j].argc;
c->argv = c->mstate.commands[j].argv;
c->cmd = c->mstate.commands[j].cmd;
//同步事务操做到AOF或者集群中的从节点
if (!must_propagate && !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN))) {
execCommandPropagateMulti(c);
must_propagate = 1;
}
//执行具体命令
call(c,CMD_CALL_FULL);
//因为命令能够修改参数的值或者数量,所以从新保存命令上下文
c->mstate.commands[j].argc = c->argc;
c->mstate.commands[j].argv = c->argv;
c->mstate.commands[j].cmd = c->cmd;
}
//恢复原始命令上下文
c->argv = orig_argv;
c->argc = orig_argc;
c->cmd = orig_cmd;
//事务执行完成,删除该事务,前面已经分析过,不赘述
discardTransaction(c);
//确保EXEC会进行传递
if (must_propagate) {
int is_master = server.masterhost == NULL;
server.dirty++;
if (server.repl_backlog && was_master && !is_master) {
char *execcmd = "*1\r\n$4\r\nEXEC\r\n";
feedReplicationBacklog(execcmd,strlen(execcmd));
}
}
//monitor命令操做
handle_monitor:
if (listLength(server.monitors) && !server.loading)
replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
}复制代码
上面就是事务命令执行的整个逻辑,能够先排除集群跟AOF的同步逻辑,专一理解核心逻辑,代码总体逻辑算是比较清晰的,搞明白了前面的几个模块之后,再看执行逻辑就不会太难。
经过上、下两篇文章对redis事务各个命令进行了分析,仔细阅读应该能够了解整个事务执行框架,若是有任何问题或者疑惑,欢迎留言评论。