以下是ReportBucket初始化的代码:数据结构
@Override public void initialize(String name, Date timestamp, int index) throws IOException { m_baseDir = m_configManager.getHdfsLocalBaseDir("report"); m_writeLock = new ReentrantLock(); m_readLock = new ReentrantLock(); String logicalPath = m_pathBuilder.getReportPath(name, timestamp, index); File dataFile = new File(m_baseDir, logicalPath); File indexFile = new File(m_baseDir, logicalPath + ".idx"); //加载索引文件 if (indexFile.exists()) { loadIndexes(indexFile); } final File dir = dataFile.getParentFile(); if (!dir.exists() && !dir.mkdirs()) { throw new IOException(String.format("Fail to create directory(%s)!", dir)); } m_logicalPath = logicalPath; m_writeDataFile = new BufferedOutputStream(new FileOutputStream(dataFile, true), 8192); m_writeIndexFile = new BufferedOutputStream(new FileOutputStream(indexFile, true), 8192); m_writeDataFileLength = dataFile.length(); m_readDataFile = new RandomAccessFile(dataFile, "r"); }
以下是loadIndexes的代码:dom
protected void loadIndexes(File indexFile) throws IOException { BufferedReader reader = null; m_writeLock.lock(); try { reader = new BufferedReader(new FileReader(indexFile)); StringSplitter splitter = Splitters.by('\t'); while (true) { String line = reader.readLine(); if (line == null) { // EOF break; } List<String> parts = splitter.split(line); if (parts.size() >= 2) { String id = parts.remove(0); String offset = parts.remove(0); try { m_idToOffsets.put(id, Long.parseLong(offset)); } catch (NumberFormatException e) { // ignore it } } } } finally { m_writeLock.unlock(); if (reader != null) { reader.close(); } } }
根据索引文件,逐行读取,每一行的数据结构是 domain offset,中间是tab间隔,这样就能够将每一个应用名对应的offset位置拿到了。 示例以下:ide
根据上面初始化的bucket,以下直接读取报表ui
String xml = bucket.findById(domain);
具体读取代码以下:code
/** * 从offset位置开始,读第一行,第一行的内容是整个报表的长度 */ @Override public String findById(String id) throws IOException { Long offset = m_idToOffsets.get(id); if (offset != null) { m_readLock.lock(); try { m_readDataFile.seek(offset); int num = Integer.parseInt(m_readDataFile.readLine()); byte[] bytes = new byte[num]; m_readDataFile.readFully(bytes); return new String(bytes, "utf-8"); } catch (Exception e) { m_logger.error(String.format("Error when reading file(%s)!", m_readDataFile), e); } finally { m_readLock.unlock(); } } return null; }
根据应用名称和报表写入报表数据文件和索引文件的代码以下:orm
/** * 根据应用名称存储报表 * 0、把当前前的数据文件长度做为这次应用的offset,写入到bucket维护的m_idToOffsets中 * 一、拿到报表字符串的字节长度 * 二、拿到报表字符串的字节长度的字节长度 * 三、将报表字符串的字节长度写入数据文件,带一个换行符 * 四、将报表字符串写入数据文件,带一个换行符 * 五、把 应用名 + '\t' + offset + '\n' 写入到索引文件 * 五、计算当前的数据文件长度 */ @Override public boolean storeById(String id, String report) throws IOException { byte[] content = report.getBytes("utf-8"); int length = content.length; byte[] num = String.valueOf(length).getBytes("utf-8"); m_writeLock.lock(); try { m_writeDataFile.write(num); m_writeDataFile.write('\n'); m_writeDataFile.write(content); m_writeDataFile.write('\n'); m_writeDataFile.flush(); long offset = m_writeDataFileLength; String line = id + '\t' + offset + '\n'; byte[] data = line.getBytes("utf-8"); m_writeDataFileLength += num.length + 1 + length + 1; m_writeIndexFile.write(data); m_writeIndexFile.flush(); m_idToOffsets.put(id, offset); return true; } finally { m_writeLock.unlock(); } }