目录java
上图是以CommitLog文件为例,展现了commitlog文件与MappedFile、MapppedFileQueue的关系。
你能够把磁盘里面commitlog文件夹下每一个文件对应成MappedFile,而这个文件夹对应成MappedFileQueue。缓存
先从MappedFileQueue看起app
private final String storePath;//存储目录 private final int mappedFileSize;//一个存储文件的大小 private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();//写时复制MappedFile列表,按顺序存储各个commitlog对于的MappedFile private final AllocateMappedFileService allocateMappedFileService;//分配MappedFile服务 private long flushedWhere = 0;//当前刷盘指针的位置,表示以前的数据已经刷到磁盘 private long committedWhere = 0;//当前数据提交指针
/* 此处获得的第一个MappedFile的文件起始offset(即文件名)不必定是0, 以前的文件有可能已经被清除了。 */ MappedFile mappedFile = this.getFirstMappedFile(); if (mappedFile != null) { /* 以前的文件有可能已经被清除了(从this.mappedFiles里也会删掉)。所以不能直接用offset / this.mappedFileSize 计算offset对应的文件索引。 */ int index = (int) ((offset / this.mappedFileSize) - (mappedFile.getFileFromOffset() / this.mappedFileSize)); if (index < 0 || index >= this.mappedFiles.size()) { LOG_ERROR.warn("Offset for {} not matched. Request offset: {}, index: {}, " + "mappedFileSize: {}, mappedFiles count: {}", mappedFile, offset, index, this.mappedFileSize, this.mappedFiles.size()); } try { return this.mappedFiles.get(index); } catch (Exception e) { if (returnFirstOnNotFound) { return mappedFile; } LOG_ERROR.warn("findMappedFileByOffset failure. ", e); } }
public static final int OS_PAGE_SIZE = 1024 * 4;//系统页缓存大小 protected static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);//当前JVM实例中MappedFile虚拟内存 private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);//当前JVM实例中MappedFile对象个数 protected final AtomicInteger wrotePosition = new AtomicInteger(0);//当前文件的写指针 //ADD BY ChenYang protected final AtomicInteger committedPosition = new AtomicInteger(0);//当前文件的提交指针 private final AtomicInteger flushedPosition = new AtomicInteger(0);//刷写磁盘的指针 protected int fileSize;//文件大小 protected FileChannel fileChannel;//文件通道 /** * Message will put to here first, and then reput to FileChannel if writeBuffer is not null. */ protected ByteBuffer writeBuffer = null;//该buffer从transientStorePool申请,消息首先会放到这里,而后再提交到FileChannel protected TransientStorePool transientStorePool = null;//堆外内存池,与上面的writeBuffer共同起做用 private String fileName;//文件名称 private long fileFromOffset;//文件起始物理偏移地址 private File file;//文件自己 private MappedByteBuffer mappedByteBuffer;//FileChannel对应的内存映射 private volatile long storeTimestamp = 0;//文件最后一次内容写入时间 private boolean firstCreateInQueue = false;//是不是MappedFileQueue中的第一个文件
this.fileName = fileName; this.fileSize = fileSize; this.file = new File(fileName); this.fileFromOffset = Long.parseLong(this.file.getName());//文件名即为文件的起始物理偏移量 boolean ok = false; ensureDirOK(this.file.getParent());//确保文件的父目录存在 try { this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();//建立文件读写通道 this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);//得到映射buffer TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);//增长 TOTAL_MAPPED_FILES.incrementAndGet();//增长 ok = true; }
if (writeBuffer == null) { //no need to commit data to file channel, so just regard wrotePosition as committedPosition. //没开启堆外内存池,不须要提交数据到fileChannel return this.wrotePosition.get();//返回当前写的位置 } if (this.isAbleToCommit(commitLeastPages)) { if (this.hold()) {//??? commit0(commitLeastPages); this.release();//??? } else { log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get()); } } // All dirty data has been committed to FileChannel. //全部数据已经被提交到了FileChannel if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) { this.transientStorePool.returnBuffer(writeBuffer);//将writeBuffer归还给transientStorePool this.writeBuffer = null; } return this.committedPosition.get();//返回当前提交的位置
int flush = this.committedPosition.get(); int write = this.wrotePosition.get(); if (this.isFull()) { return true;//当前文件已经写满;,能够提交 } if (commitLeastPages > 0) { //本次要提交的数据页大于等于容许提交的最小阈值,能够提交 return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= commitLeastPages; } return write > flush;//当前写的位置大于提交的位置,能够提交
int writePos = this.wrotePosition.get(); int lastCommittedPosition = this.committedPosition.get(); if (writePos - this.committedPosition.get() > 0) { try { ByteBuffer byteBuffer = writeBuffer.slice();//建立一个buffer,与writeBuffer指向同一缓存区 byteBuffer.position(lastCommittedPosition);//回退buffer当前指针位置为lastCommittedPosition byteBuffer.limit(writePos);//设置当前最大有效数据指针 this.fileChannel.position(lastCommittedPosition); this.fileChannel.write(byteBuffer); this.committedPosition.set(writePos); } catch (Throwable e) { log.error("Error occurred when commit data to FileChannel.", e); } }