你们都是知道,redis支持两种持久化的操做,AOF和RDB方式。那么为何redis须要支持这两种持久化方式呢?其实redis的做者写过一篇文章很好的说明了为何他要这样来进行设计,原文地址在这里,我也是按照这篇文章的思路结合redis的源码所作的我的总结。若是有不对的地方请指正谢谢。html
首先要说说什么是持久化的操做。redis
持久化就是把内存中的数据写入到断电后数据不会丢失的设备中。一般说的这个设备就是硬盘。数据库
通常来讲一个带有持久化的写操做应该分为以下几个步骤:api
其实前面3步都是应用程序(数据库)本身能保证的,而4和5步是由操做系统来好正的。这里就不得不说下两个函数:缓存
NAME
sync - flush file system buffers
SYNOPSIS
sync [OPTION]
DESCRIPTION
Force changed blocks to disk, update the super block.
复制代码
对于一个文件描述符,其实OS为咱们cache不少写操做 若是咱们要求OS每次写操做都强制write来触发驱动层的话,那么全部的写操做都会变的很是慢。而fsync这个api接口是OS提供的一种折中方案,由应用本身来决定合适触发驱动层写。并且fsync不是针对某个文件描述符,而是针对整个OS。bash
咱们须要分析下面两种状况的持久化问题:服务器
对于redis这种应用程序(数据库服务)来讲,只要步骤3操做成功,即便数据库崩溃,数据也会有内核保证写入到磁盘中; 可是若是整个系统断电这种场景来讲,必需要等到步骤5操做成功,才可认为写操做是真正的持久化成功;
redis是很重视整系统断电时候的性能的。因此redis有一套机制来调用sync系统函数,从而保证数据不要丢失。默认每次write了32M的数据,就回去调用一次fflush和fsync。网络
其实对于一次持久化操做来讲,咱们不光须要考虑怎么不丢数据,还须要好好的考虑下,这个持久化下来的数据文件怎么样来保证文件格式的正确,以便从新启动的时候能够正确加载。
例如,不少No-SQL和SQL类型的数据库都使用树形结构来做为数据存储和索引存储的组织。可是在持久化的时候如何设计一个合理好用的持久化文件的方式,使得这个树形结构在写文件的操做中,执行到一半的时候发生了crash,也能够经过文件从新加载起来呢?
通常来讲有三种常见的策略来进行持久化操做:app
从上面的说明应该看的比较清楚了,redis的做者对于less
RDB方式说白了就是将当前的数据库进行一份快照,而且保存到rdb文件中进行持久化。
这种方式就是第2小节说的方式1,实际上是借助了fork命令的copy on write机制。在生成快照时,将当前进程fork出一个子进程,而后在子进程中循环全部的数据,将数据写成为RDB文件。
RDB的触发分为两种:
a. 经过命令触发
b. 经过条件触发
c. 准备关闭数据库的时候
方式1命令触发: SAVE 和BGSAVE命令。
方式2条件触发: 在redis的conf配置文件中若有下配置:
################################ SNAPSHOTTING ################################
#
# Save the DB on disk:
#
# save <seconds> <changes>
#
# Will save the DB if both the given number of seconds and the given
# number of write operations against the DB occurred.
#
# In the example below the behaviour will be to save:
# after 900 sec (15 min) if at least 1 key changed
# after 300 sec (5 min) if at least 10 keys changed
# after 60 sec if at least 10000 keys changed
#
# Note: you can disable saving completely by commenting out all "save" lines.
#
# It is also possible to remove all the previously configured save
# points by adding a save directive with a single empty string argument
# like in the following example:
#
# save ""
save 900 1
save 300 10
save 60 10000
复制代码
具体的配置在注释里面写的很详细了,其实就是两个条件都知足的时候就会触发一次快照操做。 固然触发也不能过于频繁,redis有一个1ms的定时器,会循环扫描看看当前的条件是否知足,若是知足了,且当前的rdb后台进程没有被触发,则开始fork进程而且dump文件。也就是说同一时刻,只会有一个RDB 后台进程在dump数据哦。固然建立了快照后,开始dump的过程当中确定会有其余的写操做进来,这时候的这部分数据在本次save操做中是没办法保存下来的。
方式3 shutdown的时候会触发,只须要在conf配置文件中打开了save开关,就会尝试去写入一次RDB文件。
### 3.3 写RDB文件的触发机制
读取文件流程发生的场景:
- 经过debug命令加载: 经过命令```debug reload ``` 触发redis从新加载;
- redis服务启动的时候,触发加载;
### 3.3 源码分析
#### 3.3.1 save的源码分析
复制代码
void saveCommand(client *c) { if (server.rdb_child_pid != -1) { //若是已经在进行save,再也不save addReplyError(c,"Background save already in progress"); return; } rdbSaveInfo rsi, *rsiptr; rsiptr = rdbPopulateSaveInfo(&rsi); //生成必要的save信息 if (rdbSave(server.rdb_filename,rsiptr) == C_OK) { //在这个函数开始真正的写文件 addReply(c,shared.ok); } else { addReply(c,shared.err); } }
在上面的```rdbPopulateSaveInfo```函数中会生成一个rdbSaveInfo的结构指针,接着调用```rdbSave```函数进行真正的存储数据。
rdbSave函数实现:
```C
/* Save the DB on disk. Return C_ERR on error, C_OK on success. */
int rdbSave(char *filename, rdbSaveInfo *rsi) {
char tmpfile[256];
char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
FILE *fp;
rio rdb;
int error = 0;
snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
fp = fopen(tmpfile,"w"); //打开一个临时文件
if (!fp) {
char *cwdp = getcwd(cwd,MAXPATHLEN);
serverLog(LL_WARNING,
"Failed opening the RDB file %s (in server root dir %s) "
"for saving: %s",
filename,
cwdp ? cwdp : "unknown",
strerror(errno));
return C_ERR;
}
rioInitWithFile(&rdb,fp); //封装一个rio结构
if (server.rdb_save_incremental_fsync)
rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES); //若是打开了增量同步开关,每32M进行一次flush操做,在后面的流程看到这个选项的做用
//向文件中写
if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE,rsi) == C_ERR) {
errno = error;
goto werr;
}
/* Make sure data will not remain on the OS's output buffers */ if (fflush(fp) == EOF) goto werr; if (fsync(fileno(fp)) == -1) goto werr; if (fclose(fp) == EOF) goto werr; /* Use RENAME to make sure the DB file is changed atomically only * if the generate DB file is ok. */ if (rename(tmpfile,filename) == -1) { //重命名文件,原来的rdb文件被覆盖 char *cwdp = getcwd(cwd,MAXPATHLEN); serverLog(LL_WARNING, "Error moving temp DB file %s on the final " "destination %s (in server root dir %s): %s", tmpfile, filename, cwdp ? cwdp : "unknown", strerror(errno)); unlink(tmpfile); return C_ERR; } serverLog(LL_NOTICE,"DB saved on disk"); server.dirty = 0; server.lastsave = time(NULL); server.lastbgsave_status = C_OK; return C_OK; 复制代码
从上面的代码分析看,redis会建立一个文件描述符fp
,而且封装到一个叫作rio rdb
的结构中。这个结构有什么秘密呢?咱们来一块儿看下:
struct _rio {
/* Backend functions. * Since this functions do not tolerate short writes or reads the return * value is simplified to: zero on error, non zero on complete success. */
size_t (*read)(struct _rio *, void *buf, size_t len);
size_t (*write)(struct _rio *, const void *buf, size_t len);
off_t (*tell)(struct _rio *);
int (*flush)(struct _rio *);
/* The update_cksum method if not NULL is used to compute the checksum of * all the data that was read or written so far. The method should be * designed so that can be called with the current checksum, and the buf * and len fields pointing to the new block of data to add to the checksum * computation. */
void (*update_cksum)(struct _rio *, const void *buf, size_t len);
/* The current checksum */
uint64_t cksum;
/* number of bytes read or written */
size_t processed_bytes;
/* maximum single read or write chunk size */
size_t max_processing_chunk;
/* Backend-specific vars. */
union {
/* In-memory buffer target. */
struct {
sds ptr;
off_t pos;
} buffer;
/* Stdio file pointer target. */
struct {
FILE *fp;
off_t buffered; /* Bytes written since last fsync. */
off_t autosync; /* fsync after 'autosync' bytes written. */
} file;
/* Multiple FDs target (used to write to N sockets). */
struct {
int *fds; /* File descriptors. */
int *state; /* Error state of each fd. 0 (if ok) or errno. */
int numfds;
off_t pos;
sds buf;
} fdset;
} io;
};
复制代码
从结构体的注释上就能很清楚的看出来,redis的做者认为: 对于小数据的读写操做,read
write
tell
flush
这几个api接口函数的效率是很低的。因此这里作了对应的封装。 具体的读写操做,redis也从新作了封装,具体的封装函数在rio.h 和rio.c里面。
/* The following functions are our interface with the stream. They'll call the * actual implementation of read / write / tell, and will update the checksum * if needed. */
static inline size_t rioWrite(rio *r, const void *buf, size_t len) {
while (len) {
size_t bytes_to_write = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
if (r->update_cksum) r->update_cksum(r,buf,bytes_to_write);
if (r->write(r,buf,bytes_to_write) == 0)
return 0;
buf = (char*)buf + bytes_to_write;
len -= bytes_to_write;
r->processed_bytes += bytes_to_write;
}
return 1;
}
复制代码
上面的函数看上去很是简单,就是去调用 r->write(r,buf,bytes_to_write)
发生一次write操做。具体的实际上是调用了下面的函数:
/* Returns 1 or 0 for success/failure. */
static size_t rioFileWrite(rio *r, const void *buf, size_t len) {
size_t retval;
//直接调用fwrite ANSI标准C函数去write
retval = fwrite(buf,len,1,r->io.file.fp);
r->io.file.buffered += len;
//可是是否当即flush的时候作了判断,若是达到了前面设置的阈值(默认32M)
//才会调用flush和fsync,要求当即写入硬盘
if (r->io.file.autosync &&
r->io.file.buffered >= r->io.file.autosync)
{
fflush(r->io.file.fp);
redis_fsync(fileno(r->io.file.fp));
r->io.file.buffered = 0;
}
return retval;
}
复制代码
看出来是有条件的调用了fflush
和redis_fsync
。
而若是使用BGSAVE命令,源码:
void bgsaveCommand(client *c) {
int schedule = 0;
/* The SCHEDULE option changes the behavior of BGSAVE when an AOF rewrite * is in progress. Instead of returning an error a BGSAVE gets scheduled. */
if (c->argc > 1) {
if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"schedule")) {
schedule = 1;
} else {
addReply(c,shared.syntaxerr);
return;
}
}
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
if (server.rdb_child_pid != -1) {//正在rdb操做,本次操做不能够进行
addReplyError(c,"Background save already in progress");
} else if (server.aof_child_pid != -1) { //正在aof操做本次操做不能够进行
if (schedule) {
server.rdb_bgsave_scheduled = 1;
addReplyStatus(c,"Background saving scheduled");
} else {
addReplyError(c,
"An AOF log rewriting in progress: can't BGSAVE right now. "
"Use BGSAVE SCHEDULE in order to schedule a BGSAVE whenever "
"possible.");
}
} else if (rdbSaveBackground(server.rdb_filename,rsiptr) == C_OK) {
addReplyStatus(c,"Background saving started");
} else {
addReply(c,shared.err);
}
}
int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
pid_t childpid;
long long start;
if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;
server.dirty_before_bgsave = server.dirty;
server.lastbgsave_try = time(NULL);
openChildInfoPipe();
start = ustime();
if ((childpid = fork()) == 0) { //在这里去fork一个新的进程进行持久化
int retval;
/* Child */
closeListeningSockets(0);
redisSetProcTitle("redis-rdb-bgsave");
retval = rdbSave(filename,rsi);
if (retval == C_OK) {
size_t private_dirty = zmalloc_get_private_dirty(-1);
if (private_dirty) {
serverLog(LL_NOTICE,
"RDB: %zu MB of memory used by copy-on-write",
private_dirty/(1024*1024));
}
server.child_info_data.cow_size = private_dirty;
sendChildInfo(CHILD_INFO_TYPE_RDB);
}
exitFromChild((retval == C_OK) ? 0 : 1);
} else {
/* Parent */
server.stat_fork_time = ustime()-start;
server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
if (childpid == -1) {
closeChildInfoPipe();
server.lastbgsave_status = C_ERR;
serverLog(LL_WARNING,"Can't save in background: fork: %s",
strerror(errno));
return C_ERR;
}
serverLog(LL_NOTICE,"Background saving started by pid %d",childpid);
server.rdb_save_time_start = time(NULL);
server.rdb_child_pid = childpid;
server.rdb_child_type = RDB_CHILD_TYPE_DISK;
updateDictResizePolicy();
return C_OK;
}
return C_OK; /* unreached */
}
复制代码
经过上面的代码能够看出,bgsave命令是经过fork命令来进行后台的备份,而save命令会在原有的进程上进行操做,因此save命令要慎用,一旦使用了,若是redis中存储的数据比较多,所有dump到磁盘上,会使得redis服务器卡住一会。
在rdbsave
函数中,会建立一个文件描述符fp,并使用rioInitWithFile
函数进行出初始化为一个rio rdb
对象。这个函数里面会把redis本身封装的read
,write
,tell
,flush
的函数指针赋值到rdb对象中。真正的写文件函数在下面的这个rdbSaveRio
函数中实现。
/* Produces a dump of the database in RDB format sending it to the specified * Redis I/O channel. On success C_OK is returned, otherwise C_ERR * is returned and part of the output, or all the output, can be * missing because of I/O errors. * * When the function returns C_ERR and if 'error' is not NULL, the * integer pointed by 'error' is set to the value of errno just after the I/O * error. */
int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
dictIterator *di = NULL;
dictEntry *de;
char magic[10];
int j;
uint64_t cksum;
size_t processed = 0;
if (server.rdb_checksum)
rdb->update_cksum = rioGenericUpdateChecksum;
snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION); //文件的头是一个魔数
if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr;
//开始遍历数据库,一个个的开始进行持久化
for (j = 0; j < server.dbnum; j++) {
redisDb *db = server.db+j;
dict *d = db->dict;
if (dictSize(d) == 0) continue;
di = dictGetSafeIterator(d);
/* Write the SELECT DB opcode */
if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr;
if (rdbSaveLen(rdb,j) == -1) goto werr;
/* Write the RESIZE DB opcode. We trim the size to UINT32_MAX, which * is currently the largest type we are able to represent in RDB sizes. * However this does not limit the actual size of the DB to load since * these sizes are just hints to resize the hash tables. */
uint64_t db_size, expires_size;
db_size = dictSize(db->dict);
expires_size = dictSize(db->expires);
if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;
if (rdbSaveLen(rdb,db_size) == -1) goto werr;
if (rdbSaveLen(rdb,expires_size) == -1) goto werr;
/* Iterate this DB writing every entry */
while((de = dictNext(di)) != NULL) {
sds keystr = dictGetKey(de);
robj key, *o = dictGetVal(de);
long long expire;
initStaticStringObject(key,keystr);
expire = getExpire(db,&key);
if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) goto werr;
/* When this RDB is produced as part of an AOF rewrite, move * accumulated diff from parent to child while rewriting in * order to have a smaller final write. */
if (flags & RDB_SAVE_AOF_PREAMBLE &&
rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES)
{
processed = rdb->processed_bytes;
aofReadDiffFromParent();
}
}
dictReleaseIterator(di);
di = NULL; /* So that we don't release it again on error. */
}
/* If we are storing the replication information on disk, persist * the script cache as well: on successful PSYNC after a restart, we need * to be able to process any EVALSHA inside the replication backlog the * master will send us. */
if (rsi && dictSize(server.lua_scripts)) {
di = dictGetIterator(server.lua_scripts);
while((de = dictNext(di)) != NULL) {
robj *body = dictGetVal(de);
if (rdbSaveAuxField(rdb,"lua",3,body->ptr,sdslen(body->ptr)) == -1)
goto werr;
}
dictReleaseIterator(di);
di = NULL; /* So that we don't release it again on error. */
}
/* EOF opcode */
if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;
/* CRC64 checksum. It will be zero if checksum computation is disabled, the * loading code skips the check in this case. */
cksum = rdb->cksum;
memrev64ifbe(&cksum);
if (rioWrite(rdb,&cksum,8) == 0) goto werr;
return C_OK;
werr:
if (error) *error = errno;
if (di) dictReleaseIterator(di);
return C_ERR;
}
复制代码
一个rdb文件的头是一个9个字节的魔数,整体以下格式:
魔数字后面的几个字段叫作AUX,存储的是redis的版本号,建立时间,已经使用的内存等等信息。 存储的格式以下:
其中opcode是redis本身内部定义的一个操做码,用于表示这是一个什么类型的字段。具体定义以下:
/* Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType). */
#define RDB_OPCODE_MODULE_AUX 247 /* Module auxiliary data. */
#define RDB_OPCODE_IDLE 248 /* LRU idle time. */
#define RDB_OPCODE_FREQ 249 /* LFU frequency. */
#define RDB_OPCODE_AUX 250 /* RDB aux field. */
#define RDB_OPCODE_RESIZEDB 251 /* Hash table resize hint. */
#define RDB_OPCODE_EXPIRETIME_MS 252 /* Expire time in milliseconds. */
#define RDB_OPCODE_EXPIRETIME 253 /* Old expire time in seconds. */
#define RDB_OPCODE_SELECTDB 254 /* DB number of the following keys. */
#define RDB_OPCODE_EOF 255 /* End of the RDB file. */
复制代码
具体的AuxFields字段的填写代码实现以下:
/* Save a few default AUX fields with information about the RDB generated. */
int rdbSaveInfoAuxFields(rio *rdb, int flags, rdbSaveInfo *rsi) {
int redis_bits = (sizeof(void*) == 8) ? 64 : 32;
int aof_preamble = (flags & RDB_SAVE_AOF_PREAMBLE) != 0;
/* Add a few fields about the state when the RDB was created. */
if (rdbSaveAuxFieldStrStr(rdb,"redis-ver",REDIS_VERSION) == -1) return -1;
if (rdbSaveAuxFieldStrInt(rdb,"redis-bits",redis_bits) == -1) return -1;
if (rdbSaveAuxFieldStrInt(rdb,"ctime",time(NULL)) == -1) return -1;
if (rdbSaveAuxFieldStrInt(rdb,"used-mem",zmalloc_used_memory()) == -1) return -1;
/* Handle saving options that generate aux fields. */
if (rsi) {
if (rdbSaveAuxFieldStrInt(rdb,"repl-stream-db",rsi->repl_stream_db)
== -1) return -1;
if (rdbSaveAuxFieldStrStr(rdb,"repl-id",server.replid)
== -1) return -1;
if (rdbSaveAuxFieldStrInt(rdb,"repl-offset",server.master_repl_offset)
== -1) return -1;
}
if (rdbSaveAuxFieldStrInt(rdb,"aof-preamble",aof_preamble) == -1) return -1;
return 1;
}
/* Wrapper for rdbSaveAuxField() used when key/val length can be obtained
* with strlen(). */
ssize_t rdbSaveAuxFieldStrStr(rio *rdb, char *key, char *val) {
return rdbSaveAuxField(rdb,key,strlen(key),val,strlen(val));
}
/* Wrapper for strlen(key) + integer type (up to long long range). */
ssize_t rdbSaveAuxFieldStrInt(rio *rdb, char *key, long long val) {
char buf[LONG_STR_SIZE];
int vlen = ll2string(buf,sizeof(buf),val);
return rdbSaveAuxField(rdb,key,strlen(key),buf,vlen);
}
/* Save an AUX field. */
ssize_t rdbSaveAuxField(rio *rdb, void *key, size_t keylen, void *val, size_t vallen) {
ssize_t ret, len = 0;
if ((ret = rdbSaveType(rdb,RDB_OPCODE_AUX)) == -1) return -1;
len += ret;
if ((ret = rdbSaveRawString(rdb,key,keylen)) == -1) return -1;
len += ret;
if ((ret = rdbSaveRawString(rdb,val,vallen)) == -1) return -1;
len += ret;
return len;
}
复制代码
在下面的循环中,redis会遍历当前server中全部的db。而后把db中全部的dict都给保存下来,根据不一样的格式,保存到文件的方式也是不同的。 具体的代码以下:
/* Iterate this DB writing every entry */
while((de = dictNext(di)) != NULL) {
sds keystr = dictGetKey(de);
robj key, *o = dictGetVal(de);
long long expire;
initStaticStringObject(key,keystr);
expire = getExpire(db,&key);
if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) goto werr;
/* When this RDB is produced as part of an AOF rewrite, move * accumulated diff from parent to child while rewriting in * order to have a smaller final write. */
if (flags & RDB_SAVE_AOF_PREAMBLE &&
rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES)
{
processed = rdb->processed_bytes;
aofReadDiffFromParent();
}
}
dictReleaseIterator(di);
di = NULL; /* So that we don't release it again on error. */
}
/* Save a key-value pair, with expire time, type, key, value. * On error -1 is returned. * On success if the key was actually saved 1 is returned, otherwise 0 * is returned (the key was already expired). */
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) {
int savelru = server.maxmemory_policy & MAXMEMORY_FLAG_LRU;
int savelfu = server.maxmemory_policy & MAXMEMORY_FLAG_LFU;
/* Save the expire time */
if (expiretime != -1) {
if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;
if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1;
}
/* Save the LRU info. */
if (savelru) {
uint64_t idletime = estimateObjectIdleTime(val);
idletime /= 1000; /* Using seconds is enough and requires less space.*/
if (rdbSaveType(rdb,RDB_OPCODE_IDLE) == -1) return -1;
if (rdbSaveLen(rdb,idletime) == -1) return -1;
}
/* Save the LFU info. */
if (savelfu) {
uint8_t buf[1];
buf[0] = LFUDecrAndReturn(val);
/* We can encode this in exactly two bytes: the opcode and an 8 * bit counter, since the frequency is logarithmic with a 0-255 range. * Note that we do not store the halving time because to reset it * a single time when loading does not affect the frequency much. */
if (rdbSaveType(rdb,RDB_OPCODE_FREQ) == -1) return -1;
if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
}
/* Save type, key, value */
if (rdbSaveObjectType(rdb,val) == -1) return -1;
if (rdbSaveStringObject(rdb,key) == -1) return -1;
if (rdbSaveObject(rdb,val) == -1) return -1;
return 1;
/* Save the object type of object "o". */
int rdbSaveObjectType(rio *rdb, robj *o) {
switch (o->type) {
case OBJ_STRING:
return rdbSaveType(rdb,RDB_TYPE_STRING);
case OBJ_LIST:
if (o->encoding == OBJ_ENCODING_QUICKLIST)
return rdbSaveType(rdb,RDB_TYPE_LIST_QUICKLIST);
else
serverPanic("Unknown list encoding");
case OBJ_SET:
if (o->encoding == OBJ_ENCODING_INTSET)
return rdbSaveType(rdb,RDB_TYPE_SET_INTSET);
else if (o->encoding == OBJ_ENCODING_HT)
return rdbSaveType(rdb,RDB_TYPE_SET);
else
serverPanic("Unknown set encoding");
case OBJ_ZSET:
if (o->encoding == OBJ_ENCODING_ZIPLIST)
return rdbSaveType(rdb,RDB_TYPE_ZSET_ZIPLIST);
else if (o->encoding == OBJ_ENCODING_SKIPLIST)
return rdbSaveType(rdb,RDB_TYPE_ZSET_2);
else
serverPanic("Unknown sorted set encoding");
case OBJ_HASH:
if (o->encoding == OBJ_ENCODING_ZIPLIST)
return rdbSaveType(rdb,RDB_TYPE_HASH_ZIPLIST);
else if (o->encoding == OBJ_ENCODING_HT)
return rdbSaveType(rdb,RDB_TYPE_HASH);
else
serverPanic("Unknown hash encoding");
case OBJ_STREAM:
return rdbSaveType(rdb,RDB_TYPE_STREAM_LISTPACKS);
case OBJ_MODULE:
return rdbSaveType(rdb,RDB_TYPE_MODULE_2);
default:
serverPanic("Unknown object type");
}
return -1; /* avoid warning */
}
}
复制代码
在服务启动的时候,若是conf配置文件中配置了save,那么redis就会去尝试加载一次rdb文件。源码:
/* Function called at startup to load RDB or AOF file in memory. */
void loadDataFromDisk(void) {
long long start = ustime();
if (server.aof_state == AOF_ON) {
if (loadAppendOnlyFile(server.aof_filename) == C_OK)
serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
} else {
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
if (rdbLoad(server.rdb_filename,&rsi) == C_OK) {
serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds",
(float)(ustime()-start)/1000000);
/* Restore the replication ID / offset from the RDB file. */
if (server.masterhost &&
rsi.repl_id_is_set &&
rsi.repl_offset != -1 &&
/* Note that older implementations may save a repl_stream_db * of -1 inside the RDB file in a wrong way, see more information * in function rdbPopulateSaveInfo. */
rsi.repl_stream_db != -1)
{
memcpy(server.replid,rsi.repl_id,sizeof(server.replid));
server.master_repl_offset = rsi.repl_offset;
/* If we are a slave, create a cached master from this * information, in order to allow partial resynchronizations * with masters. */
replicationCacheMasterUsingMyself();
selectDb(server.cached_master,rsi.repl_stream_db);
}
} else if (errno != ENOENT) {
serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno));
exit(1);
}
}
}
复制代码
具体调用了rdbLoad
函数进行加载,这个函数实现:
int rdbLoad(char *filename, rdbSaveInfo *rsi) {
FILE *fp;
rio rdb;
int retval;
if ((fp = fopen(filename,"r")) == NULL) return C_ERR;
startLoading(fp);
rioInitWithFile(&rdb,fp);
retval = rdbLoadRio(&rdb,rsi,0);
fclose(fp);
stopLoading();
return retval;
}
复制代码
看到和read的流程同样,也是先把文件描述符封装为rio对象,而后调用rdbLoadRio
操做:
/* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned, * otherwise C_ERR is returned and 'errno' is set accordingly. */
int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
uint64_t dbid;
int type, rdbver;
redisDb *db = server.db+0;
char buf[1024];
rdb->update_cksum = rdbLoadProgressCallback;
rdb->max_processing_chunk = server.loading_process_events_interval_bytes;
if (rioRead(rdb,buf,9) == 0) goto eoferr;
buf[9] = '\0';
if (memcmp(buf,"REDIS",5) != 0) {
serverLog(LL_WARNING,"Wrong signature trying to load DB from file");
errno = EINVAL;
return C_ERR;
}
rdbver = atoi(buf+5);
if (rdbver < 1 || rdbver > RDB_VERSION) {
serverLog(LL_WARNING,"Can't handle RDB format version %d",rdbver);
errno = EINVAL;
return C_ERR;
}
/* Key-specific attributes, set by opcodes before the key type. */
long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime();
long long lru_clock = LRU_CLOCK();
while(1) {
robj *key, *val;
/* Read type. */
if ((type = rdbLoadType(rdb)) == -1) goto eoferr;
/* Handle special types. */
if (type == RDB_OPCODE_EXPIRETIME) {
/* EXPIRETIME: load an expire associated with the next key * to load. Note that after loading an expire we need to * load the actual type, and continue. */
expiretime = rdbLoadTime(rdb);
expiretime *= 1000;
continue; /* Read next opcode. */
} else if (type == RDB_OPCODE_EXPIRETIME_MS) {
/* EXPIRETIME_MS: milliseconds precision expire times introduced * with RDB v3. Like EXPIRETIME but no with more precision. */
expiretime = rdbLoadMillisecondTime(rdb,rdbver);
continue; /* Read next opcode. */
} else if (type == RDB_OPCODE_FREQ) {
/* FREQ: LFU frequency. */
uint8_t byte;
if (rioRead(rdb,&byte,1) == 0) goto eoferr;
lfu_freq = byte;
continue; /* Read next opcode. */
} else if (type == RDB_OPCODE_IDLE) {
/* IDLE: LRU idle time. */
uint64_t qword;
if ((qword = rdbLoadLen(rdb,NULL)) == RDB_LENERR) goto eoferr;
lru_idle = qword;
continue; /* Read next opcode. */
} else if (type == RDB_OPCODE_EOF) {
/* EOF: End of file, exit the main loop. */
break;
} else if (type == RDB_OPCODE_SELECTDB) {
/* SELECTDB: Select the specified database. */
if ((dbid = rdbLoadLen(rdb,NULL)) == RDB_LENERR) goto eoferr;
if (dbid >= (unsigned)server.dbnum) {
serverLog(LL_WARNING,
"FATAL: Data file was created with a Redis "
"server configured to handle more than %d "
"databases. Exiting\n", server.dbnum);
exit(1);
}
db = server.db+dbid;
continue; /* Read next opcode. */
} else if (type == RDB_OPCODE_RESIZEDB) {
/* RESIZEDB: Hint about the size of the keys in the currently * selected data base, in order to avoid useless rehashing. */
uint64_t db_size, expires_size;
if ((db_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR)
goto eoferr;
if ((expires_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR)
goto eoferr;
dictExpand(db->dict,db_size);
dictExpand(db->expires,expires_size);
continue; /* Read next opcode. */
} else if (type == RDB_OPCODE_AUX) {
/* AUX: generic string-string fields. Use to add state to RDB * which is backward compatible. Implementations of RDB loading * are requierd to skip AUX fields they don't understand. * * An AUX field is composed of two strings: key and value. */
robj *auxkey, *auxval;
if ((auxkey = rdbLoadStringObject(rdb)) == NULL) goto eoferr;
if ((auxval = rdbLoadStringObject(rdb)) == NULL) goto eoferr;
if (((char*)auxkey->ptr)[0] == '%') {
/* All the fields with a name staring with '%' are considered * information fields and are logged at startup with a log * level of NOTICE. */
serverLog(LL_NOTICE,"RDB '%s': %s",
(char*)auxkey->ptr,
(char*)auxval->ptr);
} else if (!strcasecmp(auxkey->ptr,"repl-stream-db")) {
if (rsi) rsi->repl_stream_db = atoi(auxval->ptr);
} else if (!strcasecmp(auxkey->ptr,"repl-id")) {
if (rsi && sdslen(auxval->ptr) == CONFIG_RUN_ID_SIZE) {
memcpy(rsi->repl_id,auxval->ptr,CONFIG_RUN_ID_SIZE+1);
rsi->repl_id_is_set = 1;
}
} else if (!strcasecmp(auxkey->ptr,"repl-offset")) {
if (rsi) rsi->repl_offset = strtoll(auxval->ptr,NULL,10);
} else if (!strcasecmp(auxkey->ptr,"lua")) {
/* Load the script back in memory. */
if (luaCreateFunction(NULL,server.lua,auxval) == NULL) {
rdbExitReportCorruptRDB(
"Can't load Lua script from RDB file! "
"BODY: %s", auxval->ptr);
}
} else {
/* We ignore fields we don't understand, as by AUX field * contract. */
serverLog(LL_DEBUG,"Unrecognized RDB AUX field: '%s'",
(char*)auxkey->ptr);
}
decrRefCount(auxkey);
decrRefCount(auxval);
continue; /* Read type again. */
} else if (type == RDB_OPCODE_MODULE_AUX) {
/* This is just for compatibility with the future: we have plans * to add the ability for modules to store anything in the RDB * file, like data that is not related to the Redis key space. * Such data will potentially be stored both before and after the * RDB keys-values section. For this reason since RDB version 9, * we have the ability to read a MODULE_AUX opcode followed by an * identifier of the module, and a serialized value in "MODULE V2" * format. */
uint64_t moduleid = rdbLoadLen(rdb,NULL);
moduleType *mt = moduleTypeLookupModuleByID(moduleid);
char name[10];
moduleTypeNameByID(name,moduleid);
if (!rdbCheckMode && mt == NULL) {
/* Unknown module. */
serverLog(LL_WARNING,"The RDB file contains AUX module data I can't load: no matching module '%s'", name);
exit(1);
} else if (!rdbCheckMode && mt != NULL) {
/* This version of Redis actually does not know what to do * with modules AUX data... */
serverLog(LL_WARNING,"The RDB file contains AUX module data I can't load for the module '%s'. Probably you want to use a newer version of Redis which implements aux data callbacks", name);
exit(1);
} else {
/* RDB check mode. */
robj *aux = rdbLoadCheckModuleValue(rdb,name);
decrRefCount(aux);
}
}
/* Read key */
if ((key = rdbLoadStringObject(rdb)) == NULL) goto eoferr;
/* Read value */
if ((val = rdbLoadObject(type,rdb)) == NULL) goto eoferr;
/* Check if the key already expired. This function is used when loading * an RDB file from disk, either at startup, or when an RDB was * received from the master. In the latter case, the master is * responsible for key expiry. If we would expire keys here, the * snapshot taken by the master may not be reflected on the slave. */
if (server.masterhost == NULL && !loading_aof && expiretime != -1 && expiretime < now) {
decrRefCount(key);
decrRefCount(val);
} else {
/* Add the new object in the hash table */
dbAdd(db,key,val);
/* Set the expire time if needed */
if (expiretime != -1) setExpire(NULL,db,key,expiretime);
/* Set usage information (for eviction). */
objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock);
/* Decrement the key refcount since dbAdd() will take its * own reference. */
decrRefCount(key);
}
/* Reset the state that is key-specified and is populated by * opcodes before the key, so that we start from scratch again. */
expiretime = -1;
lfu_freq = -1;
lru_idle = -1;
}
/* Verify the checksum if RDB version is >= 5 */
if (rdbver >= 5) {
uint64_t cksum, expected = rdb->cksum;
if (rioRead(rdb,&cksum,8) == 0) goto eoferr;
if (server.rdb_checksum) {
memrev64ifbe(&cksum);
if (cksum == 0) {
serverLog(LL_WARNING,"RDB file was saved with checksum disabled: no check performed.");
} else if (cksum != expected) {
serverLog(LL_WARNING,"Wrong RDB checksum. Aborting now.");
rdbExitReportCorruptRDB("RDB CRC error");
}
}
}
return C_OK;
eoferr: /* unexpected end of file is handled here with a fatal exit */
serverLog(LL_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
rdbExitReportCorruptRDB("Unexpected EOF reading RDB file");
return C_ERR; /* Just to avoid warning */
}
复制代码
经过上面的源码分析看出来RDB方式有以下优缺点
优势:
1.经过fork命令建立后台子进程来进行异步操做,全量保存到文件中;
缺点: 1.若是使用save命令来保存,是同步操做,由于redis是单线程,致使工做线程卡住,redis卡顿; 2.在fork进程后,若是再有客户端发起写操做,会致使这部分数据没有及时的同步到RDB文件中,只有等此次RDB的dump操做结束后,下一次操做的时候持久化。 为了解决这个问题,redis就引入了另一种持久化方式----AOF。