Kafka代码走读-LogManager

https://github.com/haogrgr/haogrgr-test/blob/master/logs/kafka_source.txtjava

源码阅读(0.8.2.2):git

(一)概览github

1.调用kafka.Kafka中的main方法启动json

2.经过启动参数获取配置文件的路径api

3.经过System.getProperty(log4j.configuration)来获取日志配置并发

4.加载配置文件, 校验配置app

5.根据配置启动指标导出任务
    KafkaMetricsReporter.startReporters(serverConfig.props)
    根据配置kafka.metrics.reporters, kafka.metrics.polling.interval.secs, 来初始化指标报告类, 
    内部是使用的com.yammer.metrics来作指标收集(最新的版本更名为了io.dropwizard.metrics),
    Kafka提供一个内部实现kafka.metrics.KafkaCSVMetricsReporter, 内部又实用CsvReporter(com.yammer.metrics提供), 将指标信息写出到指定目录下的csv文件, 顺便注册个MBean.dom

6.调用内部的启动类
    val kafkaServerStartable = new KafkaServerStartable(serverConfig)
    kafkaServerStartable.startup异步

7.注册shutdown hook, 等待shutdown信号(阻塞), 调用shutdown方法关闭.fetch

(二)KafkaServerStartable.startup启动逻辑

1.设置broker状态为Starting, 初始话shutdown信号(CountDownLatch)和状态(isShuttingDown)

2.kafkaScheduler.startup()
    启动kafka调度任务线程池, 方法内部为初始化ScheduledThreadPoolExecutor, 其实就是ScheduledThreadPoolExecutor的一个包装, 注意这里的线程是使用daemon类型

3.zkClient = initZk()初始化zkClient连接
    这里能够设置 `zookeeper.connect=localhost:2181/kafka` 这样的url, kafka会将kafka相关的zk路径创建在/kafka下, 方便一个zk注册多个kafka集群.

4.初始化LogManager, logManager = KafkaServer.createLogManager(zkClient, brokerState){
    首先, 根据配置文件中的配置建立LogConfig对象, 
    而后, 从zk上面获取全部topic的配置, 合并配置(zk上的配置覆盖文件中的配置)
    AdminUtils.fetchAllTopicConfigs(zkClient).mapValues(LogConfig.fromProps(defaultProps, _))
    从/brokers/topics下获取全部的topic, 而后循环/config/topics/xxx获取xxx的配置(json中config属性)
    根据配置建立CleanerConfig对象
    
    而后建立LogManager实例, new LogManager(){
        private val logs = new Pool[TopicAndPartition, Log]()
        初始化Log池Pool[并发map的包装]
        
        createAndValidateLogDirs(logDirs)
        建立日志目录
        
        private val dirLocks = lockLogDirs(logDirs)
        初始化文件锁FileLock,内部JDK的文件锁
        
        初始化OffsetCheckpoint对象(saves out a map of topic/partition=>offsets to a file)
        recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap
        OffsetCheckpoint初始化会删除对应的.tmp文件, 并建立对应的文件(若是不存在).
        OffsetCheckpoint文件格式为:
            第一行: 文件格式版本号, 目前是只有一个版本(0).
            第二行: 文件包含的记录数.
            其余行: 具体的快照信息, 格式为, `topic partition offset`, 如  haogrgr 0 100.
        OffsetCheckpoint写是先写.tmp文件, 而后再rename操做, 最后刷盘(writer.flush();fileOutputStream.getFD().sync()).
        
        loadLogs(){
            具体见下面的2.4.1.加载日志 loadLogs().
        }
    }
}

4.1.加载日志 loadLogs() {
    val threadPools = mutable.ArrayBuffer.empty[ExecutorService]
    首先, 为每一个日志dir建立一个线程池来异步执行初始化任务
    
    遍历所有的dir列表{
        首先, 为每一个日志dir, 关联一个线程池(线程数num.recovery.threads.per.data.dir), 用来初始化Log实例, 方法执行完毕即关闭
        
        而后, 经过日志目录下的CleanShutdownFile文件来判断是否为正常关闭, 正常关闭的时候(LogManager.shutdown方法里面), 会建立该文件, 表示正常关闭, 
        非正常关闭, 将状态设置为RecoveringFromUncleanShutdown
        (大概看了下, 后续的Log.loadSegments会检查CleanShutdownFile, 而后初始化完成后进行Log.recoverLog操做, 细节TODO)
        
        val recoveryPoints = this.recoveryPointCheckpoints(dir).read
        recoveryPoints是一个map[topic_partition, offset], kafka在正常关闭, 或定时任务, 或者清理日志的时候(细节TODO), 会将当前每一个分区的最新的offset写到快照文件中,
        这里读取文件, 获取每一个分区的快照信息(offset), recoveryPoint在Log对象中, 保存的是已经flush的最大的offset值, 在log.flush中, 刷盘后会更新该值, 即小于等于recoveryPoint的消息都是落盘了的.
        主要做用是: 减小恢复时日志的扫描数量; 经过(logEndOffset - recoveryPoint)能够获得未刷盘消息数, 作刷盘控制;
        
        对与日志dir下的每一个目录(topic-partition目录)建立初始化Log对象的任务 Utils.runnable {
            val topicPartition = Log.parseTopicPartitionName(logDir.getName)
            首先经过目录名解析出来topic和partition
            
            而后, 获取topic配置类(根据前面2,4中zk上的配置和默认配置合并), 同时获取 recoveryPoint值
            
            val current = new Log(logDir, config, logRecoveryPoint, scheduler, time) {
                建立Log对象实例
                
                private val segments = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
                Log对象属性, 用来存放segment对象, LogSegment表示分区下的日志文件及其对应的索引文件.
                
                loadSegments() {
                    初始化分区下的全部LogSegment对象
                    
                    首先建立日志目录(若是不存在)
                    
                    而后遍历日志目录下全部文件 {
                         删除全部以[.deleted]结尾的文件(log和index), 何时会产生该后缀的文件?
                          a)根据配置, kafka会删除一些旧日志(LogSegment)(retentionMs, retentionSize), 定时任务LogManager.cleanupLogs;
                          b)日志恢复操做, Log.recoverLog, 当非正常关闭kafka时, 会恢复日志, 一旦发现不正常的日志, 这个offset(含)以后的字节和LogSegment都会被删除;
                          c)主从同步时, 当从落后太多(从的最大offset小于主的最小offset(可能日志会被清理了)), 则从会logManager.truncateFullyAndStartAt, 来删除老的日志, 重新的offset开始;
                          d)主从同步时, 因为分区Leader的变化, 以前和旧Leader同步的数据可能不是最新的, 须要删除highWatermark(offset)(TODO)以后的数据, 防止不一致, ReplicaManager.makeFollowers;
                         删除步骤为:
                          a)先讲要删除的LogSegment从log.segments中移除;
                          b)再重命名日志和索引文件名后缀为.deleted;
                          c)最后提交异步任务, 任务中再删除日志和对应的索引文件.
                        
                        删除全部以[.cleaned]结尾的文件(log和index), 何时会产生该后缀的文件?
                          a)Cleaner.clean中, 会将多个Segment清理成一个Segment, 而后交换到Log.segments中(清理:同key的消息, 去最后的value), 交换过程当中, 先是将多个Segment中的日志(合并同key消息)写入到.cleaned文件中, 写完后, 重命名为.swap文件, 而后删除老Segment文件, 最后去掉.swap后缀;
                        能够看到, 交换步骤为分为三步, 第一步先写.cleaned文件, 保证文件所有清理完后再操做, 而后重命名为.swap文件, 这时能够删除老的文件了, 删除操做参考上面的.deleted文件操做, 最后重命名, 去掉.swap后缀, 中间任何一步异常, 都不会破坏文件完整性.
                        一个疑惑: 当重命名为.swap成功, 可是立刻carsh了, 致使老的log没有移除, 那么下次启动时, 老的日志依然存在, 如何处理(猜想: 由于是clean, 因此只会clean达到调节的log, 下次启动会继续clean操做, 待验证TODO)
                        
                        处理.swap文件, 如上面说的, 当swap操做进行到一半而挂掉了, 就可能会有.swap文件, 这里须要完成swap操做, 重命名去掉.swap后缀, 删除索引, 后续会判断相关的log文件是否有对应的index文件, 没有会重建索引文件.
                    }
                    
                    而后再次遍历日志目录下全部文件{
                        首先, 删除没有对应log文件的index文件.
                        
                        而后, 若是为log文件, 则建立LogSegment对象, 若是没有对应的index文件, 则重建LogSegment.recover, 而后将segment放入到log的segments中去, key为文件名(startOffset).
                        
                        重建索引文件 LogSegment.recover {
                            遍历log文件, 每隔指定间隔字节数, 就在索引文件中添加一条索引, 最后设置log和index文件大小为有效的字节数
                        }
                        
                        则建立LogSegment对象 segment = new LogSegment {
                            建立FileMessageSet对象{
                                这里调用的是def this(file: File)这个构造方法, 内部会调用FileMessageSet(file, new RandomAccessFile(file, "rw").getChannel(), 0, Int.MaxValue, false)
                                这里经过RandomAccessFile来获取到对应的FileChannel, 提供相似于切片的功能, 经过维护start, end, isSlice来实现, 提供iterator方式来遍历整个日志文件.
                                消息添加是经过ByteBufferMessageSet.writeTo来从buffer写到文件channel的.
                                这个类主要提供Log文件的读写等操做
                            }
                            
                            建立OffsetIndex对象{
                                建立startOffset.index文件
                                建立对应的RandomAccessFile实例:val raf = new RandomAccessFile(file, "rw")
                                若是老的index文件存在, 即file.createNewFile返回true, 则设置文件长度为小于maxIndexSize(默认1m, 最小为8b), 若是不为8的倍数, 则取最近的8的倍数 :raf.setLength(roundToExactMultiple(maxIndexSize, 8))
                                而后经过raf.getChannel.map来内存映射文件, 获取MappedByteBuffer
                                最后, 设置buffer的position指针, 若是新文件, 就是0, 老文件, 则是, 文件大小, 而后关闭流
                                
                                这个类主要的功能就是维护索引, 先是mmap索引文件, 而索引文件中内容是已8个字节为一个entry, 其中前4个字节为相对offset(原始offset-baseOffset), 后4个字节为日志文件偏移, 
                                查找时采用二分查找, 由于offset在索引文件中是有序的, 同时由于是mmap, 因此查找效率高, 主要用于日志读取时使用(LogSegment.translateOffset)
                                
                                这里并非每一个消息offset都索引, 而是间隔必定大小索引一次(indexIntervalBytes), 因此查找到文件位置后, 还须要再去log中去查找到精确的位置, 具体的判断是在LogSegment中实现的.
                            }
                            
                            LogSegment是log和index的包装, 提供一个统一的api来统一的操做index和log, 屏蔽log和index细节.
                            包含了append, read, flush, delete等方法.
                        }
                    
                    }
                    
                    好了,  通过前面两次的遍历, 已经建立好了LogSegment并都放到Log.segments中去了
                    
                    若是目录是空的, 就建立一个startOffset=0的LogSegment, 加入到Log.segments中去.
                    
                    若是目录不是空的, 就进行Log.recoverLog操做{
                        首先, 若是是正常关闭的(hasCleanShutdownFile), 则没啥好恢复的, 设置recoveryPoint为下一个offset, 结束方法.
                        
                        非正常结束, 须要恢复recoveryPoint(前面2.4.1有讲)以后的LogSegment对应的日志, 经过Log.segments的方法, 获取大于recoveryPoint的记录,
                        遍历须要恢复操做的LogSegment列表, 对每一个LogSegment, 遍历日志文件, 重建索引, 遍历的时候校验消息(computeChecksum等), 一旦某条消息出问题了, 这条消息和它后面的数据都会被删除.
                        同时, 改LogSegment以后的LogSegment也会被删除.
                        
                        // reset the index size of the currently active log segment to allow more entries
                        activeSegment.index.resize(config.maxIndexSize)
                        最后设置当前活动的LogSegment(startOffset最大的segment), 的index文件为config.maxIndexSize, 由于上一步会吧index文件设置为真实大小, 而当前LogSegment还会有add操做, 会致使index写失败.
                        kafka.log.OffsetIndex.append中会校验index是否满了(require(!isFull)).
                    }
                    
                    最后, 一个简单的校验, 校验index文件大小是否是8的倍数.
                    
                }loadSegments()结束
                
            }new Log()结束
            
            建立完Log实例后, 加入到LogManager.logs中(key:TopicPartition, value:Log实例), 若是存在TopicPartition对应两个Log实例, 报错
            
        }Utils.runnable结束
        
        最后, 提交上面的任务(Utils.runnable)到线程池中并行执行, 并收集结果.    
        
    }遍历所有的dir列表, 结束
    
    对每一个log dir, 获取上面的任务的执行结果, 无异常, 则删除目录下面的cleanShutdownFile文件.
    
    最后结束线程池.
    
}loadLogs 关闭


5.日志管理器启动 logManager.startup(){
    
    
}

问题记录:

1.调试过程当中, 碰到了个问题, 启动的时候, 报了NPE(kafka.log.OffsetIndex.forceUnmap), 调试发现, 是由于方法内部调用了sun.nio.ch.DirectBuffer.cleaner().clean(), 而cleaner()方法可能会返回Null, 致使空异常. 调试DirectBuffer, 他的cleaner是在构造方法的时候初始化的, 当OffsetIndex.mmap属性初始化的时候, 会将index文件映射为MappedByteBuffer, 经过sun.nio.ch.FileChannelImpl.map方法, 而当文件大小为0的时候, 并不会建立cleaner实例, 因此致使DirectBuffer.cleaner().clean()出现NPE异常, 可是为何index文件会是空的, 明明已经写入消息了, TODO.  补充一点, sun.misc.Cleaner实现PhantomReference接口, 用来在引用的对象被回收的时, 则就会把对象放到PhantomReference队列中, 应用能够经过队列获取到Reference对象, 以便作些回收的工做, 看Cleaner代码时, 发现, 并无使用PhantomReference队列, 而后查看到java.lang.ref.Reference对象中对Cleaner会优化处理, 当发现为Cleaner类型时, 直接调用Cleaner.clean方法, 其余类型则enqueue.

相关文章
相关标签/搜索