除了 RDB 持久化功能以外,Redis 还提供了 AOF(Append Only File)持久化功能。与 RDB 持久化经过保存数据库中的键值对来记录数据库状态不一样,AOF 持久化是经过保存 Redis 服务器所执行的写命令来记录数据库状态的。html
AOF 文件中记录了 Redis 服务器所执行的写命令,以此来保存数据库的状态。AOF 文件本质上是一个 redo log,经过它能够恢复数据库状态。mysql
随着执行命令的增多,AOF 文件的大小会不断增大,这会致使几个问题,好比,磁盘占用增长,重启加载过慢等。所以, Redis 提供了 AOF 重写机制来控制 AOF 文件大小,下面会细说。linux
AOF 文件中写入的全部命令以 Redis 的命令请求协议格式去保存,即 RESP 格式。c++
有两种方式能够实现 AOF 功能的开关,以下,redis
config set appendonly yes/no
。与 AOF 相关的 server 成员变量不少,这里只选择几个进行简要说明。先看后面的章节,以后再回头看本章节,也是个不错的主意。sql
int aof_state; /* AOF_(ON|OFF|WAIT_REWRITE) */ int aof_fsync; /* Kind of fsync() policy */ char *aof_filename; /* Name of the AOF file */ int aof_no_fsync_on_rewrite; /* Don't fsync if a rewrite is in prog. */ int aof_rewrite_perc; /* Rewrite AOF if % growth is > M and... */ off_t aof_rewrite_min_size; /* the AOF file is at least N bytes. */ off_t aof_rewrite_base_size; /* AOF size on latest startup or rewrite. */ off_t aof_current_size; /* AOF current size. */ int aof_rewrite_scheduled; /* Rewrite once BGSAVE terminates. */ pid_t aof_child_pid; /* PID if rewriting process */ list *aof_rewrite_buf_blocks; /* Hold changes during an AOF rewrite. */ sds aof_buf; /* AOF buffer, written before entering the event loop */ int aof_fd; /* File descriptor of currently selected AOF file */ int aof_selected_db; /* Currently selected DB in AOF */ time_t aof_flush_postponed_start; /* UNIX time of postponed AOF flush */ time_t aof_last_fsync; /* UNIX time of last fsync() */ time_t aof_rewrite_time_last; /* Time used by last AOF rewrite run. */ time_t aof_rewrite_time_start; /* Current AOF rewrite start time. */ int aof_lastbgrewrite_status; /* C_OK or C_ERR */ unsigned long aof_delayed_fsync; /* delayed AOF fsync() counter */ int aof_rewrite_incremental_fsync;/* fsync incrementally while rewriting? */ int aof_last_write_status; /* C_OK or C_ERR */ int aof_last_write_errno; /* Valid if aof_last_write_status is ERR */ int aof_load_truncated; /* Don't stop on unexpected AOF EOF. */
表示 AOF 刷盘策略,后面会细说数据库
因为 aofrewrite 是个耗时操做,所以会 fork 一个子进程去作这件事, aof_child_pid 就标识了子进程的 pid。数组
该变量保存着全部等待写入到 AOF 文件的协议文本。缓存
该变量用来保存 aofrewrite 期间,server 处理过的须要写入 AOF 文件的协议文本。这个变量采用 list 结构,是考虑到分配到一个很是大的空间并不老是可能的,也可能产生大量的复制工做。安全
可取值有 0 和 1。
取 1 时,表示此时有子进程正在作 aofrewrite 操做,本次任务后延,等到 serverCron
执行时,合适的状况再执行。或者是执行了 config set appendonly yes
, 想把 AOF 功能打开,此时执行的 aofrewrite 失败了,aof_state 仍然处于 AOF_WAIT_REWRITE 状态,此时 aof_rewrite_scheduled 也会置为 1,等下次再执行 aofrewrite。
表示 AOF 功能如今的状态,可取值以下,
#define AOF_OFF 0 /* AOF is off */ #define AOF_ON 1 /* AOF is on */ #define AOF_WAIT_REWRITE 2 /* AOF waits rewrite to start appending */
AOF_OFF 表示 AOF 功能处于关闭状态,开关在上一节已经说过,默认 AOF 功能是关闭的。AOF 功能从 off switch 到 on 后,aof_state 会从 AOF_OFF 变为 AOF_WAIT_REWRITE,startAppendOnly
函数完成该逻辑。在 aofrewrite 一次以后,该变量才会从 AOF_WAIT_REWRITE 变为 AOF_ON。
能够看到从 ON 切换到 OFF 时,要经历一个中间状态 AOF_WAIT_REWRITE,那为什么要这么设计呢?再来分析一下 startAppendOnly
函数的逻辑(代码去掉了打印日志的部分)。
server.aof_fd = open(server.aof_filename,O_WRONLY|O_APPEND|O_CREAT,0644); serverAssert(server.aof_state == AOF_OFF); if (server.aof_fd == -1) { char *cwdp = getcwd(cwd,MAXPATHLEN); return C_ERR; } if (server.rdb_child_pid != -1) { server.aof_rewrite_scheduled = 1; } else if (rewriteAppendOnlyFileBackground() == C_ERR) { close(server.aof_fd); return C_ERR; } server.aof_state = AOF_WAIT_REWRITE;
【1】打开 aof 文件,默认名为 appendonly.aof,没有的话就新建空文件,失败则返回。
【2】切换后,须要作一次 aofrewrite,将 server 中现有的数据转换成协议文本,写到 AOF 文件。可是,这里要注意,若是此时有子进程在作 bgrdb,那么这次 aofrewrite 须要任务延缓,即 aof_rewrite_scheduled 置为 1。
【3】将 aof_state 置为 AOF_WAIT_REWRITE 状态。
而作完第一次 aofrewrite 后,AOF_WAIT_REWRITE 转换成 AOF_ON,以下,
void backgroundRewriteDoneHandler(int exitcode, int bysignal) { ... if (server.aof_state == AOF_WAIT_REWRITE) server.aof_state = AOF_ON; ... }
仔细分析源码发现,在 AOF 持久化的命令追加阶段(后面章节细讲),有以下逻辑,
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) { ... if (server.aof_state == AOF_ON) server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf)); if (server.aof_child_pid != -1) aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf)); ... }
很明显,刚开启 AOF 时, aof_state 为 AOF_WAIT_REWRITE ,处理好的协议文本 buf 没法写入 aof_buf 变量 ,但必须写入 aof_rewrite_buf_blocks 变量(数据在 aofrewrite 的最后阶段会被写进 AOF 文件)。
这里是否将命令 append 到 aof_state 的判断相当重要,若是修改条件为 server.aof_state != AOF_OFF
,考虑以下状况。
AOF 状态刚打开,还没有完成第一次 aofrewrite,也即,一边 Child 进程数据库中现有数据还未写进 AOF 文件,另外一边 Parent 进程仍然持续处理 client 请求,因而,Parent 进程在指定的数据刷盘策略下,将 aof_buf 刷盘。若是这时宕机了,当 server 重启后,加载 AOF 文件,在内存中塞入数据,实际上对于用户来讲,这部分数据算是脏数据了,由于 AOF 并无成功打开,未开启 AOF 状态时,数据都在内存中,宕机后,数据会所有丢掉。增长这个中间状态就是为了应对这种状况。因此, AOF_WAIT_REWRITE 状态存在的时间范围起始于 startAppendOnly
,到完成第一次 aofrewrite 后切成 AOF_ON 。aofrewrite 后再发生宕机,丢失的数据就少多了。
这只是我我的的理解,不必定正确,欢迎你们斧正。
另外,若是开启了 AOF,在 redis 启动 加载 AOF 文件时,aof_state 也会暂时设置成 AOF_OFF,加载完毕以后设置为 AOF_ON。
为了提升 aofrewrite 效率,Redis 经过在父子进程间创建管道,把 aofrewrite 期间的写命令经过管道同步给子进程,追加写盘的操做也就转交给了子进程。aof_pipe_* 变量就是这部分会用到的管道。
AOF 功能开启后,每次致使数据库状态发生变化的命令都会通过函数 feedAppendOnlyFile
累积到 aof_buf 变量中。若是后台有正在执行的 aofrewrite 任务,还会写一份数据到 aof_rewrite_buf_blocks 变量中。
在该函数中,首先要将数据库切换到当前数据库( aof_selected_db 更新),在 buf 中插入一条 SELECT 命令。
sds buf = sdsempty(); if (dictid != server.aof_selected_db) { char seldb[64]; snprintf(seldb,sizeof(seldb),"%d",dictid); buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n", (unsigned long)strlen(seldb),seldb); server.aof_selected_db = dictid; }
而后在对须要加入 buf 的命令进行分类处理。
【1】带有过时时间的命令,调用函数 catAppendOnlyExpireAtCommand
进行协议文本 buf 组装。EXPIRE/PEXPIRE/EXPIREAT 这三个命令直接调用该函数,而 SETEX/PSETEX 这两个命令须要在调用以前加入一个 SET 命令。即,
tmpargv[0] = createStringObject("SET",3); tmpargv[1] = argv[1]; tmpargv[2] = argv[3]; buf = catAppendOnlyGenericCommand(buf,3,tmpargv); decrRefCount(tmpargv[0]); buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
【2】普通命令,直接调用函数 catAppendOnlyGenericCommand
进行协议文本 buf 组装。
该函数其实就是将全部与过时时间相关的命令转成 PEXPIREAT 命令,细化到毫秒。最后调用普通命令组装 buf 函数 catAppendOnlyGenericCommand
。
// 构建 PEXPIREAT 命令 argv[0] = createStringObject("PEXPIREAT",9); argv[1] = key; argv[2] = createStringObjectFromLongLong(when); // 调用 aof 公共函数 buf = catAppendOnlyGenericCommand(buf, 3, argv);
该函数用来把 redis 命令转换成 RESP 协议文本。
sds catAppendOnlyGenericCommand(sds dst, int argc, robj **argv) { char buf[32]; int len, j; robj *o; // 好比 *3\r\n buf[0] = '*'; len = 1+ll2string(buf+1,sizeof(buf)-1,argc); buf[len++] = '\r'; buf[len++] = '\n'; dst = sdscatlen(dst,buf,len); for (j = 0; j < argc; j++) { o = getDecodedObject(argv[j]); buf[0] = '$'; len = 1+ll2string(buf+1,sizeof(buf)-1,sdslen(o->ptr)); buf[len++] = '\r'; buf[len++] = '\n'; dst = sdscatlen(dst,buf,len); dst = sdscatlen(dst,o->ptr,sdslen(o->ptr)); dst = sdscatlen(dst,"\r\n",2); decrRefCount(o); } return dst; }
能够看到,定义了一个 buf 数组,反复使用,经过 len 精确控制 append 到 dst 后的长度。
aof_rewrite_buf_blocks 变量是一个 list 结构,其中每个元素都是一个大小为 10M 的 block
#define AOF_RW_BUF_BLOCK_SIZE (1024*1024*10) /* 10 MB per block */ typedef struct aofrwblock { unsigned long used, free; char buf[AOF_RW_BUF_BLOCK_SIZE]; } aofrwblock;
这个函数作了两件事情。
一是,将 catAppendOnlyGenericCommand
得到的协议文本 buf 存到 aof_rewrite_buf_blocks 变量,首先拿出来 list 最后一个 block,若是装不下,那先把最后一个 block 填满,剩下的再申请内存。
listNode *ln = listLast(server.aof_rewrite_buf_blocks); // 指向最后一个缓存块 aofrwblock *block = ln ? ln->value : NULL; while(len) { if (block) { // 若是已经有至少一个缓存块,那么尝试将内容追加到这个缓存块里面 unsigned long thislen = (block->free < len) ? block->free : len; if (thislen) { /* The current block is not already full. */ memcpy(block->buf+block->used, s, thislen); block->used += thislen; block->free -= thislen; s += thislen; len -= thislen; } } if (len) { // 最后一个缓存块没有放得下本次 data,那再申请一个 block int numblocks; block = zmalloc(sizeof(*block)); block->free = AOF_RW_BUF_BLOCK_SIZE; block->used = 0; listAddNodeTail(server.aof_rewrite_buf_blocks,block); ... ... } }
二是,给 aof_pipe_write_data_to_child 这个 fd 注册写事件,回调函数为 aofChildWriteDiffData
。
/* Install a file event to send data to the rewrite child if there is * not one already. */ if (aeGetFileEvents(server.el,server.aof_pipe_write_data_to_child) == 0) { aeCreateFileEvent(server.el, server.aof_pipe_write_data_to_child, AE_WRITABLE, aofChildWriteDiffData, NULL); }
这个属于 aof 重写的逻辑,后面章节会细说,这里先留个心。
也就是说,何时会调用feedAppendOnlyFile
呢?有如下两个时机。
你们都知道,Redis 中命令执行的流程,即 processCommand
-> call
。在 call
函数中会把某些命令写入 AOF 文件。如何判断某个命令是否须要写入 AOF 呢?
在 server 结构体中维持了一个 dirty 计数器,dirty 记录的是服务器状态进行了多少次修改,每次作完 save/bgsave 执行完成后,会将 dirty 清 0,而使得服务器状态修改的命令通常都须要写入 AOF 文件和主从同步(排除某些特殊状况)。
dirty = server.dirty; c->cmd->proc(c); dirty = server.dirty-dirty; ... if (propagate_flags != PROPAGATE_NONE) propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
在 propagate
函数中就会调用到 feedAppendOnlyFile
。
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags) { if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF) feedAppendOnlyFile(cmd,dbid,argv,argc); if (flags & PROPAGATE_REPL) replicationFeedSlaves(server.slaves,dbid,argv,argc); }
当内存中带有过时时间的 key 过时时,会向 AOF 写入 del 命令。
void propagateExpire(redisDb *db, robj *key) { ... if (server.aof_state != AOF_OFF) feedAppendOnlyFile(server.delCommand,db->id,argv,2); replicationFeedSlaves(server.slaves,db->id,argv,2); ... }
propagateExpire
函数在一些检查 key 是否过时时会调用。
上一步中,将须要写入 AOF 文件的数据先写到了 aof_buf 变量中,那么,接下来讲一下如何将 aof_buf 的内容写进 AOF 文件。
为了提升文件的写入效率,在现代操做系统中,当用户调用write
函数试,将一些数据写入到文件的时候,操做系统一般会将写入的数据保存在一个内存缓冲区里,等到缓冲区的空间被填满,或者超过了指定的时限后,才真正地将缓冲区中的数据写入磁盘。这种作法虽然提升了效率,但也为写入数据带来了安全问题,由于若是计算机宕机,那么保存在内存缓冲区里面的写入数据将会丢失。
为此,系统提供了
fsync
和fdatasync
两个同步函数,它们能够强制让操做系统当即将缓存区中的数据写入到硬盘里面,从而确保写入数据的安全性。
要知道,这两个系统调用函数都是阻塞式的,针对如何协调文件写入与同步的关系,该版本 Redis 支持 3 种同步策略,可在配置文件中使用 appendfsync 项进行配置,有以下取值,
在 Redis 源码中, 当程序运行在 Linux 系统上时,执行的是 fdatasync
函数,而在其余系统上,则会执行 fsync
函数,即,
#ifdef __linux__ #define aof_fsync fdatasync #else #define aof_fsync fsync #endif
注:如下叙述均以 fsync
代称。
写入文件的逻辑在 flushAppendOnlyFile
函数中实现。下面分两部分来看主要代码。
write
系统调用... // aof 缓存区内没有数据须要写入 disk,无需处理 if (sdslen(server.aof_buf) == 0) return; // 若是 sync policy 设置成 everysec, // sync_in_progress 表示是否有 fsync 任务在后台 if (server.aof_fsync == AOF_FSYNC_EVERYSEC) sync_in_progress = bioPendingJobsOfType(BIO_AOF_FSYNC) != 0; // force=0(非强制写入)时,若是后台有 fsync 任务,推迟这次写入,但推迟时间不超过 2s if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) { if (sync_in_progress) { if (server.aof_flush_postponed_start == 0) { // 首次推迟 write,一次推迟 2s server.aof_flush_postponed_start = server.unixtime; return; } else if (server.unixtime - server.aof_flush_postponed_start < 2) { return; } // 不然,经过,继续写,由于咱们不能等待超过 2s server.aof_delayed_fsync++; serverLog(LL_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis."); } } ... // 将 aof 缓冲区的内容写到系统缓存区 nwritten = write(server.aof_fd, server.aof_buf, sdslen(server.aof_buf)); ... // 执行了 write 操做,因此要清零延迟 flush 的时间 server.aof_flush_postponed_start = 0;
首先会判断 aof_buf 是否为空,若是是,那么不须要执行下面的逻辑,直接返回。
若是同步策略为 everysec,那么须要查看是否有 fsync 任务在后台,调用 fsync 使用的是 Redis 中 bio ,若是对这个还不了解,能够参考我以前的文章 《 Redis Bio 详解 》。为何要作这个判断呢?
当fsync
和write
同一个 fd 时,write
必然阻塞。 当系统 IO 很是繁忙时,fsync
() 可能会阻塞, 即便系统 IO 不繁忙,fsync
也会由于数据量大而慢。
所以对于 everysec 策略,须要尽可能保证 fsync
和 write
不一样时操做同一个 fd。no 策略彻底把 fsync
交给了操做系统,操做系统何时 fsync
,无从得知。always 策略则是每次都要主从调用 fsync
,也不必作判断。所以,这里的判断,只针对 everysec 策略有效。
对于 everysec 策略,若是有 fsync
在执行,那么本次 write
推迟 2 秒钟,等到下次在进入本函数时,若是推迟时间超过 2 秒,那么更新 aof_delayed_fsync 值(info 里能够查到),打印日志 ” Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis. “ ,以后进行 write
系统调用。固然了,系统也提供了 force 选项,去跳过这项是否要推迟 write
的检查。
write
以后,将 aof_flush_postponed_start 推迟开始计时值清零,迎接下次检查。
因此说,AOF 执行 everysec 策略时,若是刚好有 fsync
在长时间的执行,Redis 意外关闭会丢失最多两秒的数据。若是 fsync
运行正常,只有当操做系统 crash 时才会形成最多 1 秒的数据丢失。
write
结果处理write
调用结果多是正常的,也多是异常的,那么须要作不一样的处理。首先主要看异常处理,
if (nwritten != (signed)sdslen(server.aof_buf)) { ... /* Log the AOF write error and record the error code. */ if (nwritten == -1) { ... } else { // 若是仅写了一部分,发生错误 // 将追加的内容截断,删除了追加的内容,恢复成原来的文件 if (ftruncate(server.aof_fd, server.aof_current_size) == -1) { ... } else { nwritten = -1; } server.aof_last_write_errno = ENOSPC; } // 若是是写入的策略为每次写入就同步,没法恢复这种策略的写,由于咱们已经告知使用者,已经将写的数据同步到磁盘了,所以直接退出程序 if (server.aof_fsync == AOF_FSYNC_ALWAYS) { ... exit(1); } else { // 设置执行write操做的状态 server.aof_last_write_status = C_ERR; if (nwritten > 0) { // 只能更新当前的 AOF 文件的大小 server.aof_current_size += nwritten; // 删除 AOF 缓冲区写入的字节 sdsrange(server.aof_buf,nwritten,-1); } return; /* We'll try again on the next call... */ } } else { /* Successful write(2). If AOF was in error state, restore the * OK state and log the event. */ if (server.aof_last_write_status == C_ERR) { serverLog(LL_WARNING, "AOF write error looks solved, Redis can write again."); server.aof_last_write_status = C_OK; } }
写入异常的判断,nwritten != (signed)sdslen(server.aof_buf)
,write
的数据量与 aof_buf 的大小不一样。当彻底没写入时,打个日志就算了;当仅写入了一部分数据时,使用 ftruncate
函数把 AOF 文件的内容恢复成原来的大小,以备下次从新写入,nwritten 置为 -1。使用 ftruncate
的缘由是怕操做系统执行了 fsync
,所以须要把 AOF 文件的大小恢复。
若是执行的是 always 同步策略,那么须要返回会客户端错误。对于其余策略,更新 aof_last_write_status
,以便知道上一次作 write
的结果,对于未彻底写入的状况,若是上面执行的 ftruncate
失败,此时 nwritten > 0
,须要更新 aof_current_size,从 aof_buf 中减去已经写入的,防止下次有重复数据写入,而后返回。
若是写入成功,那么视状况更新 aof_last_write_status
,表示这次 write
成功。
下面主要是正常状况的处理。
/* nwritten = -1 时走不到这个步骤 */ server.aof_current_size += nwritten; // 正常 write,更新 aof_current_size /* Re-use AOF buffer when it is small enough. The maximum comes from the * arena size of 4k minus some overhead (but is otherwise arbitrary). */ if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) { sdsclear(server.aof_buf); } else { sdsfree(server.aof_buf); server.aof_buf = sdsempty(); } /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are * children doing I/O in the background. */ if (server.aof_no_fsync_on_rewrite && (server.aof_child_pid != -1 || server.rdb_child_pid != -1)) return; /* Perform the fsync if needed. */ if (server.aof_fsync == AOF_FSYNC_ALWAYS) { /* aof_fsync is defined as fdatasync() for Linux in order to avoid * flushing metadata. */ latencyStartMonitor(latency); aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */ latencyEndMonitor(latency); latencyAddSampleIfNeeded("aof-fsync-always",latency); server.aof_last_fsync = server.unixtime; } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC && server.unixtime > server.aof_last_fsync)) { if (!sync_in_progress) aof_background_fsync(server.aof_fd); // 若是没有正在执行同步,那么建立一个后台任务 server.aof_last_fsync = server.unixtime; }
aof_buf 清空,而后根据不一样策略进行同步。always 策略时,主动调用 fsync
; everysec 策略,则建立 fsync bio 任务。
另外,有配置项 no-appendfsync-on-rewrite 去决定,当子进程在作 aofrewrite/bgsave 时是否要进行 fsync
。
也就是,何时会调用 flushAppendOnlyFile
函数,有如下三个时机。
Redis 的服务器进程就是一个事件循环,这个循环中的文件事件负责接收客户端请求,以及向客户端发送命令回复,而时间事件则负责像
serverCron
函数这样须要定时运行的函数。
对于 Redis 的事件机制能够参考我以前的文章 《Redis 中的事件》。
由于服务器在处理文件事件时可能会执行写命令,使得一些内容被追加到 aof_buf 缓冲区里面,因此在服务器每次结束一个事件循环以前,都会调用 flushAppendOnlyFile
函数,考虑是否须要将 aof_buf 缓冲区中的内容写入和同步到 AOF 文件里面。即,
void beforeSleep(struct aeEventLoop *eventLoop) { ... /* Write the AOF buffer on disk */ flushAppendOnlyFile(0); ... }
这里的调用是非强制写入(force = 0)。
Redis 中的时间事件,按期执行 serverCron
函数(从 Redis 2.8 开始,用户能够经过修改 hz 选项来调整 serverCron
的每秒执行次数),作一些琐事,好比更新服务器各项统计信息、关闭清理客户端、作 AOF 和 RDB 等。
/* AOF postponed flush: Try at every cron cycle if the slow fsync completed. */ if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);
若是上次 AOF 写入推迟了,那么再次尝试非强制写入。
run_with_period(1000) { if (server.aof_last_write_status == C_ERR) flushAppendOnlyFile(0); }
每秒钟检查,若是上次写入 AOF 文件失败了,再次尝试非强制写入。由于须要及时去处理 aof_buf
,以及重置 AOF 写入状态的变量 aof_last_write_status,每秒作检查,这个频率是足够的。
当 AOF 功能要关闭时,会调用 stopAppendOnly
函数,尝试一次强制写入,即尽最大努力去保存最多的数据。
void stopAppendOnly(void) { serverAssert(server.aof_state != AOF_OFF); flushAppendOnlyFile(1); aof_fsync(server.aof_fd); close(server.aof_fd); }
强制写入,并刷盘。
当 Redis 服务器进程启动时,须要调用 loadDataFromDisk
函数去加载数据。
void loadDataFromDisk(void) { long long start = ustime(); if (server.aof_state == AOF_ON) { // 开启了 aof if (loadAppendOnlyFile(server.aof_filename) == C_OK) serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000); } else { if (rdbLoad(server.rdb_filename) == C_OK) { serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds", (float)(ustime()-start)/1000000); } else if (errno != ENOENT) { serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno)); exit(1); } } }
能够看到,若是开启了 AOF 功能,就会调用 loadAppendOnlyFile
函数,加载 AOF 文件中的数据到内存中。不然,会去调用 rdbLoad
函数,加载 RDB 文件。加载 AOF 文件的设计颇有意思。
FILE *fp = fopen(filename,"r"); struct redis_stat sb; int old_aof_state = server.aof_state; long loops = 0; off_t valid_up_to = 0; /* Offset of the latest well-formed command loaded. */ // 检查文件的正确性, 存在,而且不为空 if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) { server.aof_current_size = 0; fclose(fp); return C_ERR; } if (fp == NULL) { serverLog(LL_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno)); exit(1); } // 暂时关掉 AOF, 防止向该 filename 中写入新的 AOF 数据 server.aof_state = AOF_OFF;
首先,空文件没有必要再去加载了,提早返回。
而后,暂时关闭 AOF 功能,这是为了防止在加载 AOF 文件的过程当中,又有新的数据写进来
fakeClient = createFakeClient(); // 建立一个不带网络链接的伪客户端 startLoading(fp); // 标记正在 load db,loading = 1 // 读 AOF 文件 while(1) { int argc, j; unsigned long len; robj **argv; char buf[128]; sds argsds; struct redisCommand *cmd; ... ... // 如执行命令 SET keytest val,那么写入 AOF 文件中的格式为 // *3\r\n$3\r\nSET\r\n$7\r\nkeytest\r\n$3\r\nval\r\n if (fgets(buf,sizeof(buf),fp) == NULL) { // 按行读取 AOF 文件,*3 if (feof(fp)) break; else goto readerr; } if (buf[0] != '*') goto fmterr; // 判断协议是否正确 if (buf[1] == '\0') goto readerr; // 数据完整判断 argc = atoi(buf+1); if (argc < 1) goto fmterr; argv = zmalloc(sizeof(robj*)*argc); fakeClient->argc = argc; fakeClient->argv = argv; for (j = 0; j < argc; j++) { if (fgets(buf,sizeof(buf),fp) == NULL) { // 依次读到 $3, $7, $3 fakeClient->argc = j; /* Free up to j-1. */ freeFakeClientArgv(fakeClient); goto readerr; } if (buf[0] != '$') goto fmterr; len = strtol(buf+1,NULL,10); // 参数长度 argsds = sdsnewlen(NULL,len); if (len && fread(argsds,len,1,fp) == 0) { // 依次读到 SET/ keytest/ val sdsfree(argsds); fakeClient->argc = j; /* Free up to j-1. */ freeFakeClientArgv(fakeClient); goto readerr; } argv[j] = createObject(OBJ_STRING,argsds); if (fread(buf,2,1,fp) == 0) { // 读到 \r\n fakeClient->argc = j+1; /* Free up to j. */ freeFakeClientArgv(fakeClient); goto readerr; /* discard CRLF */ } } /* Command lookup */ cmd = lookupCommand(argv[0]->ptr); if (!cmd) { serverLog(LL_WARNING,"Unknown command '%s' reading the append only file", (char*)argv[0]->ptr); exit(1); } /* Run the command in the context of a fake client */ cmd->proc(fakeClient); /* The fake client should not have a reply */ serverAssert(fakeClient->bufpos == 0 && listLength(fakeClient->reply) == 0); /* The fake client should never get blocked */ serverAssert((fakeClient->flags & CLIENT_BLOCKED) == 0); /* Clean up. Command code may have changed argv/argc so we use the * argv/argc of the client instead of the local variables. */ freeFakeClientArgv(fakeClient); if (server.aof_load_truncated) valid_up_to = ftello(fp); }
上面这部分是加载 AOF 文件的关键,以 SET keytest val
命令对应的 AOF 文件内容 *3\r\n$3\r\nSET\r\n$7\r\nkeytest\r\n$3\r\nval\r\n
为例,能够更好地理解上面的逻辑。因为 AOF 文件中存储的数据与客户端发送的请求格式相同彻底符合 Redis 的通讯协议,所以 Redis Server 建立伪客户端 fakeClient,将解析后的 AOF 文件数据像客户端请求同样调用各类指令,cmd->proc(fakeClient)
,将 AOF 文件中的数据重现到 Redis Server 数据库中。
完成以上逻辑后,进行一些收尾工做,如改回 AOF 状态为 ON,释放伪客户端等,并处理一些异常状况,这里就不展开细讲了。