最近查看Kafka文档, 发现 Kafka 有个 Log Compaction 功能是咱们以前没有留意到的, 可是有着很高的潜在实用价值.git
Kafka 中的每一条数据都有一对 Key 和 Value, 数据存放在磁盘上, 通常不会被永久保留, 而是在到达必定的量或者时间后对最先写入的数据进行删除. Log Compaction 在默认的删除规则以外提供了另外一种删除过期数据(或者说保留有价值的数据)的方式, 就是对于有相同 Key 的不一样数据, 只保留最后一条, 前面的数据在合适的状况下删除.github
Log Compaction 特性, 就实时计算而言, 能够在灾难恢复方面有很好地应用场景. 好比说咱们在 Storm 里作计算时, 须要长期在内存里维护一些数据, 这些数据多是经过聚合了一天或者一周的日志获得的, 这些数据一旦因为偶然的缘由(磁盘,网络等)崩溃了, 从头开始计算须要漫长的时间.一个可行的应对方法是定时将内存里的数据备份到外部存储中, 好比 Redis 或者 Mysql 等, 当崩溃发生的时候再从外部存储读回来继续计算.算法
使用 Log Compaction 来代替这些外部存储有如下好处.sql
Kafka 既是数据源又是存储工具, 能够简化技术栈, 下降维护成本.apache
使用 Mysql 或者 Redis 做为外部存储的话, 须要将存储的 Key 记录下来, 恢复时再用这些 Key 将数据取回, 实现起来有必定的工程复杂度. 用Log Compaction 特性的话只要把数据一古脑儿地写进 Kafka, 等灾难恢复的时候再读回内存就好了.网络
Kafka 针对磁盘读写都有很高的顺序性, 相对于 Mysql 没有索引查询等工做量的负担, 能够实现高性能, 相对于 Redis 而言, 它能够充分利用廉价的磁盘而对内存要求很低, 在接近的性能下能实现很是高的性价比(仅仅针对灾难恢复这个场景而言).ide
当 topic 的 cleanup.policy (默认为delete) 设置为 compact 时, Kafka 的后台线程会定时把 topic 遍历两次, 第一次把每一个 key 的哈希值最后一次出现的 offset 都存下来, 第二次检查每一个 offset 对应的 key 是否在更后面的日志中出现过,若是出现了就删除对应的日志.工具
Log Compaction 的大部分功能由CleanerThread完成, 核心逻辑在 Cleaner 的 clean方法性能
/** * Clean the given log * * @param cleanable The log to be cleaned * * @return The first offset not cleaned and the statistics for this round of cleaning */ private[log] def clean(cleanable: LogToClean): (Long, CleanerStats) = { val stats = new CleanerStats() info("Beginning cleaning of log %s.".format(cleanable.log.name)) val log = cleanable.log // build the offset map info("Building offset map for %s...".format(cleanable.log.name)) val upperBoundOffset = cleanable.firstUncleanableOffset buildOffsetMap(log, cleanable.firstDirtyOffset, upperBoundOffset, offsetMap, stats) // <----- 这里第一次遍历全部offset将key索引 val endOffset = offsetMap.latestOffset + 1 stats.indexDone() // figure out the timestamp below which it is safe to remove delete tombstones // this position is defined to be a configurable time beneath the last modified time of the last clean segment val deleteHorizonMs = log.logSegments(0, cleanable.firstDirtyOffset).lastOption match { case None => 0L case Some(seg) => seg.lastModified - log.config.deleteRetentionMs } // determine the timestamp up to which the log will be cleaned // this is the lower of the last active segment and the compaction lag val cleanableHorizonMs = log.logSegments(0, cleanable.firstUncleanableOffset).lastOption.map(_.lastModified).getOrElse(0L) // group the segments and clean the groups info("Cleaning log %s (cleaning prior to %s, discarding tombstones prior to %s)...".format(log.name, new Date(cleanableHorizonMs), new Date(deleteHorizonMs))) for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize)) cleanSegments(log, group, offsetMap, deleteHorizonMs, stats) // <-- 这里第二次遍历全部offset,删除冗余的日志,而且将多个小的segment合并为一个 // record buffer utilization stats.bufferUtilization = offsetMap.utilization stats.allDone() (endOffset, stats) }
log compaction 经过两次遍历全部数据来实现, 两次遍历之间交流的媒介就是一个
OffsetMap, 下面是OffsetMap的签名ui
trait OffsetMap { def slots: Int def put(key: ByteBuffer, offset: Long) def get(key: ByteBuffer): Long def clear() def size: Int def utilization: Double = size.toDouble / slots def latestOffset: Long }
这基本就是个普通的mutable map, 在 Kafka 项目中,它的实现只有一个, 叫作SkimpyOffsetMap
put 方法会为每一个 key 生成一份摘要,默认使用 md5 方法生成一个 16byte 的摘要, 根据这个摘要在 bytes
中哈希的到一个下标, 若是这个下标已经被别的摘要占据, 则线性查找到下个空余的下标为止, 而后在对应位置插入该 key 对应的 offset
/** * Associate this offset to the given key. * @param key The key * @param offset The offset */ override def put(key: ByteBuffer, offset: Long) { require(entries < slots, "Attempt to add a new entry to a full offset map.") lookups += 1 hashInto(key, hash1) // probe until we find the first empty slot var attempt = 0 var pos = positionOf(hash1, attempt) while(!isEmpty(pos)) { bytes.position(pos) bytes.get(hash2) if(Arrays.equals(hash1, hash2)) { // we found an existing entry, overwrite it and return (size does not change) bytes.putLong(offset) lastOffset = offset return } attempt += 1 pos = positionOf(hash1, attempt) } // found an empty slot, update it--size grows by 1 bytes.position(pos) bytes.put(hash1) bytes.putLong(offset) lastOffset = offset entries += 1 }
get 方法使用和 put 一样的摘要算法得到 key 的摘要, 经过摘要得到 offset 的存储位置
/** * Get the offset associated with this key. * @param key The key * @return The offset associated with this key or -1 if the key is not found */ override def get(key: ByteBuffer): Long = { lookups += 1 hashInto(key, hash1) // search for the hash of this key by repeated probing until we find the hash we are looking for or we find an empty slot var attempt = 0 var pos = 0 //we need to guard against attempt integer overflow if the map is full //limit attempt to number of slots once positionOf(..) enters linear search mode val maxAttempts = slots + hashSize - 4 do { if(attempt >= maxAttempts) return -1L pos = positionOf(hash1, attempt) bytes.position(pos) if(isEmpty(pos)) return -1L bytes.get(hash2) attempt += 1 } while(!Arrays.equals(hash1, hash2)) bytes.getLong() }
默认状况下, Kafka 用 16 个 byte 存放key的摘要, 用 8 个 byte 存放摘要对应的 offset, 1GB 的空间能够保存 1024* 1024*1024 / 24 = 44,739,242.666...
个 key 对应的数据.
这个 log compaction 的原理挺简单, 就是按期把全部日志读两遍,写一遍, cpu 的速度超过磁盘彻底不是问题, 只要日志的量对应的读两遍写一遍的时间在可接受的范围内, 它的性能就是能够接受的.
如今的 OffsetMap 惟一的实现名字叫作 SkimpyOffsetMap, 相信大家已经从这个名字里看出端倪, 最初的做者自己也认为这样的实现不够严谨. 这个算法在两个 key 的 md5 值相同的状况下就判断 key 是相同的, 若是遇到了 key 不一样而 md5 值相同的状况, 那两个 key 中其中一个的消息就丢失了. 虽然 md5 值相同的几率很低, 但若是真的碰上了, 那就是100%, 几率值再低也没用, 并且从网上的反映看彷佛冲突还很多见.
我我的目前想到的处理方案是, 大部分的 key 总长度并不算长, 能够把这类 key 全部可能的状况都md5一遍看一下是否有冲突, 若是没有的话就放心用.