ZooKeeper系列之(十五):写日志

Zookeeper中写操做都有维护一个事务日志,专门用于写znode操做。只有事务日志确认过的数据变动才会在整个集群生效。node

一、TxnLog数据库

TxnLog是写事务日志接口,主要包含如下接口:缓存

  1. rollLog: 日志滚动,启动新日志
  2. append:添加写事务到日志尾部
  3. getLastLoggedZxid:读取最后记录的zxid
  4. truncate:截断写事务,用在Learner事务比Leader事务多的场景
  5. commit:提交事务,确认事务已持久化

rollLog方法提供日志滚动功能,若是任随事务日志文件无限增加,势必影响到性能,rollLog方法会从新启动新的事务日志文件名,后续事务写到新的文件中,ZooKeeper中多个事务日志文件一般会以zxid来区别。app

Truncate方法提供截断日志功能,将zxid以后的事务所有删除。一般当Follower的事务日志比Leader还多的时候会触发改方法,保证Follower和Leader的数据库同步。dom

Commit方法提交文件缓存到磁盘,确保事务真实写到磁盘而不是仅仅存在于文件内存缓存中。性能

二、实现类FileTxnLogthis

ZooKeeper中实现TxnLog接口的类是FileTxnLog,它的主要功能和方法包括如下这些。日志

2.1 append方法code

主要代码:接口

logFileWrite = new File(logDir, ("log." + Long.toHexString(hdr.getZxid())));
fos = new FileOutputStream(logFileWrite);
logStream=new BufferedOutputStream(fos);
oa = BinaryOutputArchive.getArchive(logStream);
FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
fhdr.serialize(oa, "fileheader");
logStream.flush();
currentSize = fos.getChannel().position();
streamsToFlush.add(fos);
padFile(fos);
Checksum crc = makeChecksumAlgorithm();
crc.update(buf, 0, buf.length);
oa.writeLong(crc.getValue(), "txnEntryCRC");
Util.writeTxnBytes(oa, buf);

该方法将事务添加到日志文件的尾部。

日志没滚动前,写到当前日志文件;日志回滚的话写到新的日志文件,新日志文件的名称是”log.”加当前zxid的值。

2.2 rollLog方法

日志滚动,关闭旧的日志文件,启动新的日志文件,主要代码:

if (logStream != null) {
    this.logStream.flush();
    this.logStream = null;
}

2.3 getLastLoggedZxid方法

从日志文件中获取最近记录的zxid的值,从lastLoggedZxid就能得到最新的事务日志文件。多个日志文件的状况会遍历全部文件,选出文件名中zxid最大的那个日志文件,再从该日志文件中取到最大的zxid。

主要代码:

public long getLastLoggedZxid() {
    File[] files = getLogFiles(logDir.listFiles(), 0);
    long zxid = maxLog;
    TxnIterator itr = null;
    FileTxnLog txn = new FileTxnLog(logDir);
    itr = txn.read(maxLog);
     while (true) {
        if(!itr.next())
             break;
        TxnHeader hdr = itr.getHeader();
        zxid = hdr.getZxid();
     }     
     return zxid;
 }

 

2.4 truncate方法

截断多余的日志信息,保证日志文件是合法有效的。

主要代码:

public boolean truncate(long zxid) throws IOException {
    FileTxnIterator itr = null;
    itr = new FileTxnIterator(this.logDir, zxid);
    PositionInputStream input = itr.inputStream;
    long pos = input.getPosition();
    RandomAccessFile raf=new RandomAccessFile(itr.logFile,"rw");
    raf.setLength(pos);
    raf.close();
    return true;
 }

 

2.5 commit方法

确认提交日志文件缓存,主要代码:

public synchronized void commit() throws IOException {
   for (FileOutputStream log : streamsToFlush) {
       log.flush();
       if (forceSync) {
          long startSyncNS = System.nanoTime();
          log.getChannel().force(false);          
      }
  }
}
相关文章
相关标签/搜索