Zookeeper学习笔记(二):存储技术

本文首发于 泊浮目的简书: https://www.jianshu.com/u/204...
版本 日期 备注
1.0 2020.3.12 文章首发

前言

在上篇文章中,咱们简单提到了Zookeeper的几个核心点。在这篇文章中,咱们就来探索其存储技术。在开始前,读者能够考虑思考下列问题:java

  • Zookeeper的数据存储是如何实现的?
  • Zookeeper进行一次写操做的时候,会发生什么?
  • Zookeeper进行一次读操做的时候,会发生什么?
  • 相比数据库(以MySQL为例)来讲,其读放大、写放大、空间放大的优劣
  • 当一个Zookeeper新加入现有集群时,如何同步现集群中的数据?

抱着问题,咱们进入下面的内容。node

Zookeper本地存储架构

众所周知,Zookeeper不擅长大量数据的读写、频繁的读操做。这与其存储模型息息相关——典型的LSM( Log-Structured Merge-Tree)。数据库

在这里简单的介绍LSM模型。该模型于1996年发明,2006年从Bigtable开始才开始收到关注。咱们能够直译为日志结构的合并树,这样能够猜出这个数据结构的大概:以日志为基础,不停的append,并组织成可合并的树形式。apache

从图中能够看出来,这是一个多层结构。C0在内存里,其他几层都在磁盘中。当C0到达必定大小时,就会向下作Compaction。segmentfault

C0通常咱们称为MTable(MemoryTable),用来保存有序KV对的内存缓冲区。api

而磁盘中的Ck咱们称为SSTable(Sorted String Table),用于保存有序KV对的只读文件。服务器

LSM存储MVCC的key-value。每次更新一个key-value都会生成一个新版本,删除一个key-value都会生成tomstone的新版本。session

前面提到了Log-Structured,其表如今数据在写入前,会作WAL(Write Ahead Logging)。这是普遍使用的保证多Block数据写入原子性的技术。通常在Block写入以前,会先把新数据写到一个日志中。只有写入END并调用Sync API,才开始对Block开始写入。若是在对block进行写入的任什么时候发生crash,均可以在重启时使用WAL里面的数据完成Block的写入。数据结构

另外,经过WAL,咱们在提交一个操做以前只须要进行文件的顺序写入,从而减小了包含多Block文件操做的数据写入延时。架构

典型基于LSM的存储引擎有:

  • LevelDB:基于C++开发,Chrome的IndexedDB使用的是LevelDB
  • RocksDB:基于C++开发,功能丰富,应用普遍,例如CockroachDB、TiKV和Kafka Streams等。
  • Pebble:Go语言开发,应用于CockeroachDB。
  • BadgerDB:一种分离存储key和value的LSM存储引擎。
  • WiredTiger:WiredTiger除了支持B-Tree觉得,还支持LSM。

LSM的读写

当一次写操做发生时

  1. 将操做写入事务日志(WAL)
  2. 写入MTable

至此,写操做就完成了。读者可能会问万一碰上Compaction呢?这是在后台作的,并不阻塞写入。

当一次读操做发生时

  1. 寻找MTable里的数据
  2. 若是MTable中没有数据,则往SSTable中一个个找。

横向对比B-Tree

咱们以存储引擎放大指标(Amplification Factors)来对两个存储模型进行对比,在这里,简单介绍一下几个指标:

  1. 读放大(read amplification):一个查询涉及的外部存储读操做次数。若是咱们查询一个数据须要作3次外部存储读取,那么读放大就是3.
  2. 写放大(write amplification):写入外部存储的数据量和写入应用的数据量的比率。若是咱们对应用写入了1MB,可是对外部存储写入了2MB,那么写放大就是2.
  3. 空间放大(space amplification):数据库占用的外部存储量和应用自己数据量的比率。若是一个1MB的应用数据占用了10MB,那么空间放大就是10倍。

根据指标,咱们能够做出一个简单的对比:

LSM B+Tree
读放大 一个读操做要对多个Level上的SSTable进行读操做 一个Key-value的读操做涉及一个数据页的读操做,若干个索引页的读操做
写放大 一个key-value值的写操做要在多级的SSTable上进行 一个key-value的写操做涉及数据页的写操做,若干个索引页的写操做
空间放大 在SSTable中存储一个key-value的多个版本 索引页和gragmentation

LSM和B+Tree在存储性能上的比较:

  • 写操做:LSM的一个写操做涉及对日志的追加操做和MTable的更新。而B+Tree一个写操做对若个干索引页和一个数据页进行读写操做,可能会致使屡次的随机IO。因此LSM的写操做通常比B+Tree的写操做性能好。
  • 读操做:LSM的一个读操做须要对全部的SSTable的内容和MTable的内容进行合并。而在B+Tree上,一个读操做对若干个索引页和一个数据页进行读操做。因此B+Tree的读操做性能通常比LSM的读操做性能好。

LSM的优化

上面提到了LSM的优缺点,接下来,咱们来谈一谈常见的优化思路和Zk中采用的方案。

WAL优化方案1:Group Commit

通常的WAL中每次写完END都要调用一次耗时的Sync API,这实际上是会影响到系统的性能。为了解决这个问题,咱们能够一次提交多个数据写入——只在最后一个数据写入的END日志以后,才调用Sync API。like this:

  • without group commit: BEGIN Data1 END Sync BEGIN Data2 END Sync BEGIN Data3 END Sync
  • with group commit: BEGIN Data1 END BEGIN Data2 END BEGIN Data3 END Sync`

凡事都有代价,这可能会引发数据一致性相关的问题。

WAL优化方案2:File Padding

在往 WAL 里面追加日志的时候,若是当前的文件 block 不能保存新添加的日志,就要为文件分配新的 block,这要更新文件 inode 里面的信息(例如 size)。若是咱们使用的是 HHD 的话,就要先 seek 到 inode 所在的位置,而后回到新添加 block 的位置进行日志追加。为了减小这些 seek,咱们能够预先为 WAL 分配 block。例如 ZooKeeper 就是每次为 WAL 分配 64MB 的 block。

因此这也是Zookeeper不擅长读写大数据的缘由之一,这会引发大量的Block分配。

WAL优化方案3:Snapshot

若是咱们使用一个内存数据结构加 WAL 的存储方案,WAL 就会一直增加。这样在存储系统启动的时候,就要读取大量的 WAL 日志数据来重建内存数据。快照能够解决这个问题。

除了解决启动时间过长的问题以外,快照还能够减小存储空间的使用。WAL 的多个日志条目有多是对同一个数据的改动,经过快照,就能够只保留最新的数据改动(Merge)。

Zk的确采用了这个方案来作优化。还带来的一个好处是:在一个节点加入时,就会传最新的Snapshot过去同步数据。

LSM的读优化:Bloom Filter

经过Bloom过滤器来加速查找——这是一种随机的数据结构,能够在O(1)的时间内判断一个给定的元素是否在集合中。注意:Bloom Filiter是可能产生Flase positive的(元素可能不在集合中)。

你能够将Bloom过滤器理解为一个哈希字典,可是真实的Bloom过滤器会比哈希字典复杂点,但这样想并不影响你去理解它。

这么看起来,在最坏的状况下,Bloom Filter彷佛没什么用。

LSM的写优化:Compation

若是咱们一直写入MTable,那么MTable则会一直增大,直到超出服务器内部限制。因此咱们须要把MTable的内存数据放到Durable Storage 上去,生成 SSTable 文件,这个操做叫作 minor Compaction。另外,还有两类常见的Compaction:

  • Merge Compaction:把连续Level的SSTable和MTable合并成一个SSTable。目的是减小读取操做要读取的SSTable数量。
  • Major Compaction:合并全部Level上的SSTable的Merge Compaction。目的在于完全删除Tomstone数据,并释放全部的存储空间。

Compaction的好处:

  1. 减小内存消耗
  2. 减小读取事务日志来恢复数据消耗的时间

源码解析

本节内容都以3.5.7版本为例

核心接口和类

  • TxnLog:接口类型,提供读写事务日志的API。
  • FileTxnLog:基于文件的TxnLog实现。
  • Snapshot:快照接口类型,提供序列化、反序列化、访问快照API。
  • FileSnapshot:基于文件的Snapshot实现。
  • FileTxnSnapLog:TxnLog和Snapshot的封装
  • DataTree:Zookeeper的内存数据结构,ZNode构成的树。
  • DataNode:表示一个ZNode。

TxnLog

TxnLog是咱们前面提到的事务日志。那么接下来咱们就来看它的相关源码。

先看注释:

package org.apache.zookeeper.server.persistence;

import ...

/**
 * This class implements the TxnLog interface. It provides api's
 * to access the txnlogs and add entries to it.
 * <p>
 * The format of a Transactional log is as follows:
 * <blockquote><pre>
 * LogFile:
 *     FileHeader TxnList ZeroPad
 *
 * FileHeader: {
 *     magic 4bytes (ZKLG)
 *     version 4bytes
 *     dbid 8bytes
 *   }
 *
 * TxnList:
 *     Txn || Txn TxnList
 *
 * Txn:
 *     checksum Txnlen TxnHeader Record 0x42
 *
 * checksum: 8bytes Adler32 is currently used
 *   calculated across payload -- Txnlen, TxnHeader, Record and 0x42
 *
 * Txnlen:
 *     len 4bytes
 *
 * TxnHeader: {
 *     sessionid 8bytes
 *     cxid 4bytes
 *     zxid 8bytes
 *     time 8bytes
 *     type 4bytes
 *   }
 *
 * Record:
 *     See Jute definition file for details on the various record types
 *
 * ZeroPad:
 *     0 padded to EOF (filled during preallocation stage)
 * </pre></blockquote>
 */
public class FileTxnLog implements TxnLog, Closeable {

在注释中,咱们能够看到一个FileLog由三部分组成:

  • FileHeader
  • TxnList
  • ZerdPad

关于FileHeader,能够理解其为一个标示符。TxnList则为主要内容。ZeroPad是一个终结符。

append

咱们来看看最典型的append方法,能够将其理解WAL过程当中的核心方法额:

/**
     * append an entry to the transaction log
     * @param hdr the header of the transaction
     * @param txn the transaction part of the entry
     * returns true iff something appended, otw false
     */
    public synchronized boolean append(TxnHeader hdr, Record txn)
        throws IOException
    {
        if (hdr == null) { //为null意味着这是一个读请求,直接返回
            return false;
        }
        if (hdr.getZxid() <= lastZxidSeen) {
            LOG.warn("Current zxid " + hdr.getZxid()
                    + " is <= " + lastZxidSeen + " for "
                    + hdr.getType());
        } else {
            lastZxidSeen = hdr.getZxid();
        }
        if (logStream==null) { //为空的话则new一个Stream
           if(LOG.isInfoEnabled()){
                LOG.info("Creating new log file: " + Util.makeLogName(hdr.getZxid()));
           }

           logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
           fos = new FileOutputStream(logFileWrite);
           logStream=new BufferedOutputStream(fos);
           oa = BinaryOutputArchive.getArchive(logStream);
           FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
           fhdr.serialize(oa, "fileheader");   //写file header
           // Make sure that the magic number is written before padding.
           logStream.flush();      // zxid必须比日志先落盘
           filePadding.setCurrentSize(fos.getChannel().position());
           streamsToFlush.add(fos); //加入须要Flush的队列
        }
        filePadding.padFile(fos.getChannel());   //扩容。每次64m扩容
        byte[] buf = Util.marshallTxnEntry(hdr, txn);  //序列化写入
        if (buf == null || buf.length == 0) {
            throw new IOException("Faulty serialization for header " +
                    "and txn");
        }
         //生成butyArray的checkSum
        Checksum crc = makeChecksumAlgorithm();
        crc.update(buf, 0, buf.length);
        oa.writeLong(crc.getValue(), "txnEntryCRC");//写入日志里
        Util.writeTxnBytes(oa, buf);
        return true;
    }

这里有个zxid(ZooKeeper Transaction Id),有点像MySQL的GTID。每次对Zookeeper的状态的改变都会产生一个zxid,zxid是全局有序的,若是zxid1小于zxid2,则zxid1在zxid2以前发生。

commit

这个方法被调用的时机大体有:

  • 服务端比较闲的时候去调用
  • 到请求数量超出1000时,调用。以前提到过GroupCommit,其实就是在这个时候调用的。
  • zk的shutdown钩子被调用时,调用
/**
     * commit the logs. make sure that everything hits the
     * disk
     */
    public synchronized void commit() throws IOException {
        if (logStream != null) {
            logStream.flush();
        }
        for (FileOutputStream log : streamsToFlush) {
            log.flush();
            if (forceSync) {
                long startSyncNS = System.nanoTime();

                FileChannel channel = log.getChannel();
                channel.force(false);//对应fdataSync

                syncElapsedMS = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startSyncNS);
                if (syncElapsedMS > fsyncWarningThresholdMS) {
                    if(serverStats != null) {
                        serverStats.incrementFsyncThresholdExceedCount();
                    }
                    LOG.warn("fsync-ing the write ahead log in "
                            + Thread.currentThread().getName()
                            + " took " + syncElapsedMS
                            + "ms which will adversely effect operation latency. "
                            + "File size is " + channel.size() + " bytes. "
                            + "See the ZooKeeper troubleshooting guide");
                }
            }
        }
        while (streamsToFlush.size() > 1) {
            streamsToFlush.removeFirst().close();
        }
    }

代码很是的简单。若是logStream还有,那就先刷下去。而后遍历待flush的队列(是个链表,用来保持操做顺序),同时还会关注写入的时间,若是过长,则会打一个Warn的日志。

DataTree和DataNode

DataTree是Zk的内存数据结构——就是咱们以前说到的MTable。它以树状结构来组织DataNode。

这么听起来可能有点云里雾里,不妨直接看一下DataNode的相关代码。

public class DataNode implements Record {
    /** the data for this datanode */
    byte data[];

    /**
     * the acl map long for this datanode. the datatree has the map
     */
    Long acl;

    /**
     * the stat for this node that is persisted to disk.
     */
    public StatPersisted stat;

    /**
     * the list of children for this node. note that the list of children string
     * does not contain the parent path -- just the last part of the path. This
     * should be synchronized on except deserializing (for speed up issues).
     */
    private Set<String> children = null;
.....
}

若是用过ZkClient的小伙伴,可能很是熟悉。这就是咱们根据一个path获取数据时返回的相关属性——这就是用来描述存储数据的一个类。注意,DataNode还会维护它的Children。

简单了解DataNode后,咱们来看一下DataTree。为了不干扰,咱们选出最关键的成员变量:

public class DataTree {
    private static final Logger LOG = LoggerFactory.getLogger(DataTree.class);

    /**
     * This hashtable provides a fast lookup to the datanodes. The tree is the
     * source of truth and is where all the locking occurs
     */
    private final ConcurrentHashMap<String, DataNode> nodes =
        new ConcurrentHashMap<String, DataNode>();

    private final WatchManager dataWatches = new WatchManager();

    private final WatchManager childWatches = new WatchManager();

    /**
     * This hashtable lists the paths of the ephemeral nodes of a session.
     */
    private final Map<Long, HashSet<String>> ephemerals =
        new ConcurrentHashMap<Long, HashSet<String>>();
    .......
}

咱们能够看到,DataTree本质上是经过一个ConcurrentHashMap来存储DataNode的(临时节点也是)。保存的是 DataNode 的 path 到 DataNode 的映射。

那为何要保存两个状态呢?这得看调用它们被调用的场景:

  • 通常CRUD ZNode的请求都是走ConcurrentHashMap的
  • 序列化DataTree的时候会从Root节点开始遍历全部节点

若是须要获取全部节点的信息,显然遍历树会比一个个从ConcurrentHashMap 拿快。

接下来看一下序列化的相关代码:

DataNode的序列化方法

/**
     * this method uses a stringbuilder to create a new path for children. This
     * is faster than string appends ( str1 + str2).
     *
     * @param oa
     *            OutputArchive to write to.
     * @param path
     *            a string builder.
     * @throws IOException
     * @throws InterruptedException
     */
    void serializeNode(OutputArchive oa, StringBuilder path) throws IOException {
        String pathString = path.toString();
        DataNode node = getNode(pathString);
        if (node == null) {
            return;
        }
        String children[] = null;
        DataNode nodeCopy;
        synchronized (node) {
            StatPersisted statCopy = new StatPersisted();
            copyStatPersisted(node.stat, statCopy);
            //we do not need to make a copy of node.data because the contents
            //are never changed
            nodeCopy = new DataNode(node.data, node.acl, statCopy);
            Set<String> childs = node.getChildren();
            children = childs.toArray(new String[childs.size()]);
        }
        serializeNodeData(oa, pathString, nodeCopy);
        path.append('/');
        int off = path.length();
        for (String child : children) {
            // since this is single buffer being resused
            // we need
            // to truncate the previous bytes of string.
            path.delete(off, Integer.MAX_VALUE);
            path.append(child);
            serializeNode(oa, path);
        }
    }

能够看到,的确是经过DataNode的Children来遍历全部节点。

DataNode的反序列化方法

接下来看一下反序列化的代码:

public void deserialize(InputArchive ia, String tag) throws IOException {
        aclCache.deserialize(ia);
        nodes.clear();
        pTrie.clear();
        String path = ia.readString("path");
        while (!"/".equals(path)) {
            DataNode node = new DataNode();
            ia.readRecord(node, "node");
            nodes.put(path, node);
            synchronized (node) {
                aclCache.addUsage(node.acl);
            }
            int lastSlash = path.lastIndexOf('/');
            if (lastSlash == -1) {
                root = node;
            } else {
                String parentPath = path.substring(0, lastSlash);
                DataNode parent = nodes.get(parentPath);
                if (parent == null) {
                    throw new IOException("Invalid Datatree, unable to find " +
                            "parent " + parentPath + " of path " + path);
                }
                parent.addChild(path.substring(lastSlash + 1));
                long eowner = node.stat.getEphemeralOwner();
                EphemeralType ephemeralType = EphemeralType.get(eowner);
                if (ephemeralType == EphemeralType.CONTAINER) {
                    containers.add(path);
                } else if (ephemeralType == EphemeralType.TTL) {
                    ttls.add(path);
                } else if (eowner != 0) {
                    HashSet<String> list = ephemerals.get(eowner);
                    if (list == null) {
                        list = new HashSet<String>();
                        ephemerals.put(eowner, list);
                    }
                    list.add(path);
                }
            }
            path = ia.readString("path");
        }
        nodes.put("/", root);
        // we are done with deserializing the
        // the datatree
        // update the quotas - create path trie
        // and also update the stat nodes
        setupQuota();

        aclCache.purgeUnused();
    }

由于序列化的时候是前序遍历。因此反序列化时是先反序列化父亲节点,再反序列化孩子节点。

Snapshot

那么DataTree在什么状况下会序列化呢?在这里就要提到快照了。

前面提到过:若是咱们使用一个内存数据结构加 WAL 的存储方案,WAL 就会一直增加。这样在存储系统启动的时候,就要读取大量的 WAL 日志数据来重建内存数据。快照能够解决这个问题。

除了减小WAL日志,Snapshot还会在Zk全量同步时被用到——当一个全新的ZkServer(这个通常叫Learner)被加入集群时,Leader服务器会将本机上的数据全量同步给新来的ZkServer。

序列化

接下来看一下代码入口:

/**
     * serialize the datatree and session into the file snapshot
     * @param dt the datatree to be serialized
     * @param sessions the sessions to be serialized
     * @param snapShot the file to store snapshot into
     */
    public synchronized void serialize(DataTree dt, Map<Long, Integer> sessions, File snapShot)
            throws IOException {
        if (!close) {
            try (OutputStream sessOS = new BufferedOutputStream(new FileOutputStream(snapShot));
                 CheckedOutputStream crcOut = new CheckedOutputStream(sessOS, new Adler32())) {
                //CheckedOutputStream cout = new CheckedOutputStream()
                OutputArchive oa = BinaryOutputArchive.getArchive(crcOut);
                FileHeader header = new FileHeader(SNAP_MAGIC, VERSION, dbId);
                serialize(dt, sessions, oa, header);
                long val = crcOut.getChecksum().getValue();
                oa.writeLong(val, "val");
                oa.writeString("/", "path");
                sessOS.flush();
            }
        } else {
            throw new IOException("FileSnap has already been closed");
        }
    }

JavaIO的基础知识在这再也不介绍,有兴趣的人能够自行查阅资料或看 从一段代码谈起——浅谈JavaIO接口

本质就是建立文件,并调用DataTree的序列化方法,DataTree的序列化其实就是遍历DataNode去序列化,最后将这些序列化的内容写入文件。

反序列化

/**
     * deserialize a data tree from the most recent snapshot
     * @return the zxid of the snapshot
     */
    public long deserialize(DataTree dt, Map<Long, Integer> sessions)
            throws IOException {
        // we run through 100 snapshots (not all of them)
        // if we cannot get it running within 100 snapshots
        // we should  give up
        List<File> snapList = findNValidSnapshots(100);
        if (snapList.size() == 0) {
            return -1L;
        }
        File snap = null;
        boolean foundValid = false;
        for (int i = 0, snapListSize = snapList.size(); i < snapListSize; i++) {
            snap = snapList.get(i);
            LOG.info("Reading snapshot " + snap);
            try (InputStream snapIS = new BufferedInputStream(new FileInputStream(snap));
                 CheckedInputStream crcIn = new CheckedInputStream(snapIS, new Adler32())) {
                InputArchive ia = BinaryInputArchive.getArchive(crcIn);
                deserialize(dt, sessions, ia);
                long checkSum = crcIn.getChecksum().getValue();
                long val = ia.readLong("val");
                if (val != checkSum) {
                    throw new IOException("CRC corruption in snapshot :  " + snap);
                }
                foundValid = true;
                break;
            } catch (IOException e) {
                LOG.warn("problem reading snap file " + snap, e);
            }
        }
        if (!foundValid) {
            throw new IOException("Not able to find valid snapshots in " + snapDir);
        }
        dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX);
        return dt.lastProcessedZxid;
    }

简单来讲,先读取Snapshot文件们。并反序列化它们,组成DataTree。

小结

在本文中,笔者和你们一块儿学习了Zk的底层存储技术。另外提一下,Zk中序列化技术用的是Apache Jute——本质上调用了JavaDataOutput和Input,较为简单。故没在本文中展开。

相关文章
相关标签/搜索