zookeeper-数据初始化分析

概述

在前两篇关于 zookeeper 的分析文章中,咱们熟悉了其选举投票及集群间数据同步的过程,本文将针对 zookeeper 启动前的一个重要处理过程进行分析;也便是数据初始化java

经过调用追踪咱们发现 zookeeper 最终会执行 ZKDatabase 的方法 loadDataBase 来完成数据初始化数据库

public long loadDataBase() throws IOException {
    PlayBackListener listener=new PlayBackListener(){
        public void onTxnLoaded(TxnHeader hdr,Record txn){
            Request r = new Request(null, 0, hdr.getCxid(),hdr.getType(),
                    null, null);
            r.txn = txn;
            r.hdr = hdr;
            r.zxid = hdr.getZxid();
            // 将事务日志包装成 proposal 集群模式下会同步给follower
            addCommittedProposal(r);
        }
    };
    
    long zxid = snapLog.restore(dataTree,sessionsWithTimeouts,listener);
    initialized = true;
    return zxid;
}
复制代码
public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException {
    // 快照文件加载至内存
    snapLog.deserialize(dt, sessions);
    
    // ......
}
复制代码

下面咱们将针对整个过程进行分析:数组

加载快照文件

FileSnap 经过反序列化快照文件,将数据写入内存数据库 DataTreebash

public long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException {
    // 查找有效的快照文件
    List<File> snapList = findNValidSnapshots(100);
    if (snapList.size() == 0) {
        return -1L;
    }
    
    // 省略
}
复制代码

deserialize 实现可知, zookeeper 在处理快照文件时首先须要查找指定个数的有效快照文件; 若不存在快照文件则返回 -1 。session

查找有效的快照文件

private List<File> findNValidSnapshots(int n) throws IOException {
    // 查找 snapDir 目录下 snapshot 为前缀的快照文件;并按照 zxid 大小降序排列
    List<File> files = Util.sortDataDir(snapDir.listFiles(),"snapshot", false);
    int count = 0;
    List<File> list = new ArrayList<File>();
    for (File f : files) {
        try {
            // 判断当前快照文件是否为有效文件
            if (Util.isValidSnapshot(f)) {
                list.add(f);
                count++;
                // 当有效快照文件个数 == n 时退出循环
                if (count == n) {
                    break;
                }
            }
        } catch (IOException e) {
            LOG.info("invalid snapshot " + f, e);
        }
    }
    return list;
}
复制代码
public static boolean isValidSnapshot(File f) throws IOException {
    if (f==null || Util.getZxidFromName(f.getName(), "snapshot") == -1)
        return false;

    // Check for a valid snapshot
    // 随机读文件
    RandomAccessFile raf = new RandomAccessFile(f, "r");
    try {
        // including the header and the last / bytes
        // the snapshot should be atleast 10 bytes
    	// 文件内容长度至少为 10 字节
        if (raf.length() < 10) {
            return false;
        }
        // 指针移动到文件末尾 5 个字节处
        raf.seek(raf.length() - 5);
        byte bytes[] = new byte[5];
        int readlen = 0;
        int l;
        // 读最后 5 字节至 bytes 字节数组中
        while(readlen < 5 &&
              (l = raf.read(bytes, readlen, bytes.length - readlen)) >= 0) {
            readlen += l;
        }
        if (readlen != bytes.length) {
            LOG.info("Invalid snapshot " + f
                    + " too short, len = " + readlen);
            return false;
        }
        ByteBuffer bb = ByteBuffer.wrap(bytes);
        int len = bb.getInt();
        byte b = bb.get();
        // 前 4 字节为整数 1,最后一个字节为 / 则为有效快照文件
        if (len != 1 || b != '/') {
            LOG.info("Invalid snapshot " + f + " len = " + len
                    + " byte = " + (b & 0xff));
            return false;
        }
    } finally {
        raf.close();
    }

    return true;
}
复制代码

从上述代码实现可知,zookeepersnap file 具备如下特性:less

  • 快照文件内容长度至少为 10 字节
  • 快照文件末尾 5 个字节内容为: 整形数字 1 + 字符 /
能够在后续生成快照文件时进行验证
复制代码

解析快照文件

public long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException {
    // 查找有效的快照文件
    List<File> snapList = findNValidSnapshots(100);
    if (snapList.size() == 0) {
        return -1L;
    }
    File snap = null;
    boolean foundValid = false;
    for (int i = 0; i < snapList.size(); i++) {
        snap = snapList.get(i);
        InputStream snapIS = null;
        CheckedInputStream crcIn = null;
        try {
            LOG.info("Reading snapshot " + snap);
            snapIS = new BufferedInputStream(new FileInputStream(snap));
            crcIn = new CheckedInputStream(snapIS, new Adler32());
            // 反序列化文件
            InputArchive ia = BinaryInputArchive.getArchive(crcIn);
            // 将文件内容写入 dataTree
            deserialize(dt,sessions, ia);
            // 文件校验和判断
            // 若 checkSum 不一致则继续解析下一个快照文件
            long checkSum = crcIn.getChecksum().getValue();
            long val = ia.readLong("val");
            if (val != checkSum) {
                throw new IOException("CRC corruption in snapshot : " + snap);
            }

            // 实际上当最近的一个快照文件为有效文件的时候就会退出循环
            foundValid = true;
            break;
        } catch(IOException e) {
            LOG.warn("problem reading snap file " + snap, e);
        } finally {
            if (snapIS != null) 
                snapIS.close();
            if (crcIn != null) 
                crcIn.close();
        } 
    }

    // 没有一个有效的快照文件,则启动失败
    if (!foundValid) {
        throw new IOException("Not able to find valid snapshots in " + snapDir);
    }
    // 记录最近的 zxid
    dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), "snapshot");
    return dt.lastProcessedZxid;
}
复制代码

zookeeper 经过遍历 snap file 并反序列化解析 snap file 将其写入内存数据库 dataTree, 在该过程当中会对 check sum 进行检查以肯定 snap file 的正确性(通常来说当 snap file 正确性经过后,只会解析最新的 snap file);若没有一个正确的 snap file 则抛出异常说明启动失败,反之 dataTree 将会记录 lastProcessedZxiddom

加载事务日志文件

下面咱们继续对 FileTxSnapLogrestore 方法进行分析,zookeeper 在完成快照文件的处理以后,会加载事务日志文件并处理this

public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException {
    // 快照文件加载至内存
    snapLog.deserialize(dt, sessions);

    // 构造 FileTxnLog 实例
    FileTxnLog txnLog = new FileTxnLog(dataDir);
    // 经过logDir zxid 构造 FileTxnIterator 实例并执行初始化动做
    // 加载比快照数据新的日志文件并查找最接近快照的事务日志记录
    TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
    long highestZxid = dt.lastProcessedZxid;
    TxnHeader hdr;
    // 省略
}
复制代码

在执行spa

TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
复制代码

时会构建 FileTxnIterator 实例并执行 init 动做,代码以下:指针

public FileTxnIterator(File logDir, long zxid) throws IOException {
  this.logDir = logDir;
  this.zxid = zxid;
  init();
}
复制代码

接下来咱们看下其 init 中如何处理事务日志文件

void init() throws IOException {
    storedFiles = new ArrayList<File>();
    // 查找 logDir 目录下的全部事务日志记录文件,并按 zxid 降序排列
    List<File> files = Util.sortDataDir(FileTxnLog.getLogFiles(logDir.listFiles(), 0), "log", false);

    // storedFiles 用来记录 zxid 大于快照文件 zxid 的事务日志文件
    for (File f: files) {
        if (Util.getZxidFromName(f.getName(), "log") >= zxid) {
            storedFiles.add(f);
        }
        // add the last logfile that is less than the zxid
        else if (Util.getZxidFromName(f.getName(), "log") < zxid) {
            storedFiles.add(f);
            break;
        }
    }
    // 由于一个事务日志文件中记录了多条事务日志,而事务日志文件名的后缀是当前文件的第一条事务记录的zxid
    // 经过判断 hdr.getZxid < zxid 查找最接近快照的事务记录
    goToNextLog();
    if (!next())
        return;
    while (hdr.getZxid() < zxid) {
        if (!next())
            return;
    }
}
复制代码

init 的实现能够看出 zookeeper 对事物日志文件的处理流程以下:

  • 首先查找 logDir 目录下全部的事务日志文件 (log 为前缀) 并按 zxid 降序
  • 将事务日志文件的后缀 zxid 与快照文件 zxid 进行比较,查找并记录最接近快照的事务日志文件
  • 读取事务日志文件,经过 hdr.getZxid() < zxid 查找最接近快照的事务日志记录

处理快照以后的事务

上文中经过加载事务日志文件查找到快照以后所提交的事务记录,下面就看下如何处理这些事务记录的

public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException {
    // 快照文件加载至内存
    snapLog.deserialize(dt, sessions);
    FileTxnLog txnLog = new FileTxnLog(dataDir);
    // 加载比快照数据新的日志文件
    TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
    long highestZxid = dt.lastProcessedZxid;
    TxnHeader hdr;
    try {
        while (true) {
            // iterator points to 
            // the first valid txn when initialized
            hdr = itr.getHeader();
            if (hdr == null) {
                //empty logs 
                return dt.lastProcessedZxid;
            }
            if (hdr.getZxid() < highestZxid && highestZxid != 0) {
                LOG.error("{}(higestZxid) > {}(next log) for type {}",
                        new Object[] { highestZxid, hdr.getZxid(),
                                hdr.getType() });
            } else {
                highestZxid = hdr.getZxid();
            }
            try {
                // 处理事务日志 写入内存数据库
                processTransaction(hdr,dt,sessions, itr.getTxn());
            } catch(KeeperException.NoNodeException e) {
               throw new IOException("Failed to process transaction type: " +
                     hdr.getType() + " error: " + e.getMessage(), e);
            }
            // listener 回调将事务包装为 proposal 集群模式下同步至 follower
            listener.onTxnLoaded(hdr, itr.getTxn());
            // 继续下一条事务日志
            if (!itr.next()) 
                break;
        }
    } finally {
        if (itr != null) {
            itr.close();
        }
    }
    return highestZxid;
}
复制代码

restore 的后续实现咱们能够看出, zookeeper 在完成事务日志文件加载以后,会依次处理日志文件中每条事务记录:

  • 按事务类型处理事务记录,写入内存数据库
  • listener 回调事务加载事件;将事务包装为 proposal 存储到 committedLog 列表中,并分别记录 maxCommittedLogminCommittedLog (参见上文中的PlayBackListener

小结

经过上文的分析,能够归纳下 zookeeper 数据初始化的流程以下:

  • 加载快照文件,获取快照最新的 zxid
  • 加载事务日志文件,获取快照以后的事务记录
  • 处理快照以后的事务记录,写入内存
相关文章
相关标签/搜索