Zookeeper中写操做都有维护一个事务日志,专门用于写znode操做。只有事务日志确认过的数据变动才会在整个集群生效。node
一、TxnLog数据库
TxnLog是写事务日志接口,主要包含如下接口:缓存
rollLog方法提供日志滚动功能,若是任随事务日志文件无限增加,势必影响到性能,rollLog方法会从新启动新的事务日志文件名,后续事务写到新的文件中,ZooKeeper中多个事务日志文件一般会以zxid来区别。app
Truncate方法提供截断日志功能,将zxid以后的事务所有删除。一般当Follower的事务日志比Leader还多的时候会触发改方法,保证Follower和Leader的数据库同步。dom
Commit方法提交文件缓存到磁盘,确保事务真实写到磁盘而不是仅仅存在于文件内存缓存中。性能
二、实现类FileTxnLogthis
ZooKeeper中实现TxnLog接口的类是FileTxnLog,它的主要功能和方法包括如下这些。日志
2.1 append方法code
主要代码:接口
logFileWrite = new File(logDir, ("log." + Long.toHexString(hdr.getZxid()))); fos = new FileOutputStream(logFileWrite); logStream=new BufferedOutputStream(fos); oa = BinaryOutputArchive.getArchive(logStream); FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId); fhdr.serialize(oa, "fileheader"); logStream.flush(); currentSize = fos.getChannel().position(); streamsToFlush.add(fos); padFile(fos); Checksum crc = makeChecksumAlgorithm(); crc.update(buf, 0, buf.length); oa.writeLong(crc.getValue(), "txnEntryCRC"); Util.writeTxnBytes(oa, buf);
该方法将事务添加到日志文件的尾部。
日志没滚动前,写到当前日志文件;日志回滚的话写到新的日志文件,新日志文件的名称是”log.”加当前zxid的值。
2.2 rollLog方法
日志滚动,关闭旧的日志文件,启动新的日志文件,主要代码:
if (logStream != null) { this.logStream.flush(); this.logStream = null; }
2.3 getLastLoggedZxid方法
从日志文件中获取最近记录的zxid的值,从lastLoggedZxid就能得到最新的事务日志文件。多个日志文件的状况会遍历全部文件,选出文件名中zxid最大的那个日志文件,再从该日志文件中取到最大的zxid。
主要代码:
public long getLastLoggedZxid() { File[] files = getLogFiles(logDir.listFiles(), 0); long zxid = maxLog; TxnIterator itr = null; FileTxnLog txn = new FileTxnLog(logDir); itr = txn.read(maxLog); while (true) { if(!itr.next()) break; TxnHeader hdr = itr.getHeader(); zxid = hdr.getZxid(); } return zxid; }
2.4 truncate方法
截断多余的日志信息,保证日志文件是合法有效的。
主要代码:
public boolean truncate(long zxid) throws IOException { FileTxnIterator itr = null; itr = new FileTxnIterator(this.logDir, zxid); PositionInputStream input = itr.inputStream; long pos = input.getPosition(); RandomAccessFile raf=new RandomAccessFile(itr.logFile,"rw"); raf.setLength(pos); raf.close(); return true; }
2.5 commit方法
确认提交日志文件缓存,主要代码:
public synchronized void commit() throws IOException { for (FileOutputStream log : streamsToFlush) { log.flush(); if (forceSync) { long startSyncNS = System.nanoTime(); log.getChannel().force(false); } } }