Redis 持久化之 AOF 重写

由于 AOF 持久化是经过保存被执行的写命令来记录数据库状态的,因此随着服务器运行时间的流逝,AOF 文件中的内容会原来越多,文件的体积也会愈来愈大,若不加以控制,体积过大的 AOF 文件极可能对 Redis 服务器、甚至整个宿主计算机形成影响,而且其体积越大,使用 AOF 文件来进行数据还原所须要的时间就越长。

为防止 aofrewrite 过程阻塞服务器,Redis 服务器会 fork 一个子进程执行该过程,且任什么时候刻只能有一个子进程作这件事。mysql

server 相关变量

为了保证 AOF 的连续性,父进程把 aofrewrite 期间的写命令缓存起来,等子进程重写以后再追加到新的 AOF 文件。若是 aofrewrite 期间写命令写入量较大的话,子进程结束后,父进程的追加就涉及到大量的写磁盘操做,形成服务性能降低。redis

Redis 经过在父子进程间创建 pipe,把 aofrewrite 期间的写命令经过 pipe 同步给子进程,这样一来,追加写盘的操做也就转嫁给了子进程。Redis server 中与之相关的变量主要有如下几个,主要三个 pipe。sql

int aof_pipe_write_data_to_child;
int aof_pipe_read_data_from_parent;
int aof_pipe_write_ack_to_parent;
int aof_pipe_read_ack_from_child;
int aof_pipe_write_ack_to_child;
int aof_pipe_read_ack_from_parent;
int aof_stop_sending_diff; /*If true stop sending accumulated diffs to child process. */
sds aof_child_diff;        /* AOF diff accumulator child side. */

实现原理

aofrewrite 的入口逻辑在 rewriteAppendOnlyFileBackground 函数。数据库

int rewriteAppendOnlyFileBackground(void) {
    ...
    if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;
    ...
}

要确保没有后台进程作 aofrewrite 或者 rdb,才会考虑作本次的 aofrewrite。缓存

pipe 初始化

int rewriteAppendOnlyFileBackground(void) {
   ...
   if (aofCreatePipes() != C_OK) return C_ERR; 
   ...
}
int aofCreatePipes(void) {
    int fds[6] = {-1, -1, -1, -1, -1, -1};
    int j;

    if (pipe(fds) == -1) goto error; /* parent -> children data. */
    if (pipe(fds+2) == -1) goto error; /* children -> parent ack. */
    if (pipe(fds+4) == -1) goto error; /* children -> parent ack. */
    /* Parent -> children data is non blocking. */
    if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error;
    if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error;

    /* 注册读事件处理函数,负责处理子进程要求中止数据传输的消息 */
    if (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;

    server.aof_pipe_write_data_to_child = fds[1];
    server.aof_pipe_read_data_from_parent = fds[0];
    server.aof_pipe_write_ack_to_parent = fds[3];
    server.aof_pipe_read_ack_from_child = fds[2];
    server.aof_pipe_write_ack_to_child = fds[5];
    server.aof_pipe_read_ack_from_parent = fds[4];
    server.aof_stop_sending_diff = 0; /* 是否中止管道传输标记位 */
    return C_OK;

error:
    serverLog(LL_WARNING,"Error opening /setting AOF rewrite IPC pipes: %s",
        strerror(errno));
    for (j = 0; j < 6; j++) if(fds[j] != -1) close(fds[j]);
    return C_ERR;
}

aofCreatePipes 函数中,对 pipe 进行初始化,pipe 各变量的用处从名字也能够看出来,一共有三条 pipe,每条 pipe 一来一回,占用两个 fd。服务器

pipe 1 用于父进程向子进程发送缓存的新数据。子进程在 aofrewrite 时,会按期从该管道中读取数据并缓存起来,并在最后将缓存的数据写入重写的新 AOF 文件,这两个 fd 都设置为非阻塞式的。app

pipe 2 负责子进程向父进程发送结束信号。父进程监听 fds[2] 读事件,回调函数为 aofChildPipeReadable。父进程不断地接收客户端命令,可是子进程不可能无休止地等待父进程的数据,所以,子进程在遍历完数据库全部数据以后,从 pipe 1 中执行一段时间的读取操做后,就会向 pipe 2 中发送一个特殊标记 "!",父进程收到子进程的 "!" 后,就会置 server.aof_stop_sending_diff 为 1,表示再也不向父进程发送缓存数据了。socket

pipe 3 负责父进程向子进程发送应答信号。父进程收到子进程的 "!" 后,会经过该管道也向子进程应答一个 "!",表示已收到了中止信号。ide

详细过程后面会细说。函数

父进程处理逻辑

rewriteAppendOnlyFileBackground 函数

接着上面的逻辑,server fork 出一个子进程,两个进程分别作各有不一样的处理,下面先看父进程的一些主要处理(代码有删减)。

int rewriteAppendOnlyFileBackground(void) {
    ...
    if ((childpid = fork()) == 0) {
        ... ...
    } else {
        server.aof_rewrite_scheduled = 0;
        server.aof_child_pid = childpid;
        updateDictResizePolicy();
        server.aof_selected_db = -1;
        replicationScriptCacheFlush();
        return C_OK;
    }
    ...
}

server.aof_rewrite_scheduled 置零,防止在 serverCron 函数中重复触发 aofrewrite,这时由于 serverCron 中有以下逻辑,

int rewriteAppendOnlyFileBackground(void) {
    ...
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
        server.aof_rewrite_scheduled)
    {
        rewriteAppendOnlyFileBackground();
    }
    ...
}

这里,updateDictResizePolicy 函数所作的操做是很重要的,以下,

void updateDictResizePolicy(void) {
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1)
        dictEnableResize();
    else
        dictDisableResize();
}

也就是说,在后台有子进程作 aofrewrite 或 rdb 时,就不要作 dict rehash 了。如今大多数操做系统都采用写时复制(copy-on-write)来优化子进程的使用效率,因此在子进程存在期间,应该避免没必要要的内存写入,不然会引发大量的内存 copy,影响性能。COW 的知识能够参考文档 《Copy On Write机制了解一下》。

另外,server.aof_selected_db 置为 -1,是为了在子进程进行数据库扫描时插入 select 命令,以便选择正确的数据库。

aofRewriteBufferAppend 函数

在上一篇博客中说过,在 feedAppendOnlyFile 函数 append 写命令时,若是当前有子进程在作 aofrewrite 时,须要将写命令写到 server.aof_rewrite_buf_blocks 中一份。该变量是一个链表,其中每一个节点最大10MB。

void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
    ...
    if (server.aof_child_pid != -1)
        aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));
}
void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
    ... ...
    /* Install a file event to send data to the rewrite child if there is
     * not one already. */
    if (aeGetFileEvents(server.el,server.aof_pipe_write_data_to_child) == 0) {
        aeCreateFileEvent(server.el, server.aof_pipe_write_data_to_child,
            AE_WRITABLE, aofChildWriteDiffData, NULL);
    }
}

server.aof_pipe_write_data_to_child 注册写事件,回调函数为 aofChildWriteDiffData

void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) {
    listNode *ln;
    aofrwblock *block;
    ssize_t nwritten;
    UNUSED(el);
    UNUSED(fd);
    UNUSED(privdata);
    UNUSED(mask);

    while(1) {
        ln = listFirst(server.aof_rewrite_buf_blocks);
        block = ln ? ln->value : NULL;
        if (server.aof_stop_sending_diff || !block) {
            aeDeleteFileEvent(server.el,server.aof_pipe_write_data_to_child,
                              AE_WRITABLE);
            return;
        }
        if (block->used > 0) {
            nwritten = write(server.aof_pipe_write_data_to_child,
                             block->buf,block->used);
            if (nwritten <= 0) return;
            memmove(block->buf,block->buf+nwritten,block->used-nwritten);
            block->used -= nwritten;
        }
        if (block->used == 0) listDelNode(server.aof_rewrite_buf_blocks,ln);
    }
}

当子进程告诉父进程不要发数据(server.aof_stop_sending_diff = 1)或者 server.aof_rewrite_buf_blocks 为空时,删除写事件。

不然,往 pipe1 中写入数据,而后写入的数据从 server.aof_rewrite_buf_blocks 删掉。

子进程处理逻辑

int rewriteAppendOnlyFileBackground(void) {
    ...
    char tmpfile[256];
    closeListeningSockets(0);               /* child 关闭没必要要的 socket */
    redisSetProcTitle("redis-aof-rewrite"); /* 修改进程名为 redis-aof-rewrite */
    snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
    ...
}

首先作一些必要的处理,临时 AOF 文件名为 temp-rewriteaof-bg-%d.aof

而后进入正式的处理函数 rewriteAppendOnlyFile,如下贴上主要代码(有删减)。

int rewriteAppendOnlyFile(char *filename) {
    ...
    snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
    fp = fopen(tmpfile,"w");
    server.aof_child_diff = sdsempty(); /* 初始化 aof_child_diff */
    ...
}

aof_child_diff 变量中存放在 aofwrite 期间,子进程接收到父进程经过 pipe 传过来的缓存数据。

而后就是扫描数据库的操做。

int rewriteAppendOnlyFile(char *filename) {
    ...
    rio aof;
    for (j = 0; j < server.dbnum; j++) {
        redisDb *db = server.db+j;
        dict *d = db->dict;
        if (dictSize(d) == 0) continue; // skip empty database
        di = dictGetSafeIterator(d);
        while((de = dictNext(di)) != NULL) {
            ... ...
            if (aof.processed_bytes > processed+1024*10) { // 10K
                processed = aof.processed_bytes;
                aofReadDiffFromParent();
            }
        }
        dictReleaseIterator(di);
        di = NULL;
    }
    if (fflush(fp) == EOF) goto werr;
    if (fsync(fileno(fp)) == -1) goto werr;
    ...
}

以上逻辑里,子进程会挨个 db 扫描每个 key,根据 key 的类型使用不一样的函数进行数据重写,带过时时间的数据,都须要 append 一个 PEXPIREAT 命令。

有一点须要注意,前面说到利用 pipe 优化 aofwrite,能够看到上面的逻辑,每遍历一个 db,若是 rio 写入的数据量超过了 10K,那么就经过 pipe 从父进程读一次数据,将数据累加到 server.aof_child_diff

ssize_t aofReadDiffFromParent(void) {
    char buf[65536]; /* Default pipe buffer size on most Linux systems. */
    ssize_t nread, total = 0;

    while ((nread = read(server.aof_pipe_read_data_from_parent,buf,sizeof(buf))) > 0) {
        server.aof_child_diff = sdscatlen(server.aof_child_diff,buf,nread);
        total += nread;
    }
    return total;
}

由于,有客户端可能不断有流量打到父进程,子进程不可能一直等父进程,因此要有一个结束的时刻, Redis 中作了以下决定。

int rewriteAppendOnlyFile(char *filename) {
    ...
    int nodata = 0;
    mstime_t start = mstime();
    while(mstime()-start < 1000 && nodata < 20) {
        /* 在1ms以内,查看从父进程读数据的 fd 是否变成可读的,若不可读则aeWait()函数返回0 */
        if (aeWait(server.aof_pipe_read_data_from_parent, AE_READABLE, 1) <= 0)
        {
            nodata++;
            continue;
        }
        // 当管道的读端可读时,清零nodata
        nodata = 0;
        aofReadDiffFromParent();
    }
    ...
}

1ms 超时等待父进程从 pipe 传来数据,若是在 1ms 内有 20 次父进程没传来数据,那么就放弃 ReadDiffFromParent。因为 server.aof_pipe_read_data_from_parent 在初始化时设置为非阻塞,所以 aeWait 调用返回很快。

if (write(server.aof_pipe_write_ack_to_parent,"!",1) != 1) goto werr;

接着经过 pipe2 告诉父进程(发特殊符号 !)不要再发来缓存数据了。

还记得前面初始化时,父进程一直在监听 server.aof_pipe_read_ack_from_child 的可读事件吧?当收到 “!” 后,父进程调用处理函数 aofChildPipeReadable

void aofChildPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask) {
    char byte;
    if (read(fd,&byte,1) == 1 && byte == '!') {
        serverLog(LL_NOTICE,"AOF rewrite child asks to stop sending diffs.");
        server.aof_stop_sending_diff = 1;
        if (write(server.aof_pipe_write_ack_to_child,"!",1) != 1) {
            serverLog(LL_WARNING,"Can't send ACK to AOF child: %s",
                strerror(errno));
        }
    }
    /* Remove the handler since this can be called only one time during a
     * rewrite. */
    aeDeleteFileEvent(server.el,server.aof_pipe_read_ack_from_child,AE_READABLE);
}

能够看到 server.aof_stop_sending_diff 置为 1,表示再也不给子进程发送缓存数据,接着删除 server.aof_pipe_read_ack_from_child 上可读事件,给子进程回复一个 “!”。

如今回来看子进程的行为。

int rewriteAppendOnlyFile(char *filename) {
    ...
    if (syncRead(server.aof_pipe_read_ack_from_parent,&byte,1,5000) != 1 || byte != '!') 
        goto werr;
    ...
}

子进程阻塞 5s 等待父进程发来确认标记 “!”,以后就开始作本身的收尾工做,以下:

int rewriteAppendOnlyFile(char *filename) {
    ...
    aofReadDiffFromParent(); /* 最后一次从父进程累计写入的缓冲区的差别 */

    /* 将子进程aof_child_diff 中保存的差别数据写到 AOF 文件中 */
    if (rioWrite(&aof,server.aof_child_diff,sdslen(server.aof_child_diff)) == 0)
        goto werr;

    /* Make sure data will not remain on the OS's output buffers */
    if (fflush(fp) == EOF) goto werr;
    if (fsync(fileno(fp)) == -1) goto werr;
    if (fclose(fp) == EOF) goto werr;

    /* 原子性修改临时文件的名字为 temp-rewriteaof-bg-<pid>.aof */
    if (rename(tmpfile,filename) == -1) {
        unlink(tmpfile);
        return C_ERR;
    }
    ...
}

最后再读取一次 pipe 中的数据,将子进程进行 aofrewrite 期间,aof_child_diff 从父进程累积的数据刷盘,最后进行 rename 系统调用。

通过以上的逻辑处理,server 交给子进程的 aofrewrite 工做就完成了,最终获得一个文件 temp-rewriteaof-bg-<pid>.aof,成功返回 0,不然返回1。

父进程的收尾工做

子进程在执行完 aofrewrite 后退出,父进程 wait3 到子进程的退出状态后,进行 aofrewrite 的收尾工做。在 serverCron 函数里,有以下逻辑,

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    ...
    if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) { /* wait3 等待全部子进程 */
        int exitcode = WEXITSTATUS(statloc);
        int bysignal = 0;

        if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);

        if (pid == -1) {
            serverLog(LL_WARNING,"wait3() returned an error: %s. "
                      "rdb_child_pid = %d, aof_child_pid = %d",
                      strerror(errno),
                      (int) server.rdb_child_pid,
                      (int) server.aof_child_pid);
        } else if (pid == server.rdb_child_pid) {
            backgroundSaveDoneHandler(exitcode,bysignal);
        } else if (pid == server.aof_child_pid) { /* aof 子进程结束 */
            backgroundRewriteDoneHandler(exitcode,bysignal);
        } else {
            if (!ldbRemoveChild(pid)) {
                serverLog(LL_WARNING,
                          "Warning, detected child with unmatched pid: %ld",
                          (long)pid);
            }
        }
        updateDictResizePolicy(); /* 更新 dict resize 为可用状态 */
    }
    ...
}

wait3 函数表示父进程等待全部子进程的返回值, WNOHANG 选项表示没有子进程 exit 时当即返回,man 中对该选项有以下说明, ”WNOHANG return immediately if no child has exited“。

能够看到若是等到 aofwrite 的子进程 exit,那么使用 backgroundRewriteDoneHandler 函数进行处理,主要以下(代码有删减),

void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
    ...
    snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int)server.aof_child_pid);
    newfd = open(tmpfile,O_WRONLY|O_APPEND);
    if (aofRewriteBufferWrite(newfd) == -1) {
        close(newfd);
        goto cleanup;
    }
    ...
}

打开子进程生成的临时文件 temp-rewriteaof-bg-<pid>.aof,调用 aofRewriteBufferWrite,将服务器缓存的剩下的新数据写入该临时文件中,这样该 AOF 临时文件就彻底与当前数据库状态一致了。

那么,下面还有两件事要作,一是将临时 AOF 文件更名,二是切换 fd。

void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
    ...
    if (server.aof_fd == -1) {
        /* AOF disabled */
        oldfd = open(server.aof_filename,O_RDONLY|O_NONBLOCK);
    } else {
        /* AOF enabled */
        oldfd = -1; /* We'll set this to the current AOF filedes later. */
    }
    if (rename(tmpfile,server.aof_filename) == -1) {
        close(newfd);
        if (oldfd != -1) close(oldfd);
        goto cleanup;
    }

    if (server.aof_fd == -1) {
        /* AOF disabled, we don't need to set the AOF file descriptor
         * to this new file, so we can close it. */
        close(newfd);
    } else {
        /* AOF enabled, replace the old fd with the new one. */
        oldfd = server.aof_fd;
        server.aof_fd = newfd;
        if (server.aof_fsync == AOF_FSYNC_ALWAYS)
            aof_fsync(newfd);
        else if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
            aof_background_fsync(newfd);
        server.aof_selected_db = -1; /* Make sure SELECT is re-issued */
        aofUpdateCurrentSize();
        server.aof_rewrite_base_size = server.aof_current_size;

        /* Clear regular AOF buffer since its contents was just written to
         * the new AOF from the background rewrite buffer. */
        sdsfree(server.aof_buf);
        server.aof_buf = sdsempty();
    }
    ...  ...
    /* Asynchronously close the overwritten AOF. */
    if (oldfd != -1) bioCreateBackgroundJob(BIO_CLOSE_FILE,(void*)(long)oldfd,NULL,NULL);
    ...
}

如上,首先将临时 AOF 文件更名,而后就是 oldfd 和 newfd 的处理了,分两种状况

当 AOF 功能关闭时,打开原来的 AOF 文件,得到 oldfd,这里并不关心该操做是不是成功的,若是失败了,那么 oldfd 值为 -1,close(newfd)

当 AOF 功能开启时,oldfd 直接置为 -1,将 aof_fd 切换成 newfd,根据不一样的数据刷盘策略进行 AOF 刷盘,更新相应的参数。

而后是关闭 oldfd 的逻辑,因为 oldfd 多是对旧 AOF 文件的最后一个引用,直接 close 可能会阻塞 server,所以建立后台任务去关闭文件。

最后进行清理工做,以下,

void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
    ...
    cleanup:
        aofClosePipes();
        aofRewriteBufferReset();
        aofRemoveTempFile(server.aof_child_pid);
        server.aof_child_pid = -1;
        server.aof_rewrite_time_last = time(NULL)-server.aof_rewrite_time_start;
        server.aof_rewrite_time_start = -1;
        /* Schedule a new rewrite if we are waiting for it to switch the AOF ON. */
        if (server.aof_state == AOF_WAIT_REWRITE)
            server.aof_rewrite_scheduled = 1;
    ...
}

以上, 父进程就完成了收尾工做,写命令就 write 到 newfd 了。

时序图

能够将以上父子进程的交互整理出时序图以下,

上图参考 Redis · 原理介绍 · 利用管道优化aofrewrite

什么时候重写

有两个时刻能够触发 AOF 重写。

【1】手动执行 BGREWRITEAOF 命令。

【2】自动执行,在 serverCron 函数中根据必定逻辑进行断定。

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    ...
          /* Trigger an AOF rewrite if needed */
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
        server.aof_rewrite_perc &&
        server.aof_current_size > server.aof_rewrite_min_size) /* 默认 64M */
    {
        long long base = server.aof_rewrite_base_size ? server.aof_rewrite_base_size : 1;
        long long growth = (server.aof_current_size*100/base) - 100;
        if (growth >= server.aof_rewrite_perc) {
            rewriteAppendOnlyFileBackground();
        }
     }
}

也就是说 AOF 文件大小超过了 server.aof_rewrite_min_size,而且增加率大于 server.aof_rewrite_perc 时就会触发,增加率计算的基数 server.aof_rewrite_base_size 是上次 aofrewrite 结束后 AOF 文件的大小。

附录

几个解释。

阻塞模式下,进程或是线程执行到这些函数时必须等待某个事件的发生,若是事件没有发生,进程或线程就被阻塞(死等在被阻塞的地方),函数不会当即返回。

非阻塞non-block模式下,进程或线程执行此函数时没必要非要等待事件的发生,一旦执行确定返回,以返回值的不一样来反映函数的执行状况,若是事件发生则与阻塞方式相同,若事件没有发生则返回一个代码来告知事件未发生,而进程或线程继续执行,因此非阻塞模式效率较高。

相关文章
相关标签/搜索