在前两篇关于 zookeeper
的分析文章中,咱们熟悉了其选举投票及集群间数据同步的过程,本文将针对 zookeeper
启动前的一个重要处理过程进行分析;也便是数据初始化
。java
经过调用追踪咱们发现 zookeeper
最终会执行 ZKDatabase
的方法 loadDataBase
来完成数据初始化数据库
public long loadDataBase() throws IOException {
PlayBackListener listener=new PlayBackListener(){
public void onTxnLoaded(TxnHeader hdr,Record txn){
Request r = new Request(null, 0, hdr.getCxid(),hdr.getType(),
null, null);
r.txn = txn;
r.hdr = hdr;
r.zxid = hdr.getZxid();
// 将事务日志包装成 proposal 集群模式下会同步给follower
addCommittedProposal(r);
}
};
long zxid = snapLog.restore(dataTree,sessionsWithTimeouts,listener);
initialized = true;
return zxid;
}
复制代码
public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException {
// 快照文件加载至内存
snapLog.deserialize(dt, sessions);
// ......
}
复制代码
下面咱们将针对整个过程进行分析:数组
FileSnap
经过反序列化快照文件,将数据写入内存数据库 DataTree
bash
public long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException {
// 查找有效的快照文件
List<File> snapList = findNValidSnapshots(100);
if (snapList.size() == 0) {
return -1L;
}
// 省略
}
复制代码
从 deserialize
实现可知, zookeeper
在处理快照文件时首先须要查找指定个数的有效快照文件; 若不存在快照文件则返回 -1 。session
private List<File> findNValidSnapshots(int n) throws IOException {
// 查找 snapDir 目录下 snapshot 为前缀的快照文件;并按照 zxid 大小降序排列
List<File> files = Util.sortDataDir(snapDir.listFiles(),"snapshot", false);
int count = 0;
List<File> list = new ArrayList<File>();
for (File f : files) {
try {
// 判断当前快照文件是否为有效文件
if (Util.isValidSnapshot(f)) {
list.add(f);
count++;
// 当有效快照文件个数 == n 时退出循环
if (count == n) {
break;
}
}
} catch (IOException e) {
LOG.info("invalid snapshot " + f, e);
}
}
return list;
}
复制代码
public static boolean isValidSnapshot(File f) throws IOException {
if (f==null || Util.getZxidFromName(f.getName(), "snapshot") == -1)
return false;
// Check for a valid snapshot
// 随机读文件
RandomAccessFile raf = new RandomAccessFile(f, "r");
try {
// including the header and the last / bytes
// the snapshot should be atleast 10 bytes
// 文件内容长度至少为 10 字节
if (raf.length() < 10) {
return false;
}
// 指针移动到文件末尾 5 个字节处
raf.seek(raf.length() - 5);
byte bytes[] = new byte[5];
int readlen = 0;
int l;
// 读最后 5 字节至 bytes 字节数组中
while(readlen < 5 &&
(l = raf.read(bytes, readlen, bytes.length - readlen)) >= 0) {
readlen += l;
}
if (readlen != bytes.length) {
LOG.info("Invalid snapshot " + f
+ " too short, len = " + readlen);
return false;
}
ByteBuffer bb = ByteBuffer.wrap(bytes);
int len = bb.getInt();
byte b = bb.get();
// 前 4 字节为整数 1,最后一个字节为 / 则为有效快照文件
if (len != 1 || b != '/') {
LOG.info("Invalid snapshot " + f + " len = " + len
+ " byte = " + (b & 0xff));
return false;
}
} finally {
raf.close();
}
return true;
}
复制代码
从上述代码实现可知,
zookeeper
的snap file
具备如下特性:less
1
+ 字符 /
能够在后续生成快照文件时进行验证
复制代码
public long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException {
// 查找有效的快照文件
List<File> snapList = findNValidSnapshots(100);
if (snapList.size() == 0) {
return -1L;
}
File snap = null;
boolean foundValid = false;
for (int i = 0; i < snapList.size(); i++) {
snap = snapList.get(i);
InputStream snapIS = null;
CheckedInputStream crcIn = null;
try {
LOG.info("Reading snapshot " + snap);
snapIS = new BufferedInputStream(new FileInputStream(snap));
crcIn = new CheckedInputStream(snapIS, new Adler32());
// 反序列化文件
InputArchive ia = BinaryInputArchive.getArchive(crcIn);
// 将文件内容写入 dataTree
deserialize(dt,sessions, ia);
// 文件校验和判断
// 若 checkSum 不一致则继续解析下一个快照文件
long checkSum = crcIn.getChecksum().getValue();
long val = ia.readLong("val");
if (val != checkSum) {
throw new IOException("CRC corruption in snapshot : " + snap);
}
// 实际上当最近的一个快照文件为有效文件的时候就会退出循环
foundValid = true;
break;
} catch(IOException e) {
LOG.warn("problem reading snap file " + snap, e);
} finally {
if (snapIS != null)
snapIS.close();
if (crcIn != null)
crcIn.close();
}
}
// 没有一个有效的快照文件,则启动失败
if (!foundValid) {
throw new IOException("Not able to find valid snapshots in " + snapDir);
}
// 记录最近的 zxid
dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), "snapshot");
return dt.lastProcessedZxid;
}
复制代码
zookeeper
经过遍历snap file
并反序列化解析snap file
将其写入内存数据库dataTree
, 在该过程当中会对check sum
进行检查以肯定snap file
的正确性(通常来说当snap file
正确性经过后,只会解析最新的snap file
);若没有一个正确的snap file
则抛出异常说明启动失败,反之dataTree
将会记录lastProcessedZxid
。dom
下面咱们继续对 FileTxSnapLog
的 restore
方法进行分析,zookeeper
在完成快照文件的处理以后,会加载事务日志文件并处理this
public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException {
// 快照文件加载至内存
snapLog.deserialize(dt, sessions);
// 构造 FileTxnLog 实例
FileTxnLog txnLog = new FileTxnLog(dataDir);
// 经过logDir zxid 构造 FileTxnIterator 实例并执行初始化动做
// 加载比快照数据新的日志文件并查找最接近快照的事务日志记录
TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
long highestZxid = dt.lastProcessedZxid;
TxnHeader hdr;
// 省略
}
复制代码
在执行spa
TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
复制代码
时会构建 FileTxnIterator
实例并执行 init
动做,代码以下:指针
public FileTxnIterator(File logDir, long zxid) throws IOException {
this.logDir = logDir;
this.zxid = zxid;
init();
}
复制代码
接下来咱们看下其 init
中如何处理事务日志文件
void init() throws IOException {
storedFiles = new ArrayList<File>();
// 查找 logDir 目录下的全部事务日志记录文件,并按 zxid 降序排列
List<File> files = Util.sortDataDir(FileTxnLog.getLogFiles(logDir.listFiles(), 0), "log", false);
// storedFiles 用来记录 zxid 大于快照文件 zxid 的事务日志文件
for (File f: files) {
if (Util.getZxidFromName(f.getName(), "log") >= zxid) {
storedFiles.add(f);
}
// add the last logfile that is less than the zxid
else if (Util.getZxidFromName(f.getName(), "log") < zxid) {
storedFiles.add(f);
break;
}
}
// 由于一个事务日志文件中记录了多条事务日志,而事务日志文件名的后缀是当前文件的第一条事务记录的zxid
// 经过判断 hdr.getZxid < zxid 查找最接近快照的事务记录
goToNextLog();
if (!next())
return;
while (hdr.getZxid() < zxid) {
if (!next())
return;
}
}
复制代码
从 init
的实现能够看出 zookeeper
对事物日志文件的处理流程以下:
hdr.getZxid() < zxid
查找最接近快照的事务日志记录上文中经过加载事务日志文件查找到快照以后所提交的事务记录,下面就看下如何处理这些事务记录的
public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException {
// 快照文件加载至内存
snapLog.deserialize(dt, sessions);
FileTxnLog txnLog = new FileTxnLog(dataDir);
// 加载比快照数据新的日志文件
TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
long highestZxid = dt.lastProcessedZxid;
TxnHeader hdr;
try {
while (true) {
// iterator points to
// the first valid txn when initialized
hdr = itr.getHeader();
if (hdr == null) {
//empty logs
return dt.lastProcessedZxid;
}
if (hdr.getZxid() < highestZxid && highestZxid != 0) {
LOG.error("{}(higestZxid) > {}(next log) for type {}",
new Object[] { highestZxid, hdr.getZxid(),
hdr.getType() });
} else {
highestZxid = hdr.getZxid();
}
try {
// 处理事务日志 写入内存数据库
processTransaction(hdr,dt,sessions, itr.getTxn());
} catch(KeeperException.NoNodeException e) {
throw new IOException("Failed to process transaction type: " +
hdr.getType() + " error: " + e.getMessage(), e);
}
// listener 回调将事务包装为 proposal 集群模式下同步至 follower
listener.onTxnLoaded(hdr, itr.getTxn());
// 继续下一条事务日志
if (!itr.next())
break;
}
} finally {
if (itr != null) {
itr.close();
}
}
return highestZxid;
}
复制代码
从 restore
的后续实现咱们能够看出, zookeeper
在完成事务日志文件加载以后,会依次处理日志文件中每条事务记录:
listener
回调事务加载事件;将事务包装为 proposal
存储到 committedLog
列表中,并分别记录 maxCommittedLog
,minCommittedLog
(参见上文中的PlayBackListener
)经过上文的分析,能够归纳下 zookeeper
数据初始化的流程以下: