本文首发于 泊浮目的简书: https://www.jianshu.com/u/204...
版本 | 日期 | 备注 |
---|---|---|
1.0 | 2020.3.12 | 文章首发 |
在上篇文章中,咱们简单提到了Zookeeper的几个核心点。在这篇文章中,咱们就来探索其存储技术。在开始前,读者能够考虑思考下列问题:java
抱着问题,咱们进入下面的内容。node
众所周知,Zookeeper不擅长大量数据的读写、频繁的读操做。这与其存储模型息息相关——典型的LSM( Log-Structured Merge-Tree)。数据库
在这里简单的介绍LSM模型。该模型于1996年发明,2006年从Bigtable开始才开始收到关注。咱们能够直译为日志结构的合并树,这样能够猜出这个数据结构的大概:以日志为基础,不停的append,并组织成可合并的树形式。apache
从图中能够看出来,这是一个多层结构。C0在内存里,其他几层都在磁盘中。当C0到达必定大小时,就会向下作Compaction。segmentfault
C0通常咱们称为MTable(MemoryTable),用来保存有序KV对的内存缓冲区。api
而磁盘中的Ck咱们称为SSTable(Sorted String Table),用于保存有序KV对的只读文件。服务器
LSM存储MVCC的key-value。每次更新一个key-value都会生成一个新版本,删除一个key-value都会生成tomstone的新版本。session
前面提到了Log-Structured,其表如今数据在写入前,会作WAL(Write Ahead Logging)。这是普遍使用的保证多Block数据写入原子性的技术。通常在Block写入以前,会先把新数据写到一个日志中。只有写入END并调用Sync API,才开始对Block开始写入。若是在对block进行写入的任什么时候发生crash,均可以在重启时使用WAL里面的数据完成Block的写入。数据结构
另外,经过WAL,咱们在提交一个操做以前只须要进行文件的顺序写入,从而减小了包含多Block文件操做的数据写入延时。架构
典型基于LSM的存储引擎有:
至此,写操做就完成了。读者可能会问万一碰上Compaction呢?这是在后台作的,并不阻塞写入。
咱们以存储引擎放大指标(Amplification Factors)来对两个存储模型进行对比,在这里,简单介绍一下几个指标:
根据指标,咱们能够做出一个简单的对比:
LSM | B+Tree | |
---|---|---|
读放大 | 一个读操做要对多个Level上的SSTable进行读操做 | 一个Key-value的读操做涉及一个数据页的读操做,若干个索引页的读操做 |
写放大 | 一个key-value值的写操做要在多级的SSTable上进行 | 一个key-value的写操做涉及数据页的写操做,若干个索引页的写操做 |
空间放大 | 在SSTable中存储一个key-value的多个版本 | 索引页和gragmentation |
LSM和B+Tree在存储性能上的比较:
上面提到了LSM的优缺点,接下来,咱们来谈一谈常见的优化思路和Zk中采用的方案。
通常的WAL中每次写完END都要调用一次耗时的Sync API,这实际上是会影响到系统的性能。为了解决这个问题,咱们能够一次提交多个数据写入——只在最后一个数据写入的END日志以后,才调用Sync API。like this:
BEGIN Data1 END Sync
BEGIN Data2 END Sync
BEGIN Data3 END Sync
BEGIN Data1 END
BEGIN Data2 END
BEGIN Data3 END Sync`凡事都有代价,这可能会引发数据一致性相关的问题。
在往 WAL 里面追加日志的时候,若是当前的文件 block 不能保存新添加的日志,就要为文件分配新的 block,这要更新文件 inode 里面的信息(例如 size)。若是咱们使用的是 HHD 的话,就要先 seek 到 inode 所在的位置,而后回到新添加 block 的位置进行日志追加。为了减小这些 seek,咱们能够预先为 WAL 分配 block。例如 ZooKeeper 就是每次为 WAL 分配 64MB 的 block。
因此这也是Zookeeper不擅长读写大数据的缘由之一,这会引发大量的Block分配。
若是咱们使用一个内存数据结构加 WAL 的存储方案,WAL 就会一直增加。这样在存储系统启动的时候,就要读取大量的 WAL 日志数据来重建内存数据。快照能够解决这个问题。
除了解决启动时间过长的问题以外,快照还能够减小存储空间的使用。WAL 的多个日志条目有多是对同一个数据的改动,经过快照,就能够只保留最新的数据改动(Merge)。
Zk的确采用了这个方案来作优化。还带来的一个好处是:在一个节点加入时,就会传最新的Snapshot过去同步数据。
经过Bloom过滤器来加速查找——这是一种随机的数据结构,能够在O(1)的时间内判断一个给定的元素是否在集合中。注意:Bloom Filiter是可能产生Flase positive的(元素可能不在集合中)。
你能够将Bloom过滤器理解为一个哈希字典,可是真实的Bloom过滤器会比哈希字典复杂点,但这样想并不影响你去理解它。
这么看起来,在最坏的状况下,Bloom Filter彷佛没什么用。
若是咱们一直写入MTable,那么MTable则会一直增大,直到超出服务器内部限制。因此咱们须要把MTable的内存数据放到Durable Storage 上去,生成 SSTable 文件,这个操做叫作 minor Compaction。另外,还有两类常见的Compaction:
Compaction的好处:
本节内容都以3.5.7版本为例
TxnLog是咱们前面提到的事务日志。那么接下来咱们就来看它的相关源码。
先看注释:
package org.apache.zookeeper.server.persistence; import ... /** * This class implements the TxnLog interface. It provides api's * to access the txnlogs and add entries to it. * <p> * The format of a Transactional log is as follows: * <blockquote><pre> * LogFile: * FileHeader TxnList ZeroPad * * FileHeader: { * magic 4bytes (ZKLG) * version 4bytes * dbid 8bytes * } * * TxnList: * Txn || Txn TxnList * * Txn: * checksum Txnlen TxnHeader Record 0x42 * * checksum: 8bytes Adler32 is currently used * calculated across payload -- Txnlen, TxnHeader, Record and 0x42 * * Txnlen: * len 4bytes * * TxnHeader: { * sessionid 8bytes * cxid 4bytes * zxid 8bytes * time 8bytes * type 4bytes * } * * Record: * See Jute definition file for details on the various record types * * ZeroPad: * 0 padded to EOF (filled during preallocation stage) * </pre></blockquote> */ public class FileTxnLog implements TxnLog, Closeable {
在注释中,咱们能够看到一个FileLog由三部分组成:
关于FileHeader,能够理解其为一个标示符。TxnList则为主要内容。ZeroPad是一个终结符。
咱们来看看最典型的append方法,能够将其理解WAL过程当中的核心方法额:
/** * append an entry to the transaction log * @param hdr the header of the transaction * @param txn the transaction part of the entry * returns true iff something appended, otw false */ public synchronized boolean append(TxnHeader hdr, Record txn) throws IOException { if (hdr == null) { //为null意味着这是一个读请求,直接返回 return false; } if (hdr.getZxid() <= lastZxidSeen) { LOG.warn("Current zxid " + hdr.getZxid() + " is <= " + lastZxidSeen + " for " + hdr.getType()); } else { lastZxidSeen = hdr.getZxid(); } if (logStream==null) { //为空的话则new一个Stream if(LOG.isInfoEnabled()){ LOG.info("Creating new log file: " + Util.makeLogName(hdr.getZxid())); } logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid())); fos = new FileOutputStream(logFileWrite); logStream=new BufferedOutputStream(fos); oa = BinaryOutputArchive.getArchive(logStream); FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId); fhdr.serialize(oa, "fileheader"); //写file header // Make sure that the magic number is written before padding. logStream.flush(); // zxid必须比日志先落盘 filePadding.setCurrentSize(fos.getChannel().position()); streamsToFlush.add(fos); //加入须要Flush的队列 } filePadding.padFile(fos.getChannel()); //扩容。每次64m扩容 byte[] buf = Util.marshallTxnEntry(hdr, txn); //序列化写入 if (buf == null || buf.length == 0) { throw new IOException("Faulty serialization for header " + "and txn"); } //生成butyArray的checkSum Checksum crc = makeChecksumAlgorithm(); crc.update(buf, 0, buf.length); oa.writeLong(crc.getValue(), "txnEntryCRC");//写入日志里 Util.writeTxnBytes(oa, buf); return true; }
这里有个zxid(ZooKeeper Transaction Id),有点像MySQL的GTID。每次对Zookeeper的状态的改变都会产生一个zxid,zxid是全局有序的,若是zxid1小于zxid2,则zxid1在zxid2以前发生。
这个方法被调用的时机大体有:
/** * commit the logs. make sure that everything hits the * disk */ public synchronized void commit() throws IOException { if (logStream != null) { logStream.flush(); } for (FileOutputStream log : streamsToFlush) { log.flush(); if (forceSync) { long startSyncNS = System.nanoTime(); FileChannel channel = log.getChannel(); channel.force(false);//对应fdataSync syncElapsedMS = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startSyncNS); if (syncElapsedMS > fsyncWarningThresholdMS) { if(serverStats != null) { serverStats.incrementFsyncThresholdExceedCount(); } LOG.warn("fsync-ing the write ahead log in " + Thread.currentThread().getName() + " took " + syncElapsedMS + "ms which will adversely effect operation latency. " + "File size is " + channel.size() + " bytes. " + "See the ZooKeeper troubleshooting guide"); } } } while (streamsToFlush.size() > 1) { streamsToFlush.removeFirst().close(); } }
代码很是的简单。若是logStream还有,那就先刷下去。而后遍历待flush的队列(是个链表,用来保持操做顺序),同时还会关注写入的时间,若是过长,则会打一个Warn的日志。
DataTree是Zk的内存数据结构——就是咱们以前说到的MTable。它以树状结构来组织DataNode。
这么听起来可能有点云里雾里,不妨直接看一下DataNode的相关代码。
public class DataNode implements Record { /** the data for this datanode */ byte data[]; /** * the acl map long for this datanode. the datatree has the map */ Long acl; /** * the stat for this node that is persisted to disk. */ public StatPersisted stat; /** * the list of children for this node. note that the list of children string * does not contain the parent path -- just the last part of the path. This * should be synchronized on except deserializing (for speed up issues). */ private Set<String> children = null; ..... }
若是用过ZkClient的小伙伴,可能很是熟悉。这就是咱们根据一个path获取数据时返回的相关属性——这就是用来描述存储数据的一个类。注意,DataNode还会维护它的Children。
简单了解DataNode后,咱们来看一下DataTree。为了不干扰,咱们选出最关键的成员变量:
public class DataTree { private static final Logger LOG = LoggerFactory.getLogger(DataTree.class); /** * This hashtable provides a fast lookup to the datanodes. The tree is the * source of truth and is where all the locking occurs */ private final ConcurrentHashMap<String, DataNode> nodes = new ConcurrentHashMap<String, DataNode>(); private final WatchManager dataWatches = new WatchManager(); private final WatchManager childWatches = new WatchManager(); /** * This hashtable lists the paths of the ephemeral nodes of a session. */ private final Map<Long, HashSet<String>> ephemerals = new ConcurrentHashMap<Long, HashSet<String>>(); ....... }
咱们能够看到,DataTree本质上是经过一个ConcurrentHashMap来存储DataNode的(临时节点也是)。保存的是 DataNode 的 path 到 DataNode 的映射。
那为何要保存两个状态呢?这得看调用它们被调用的场景:
若是须要获取全部节点的信息,显然遍历树会比一个个从ConcurrentHashMap 拿快。
接下来看一下序列化的相关代码:
/** * this method uses a stringbuilder to create a new path for children. This * is faster than string appends ( str1 + str2). * * @param oa * OutputArchive to write to. * @param path * a string builder. * @throws IOException * @throws InterruptedException */ void serializeNode(OutputArchive oa, StringBuilder path) throws IOException { String pathString = path.toString(); DataNode node = getNode(pathString); if (node == null) { return; } String children[] = null; DataNode nodeCopy; synchronized (node) { StatPersisted statCopy = new StatPersisted(); copyStatPersisted(node.stat, statCopy); //we do not need to make a copy of node.data because the contents //are never changed nodeCopy = new DataNode(node.data, node.acl, statCopy); Set<String> childs = node.getChildren(); children = childs.toArray(new String[childs.size()]); } serializeNodeData(oa, pathString, nodeCopy); path.append('/'); int off = path.length(); for (String child : children) { // since this is single buffer being resused // we need // to truncate the previous bytes of string. path.delete(off, Integer.MAX_VALUE); path.append(child); serializeNode(oa, path); } }
能够看到,的确是经过DataNode的Children来遍历全部节点。
接下来看一下反序列化的代码:
public void deserialize(InputArchive ia, String tag) throws IOException { aclCache.deserialize(ia); nodes.clear(); pTrie.clear(); String path = ia.readString("path"); while (!"/".equals(path)) { DataNode node = new DataNode(); ia.readRecord(node, "node"); nodes.put(path, node); synchronized (node) { aclCache.addUsage(node.acl); } int lastSlash = path.lastIndexOf('/'); if (lastSlash == -1) { root = node; } else { String parentPath = path.substring(0, lastSlash); DataNode parent = nodes.get(parentPath); if (parent == null) { throw new IOException("Invalid Datatree, unable to find " + "parent " + parentPath + " of path " + path); } parent.addChild(path.substring(lastSlash + 1)); long eowner = node.stat.getEphemeralOwner(); EphemeralType ephemeralType = EphemeralType.get(eowner); if (ephemeralType == EphemeralType.CONTAINER) { containers.add(path); } else if (ephemeralType == EphemeralType.TTL) { ttls.add(path); } else if (eowner != 0) { HashSet<String> list = ephemerals.get(eowner); if (list == null) { list = new HashSet<String>(); ephemerals.put(eowner, list); } list.add(path); } } path = ia.readString("path"); } nodes.put("/", root); // we are done with deserializing the // the datatree // update the quotas - create path trie // and also update the stat nodes setupQuota(); aclCache.purgeUnused(); }
由于序列化的时候是前序遍历。因此反序列化时是先反序列化父亲节点,再反序列化孩子节点。
那么DataTree在什么状况下会序列化呢?在这里就要提到快照了。
前面提到过:若是咱们使用一个内存数据结构加 WAL 的存储方案,WAL 就会一直增加。这样在存储系统启动的时候,就要读取大量的 WAL 日志数据来重建内存数据。快照能够解决这个问题。
除了减小WAL日志,Snapshot还会在Zk全量同步时被用到——当一个全新的ZkServer(这个通常叫Learner)被加入集群时,Leader服务器会将本机上的数据全量同步给新来的ZkServer。
接下来看一下代码入口:
/** * serialize the datatree and session into the file snapshot * @param dt the datatree to be serialized * @param sessions the sessions to be serialized * @param snapShot the file to store snapshot into */ public synchronized void serialize(DataTree dt, Map<Long, Integer> sessions, File snapShot) throws IOException { if (!close) { try (OutputStream sessOS = new BufferedOutputStream(new FileOutputStream(snapShot)); CheckedOutputStream crcOut = new CheckedOutputStream(sessOS, new Adler32())) { //CheckedOutputStream cout = new CheckedOutputStream() OutputArchive oa = BinaryOutputArchive.getArchive(crcOut); FileHeader header = new FileHeader(SNAP_MAGIC, VERSION, dbId); serialize(dt, sessions, oa, header); long val = crcOut.getChecksum().getValue(); oa.writeLong(val, "val"); oa.writeString("/", "path"); sessOS.flush(); } } else { throw new IOException("FileSnap has already been closed"); } }
JavaIO的基础知识在这再也不介绍,有兴趣的人能够自行查阅资料或看 从一段代码谈起——浅谈JavaIO接口。
本质就是建立文件,并调用DataTree的序列化方法,DataTree的序列化其实就是遍历DataNode去序列化,最后将这些序列化的内容写入文件。
/** * deserialize a data tree from the most recent snapshot * @return the zxid of the snapshot */ public long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException { // we run through 100 snapshots (not all of them) // if we cannot get it running within 100 snapshots // we should give up List<File> snapList = findNValidSnapshots(100); if (snapList.size() == 0) { return -1L; } File snap = null; boolean foundValid = false; for (int i = 0, snapListSize = snapList.size(); i < snapListSize; i++) { snap = snapList.get(i); LOG.info("Reading snapshot " + snap); try (InputStream snapIS = new BufferedInputStream(new FileInputStream(snap)); CheckedInputStream crcIn = new CheckedInputStream(snapIS, new Adler32())) { InputArchive ia = BinaryInputArchive.getArchive(crcIn); deserialize(dt, sessions, ia); long checkSum = crcIn.getChecksum().getValue(); long val = ia.readLong("val"); if (val != checkSum) { throw new IOException("CRC corruption in snapshot : " + snap); } foundValid = true; break; } catch (IOException e) { LOG.warn("problem reading snap file " + snap, e); } } if (!foundValid) { throw new IOException("Not able to find valid snapshots in " + snapDir); } dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX); return dt.lastProcessedZxid; }
简单来讲,先读取Snapshot文件们。并反序列化它们,组成DataTree。
在本文中,笔者和你们一块儿学习了Zk的底层存储技术。另外提一下,Zk中序列化技术用的是Apache Jute——本质上调用了JavaDataOutput和Input,较为简单。故没在本文中展开。