0.10以前的版本分为高水平API和低水平API,到了0.10彻底使用java写了客户端源码,以前是用的scala,而且再也不依赖zook和scala。java
新版本的 Consumer 综合统一了以前“High Level”和“Simple”的接口,支持订阅固定的TopicPartition,手动Commit的Offset等。安全
1、消息的消费流程:less
- KafkaConsumer:
- poll 方法在有限定的时间内不断调用 pollOnce 方法,一旦有了可消费的消息则返回,返回前会预先调用 Fetcher 的 fetchedRecords 请求消息,或者超时返回空结果集;
- pollOnce 方法:
- 调用 ConsumerCoordinator 的 ensureCoordinatorReady 方法,检查获取当前 Consumer Group 的 coordinator (负责协调的 Broker) ;
- 调用 ConsumerCoordinator 的 ensurePartitionAssignment 的方法,得到指派给当前 Consumer 的 Partition;
- 若是不清楚 Partition 目前消费到的 position,例如刚加入 Group 时,调用 ConsumerCoordinator 的 refreshCommittedOffsetsIfNeeded 的方法,对指派到的 Partition 的 position 进行更新;
- 调用 ConsumerNetworkClient 的 executeDelayedTasks 方法,执行 auto commit、heartbeat 等后台任务;
- 调用 Fetcher 的 fetchedRecords 方法,返回暂存着的、可直接拿来消费的消息。没有的话调用 Fetcher 的 sendFetches 方法,发送 FetchRequest 给每一个 Partition 对应的 Leader Broker 节点,得到新的供消费的消息;
- Fetcher:
- 使用 records (List<PartitionRecords<K, V>>) 暂存着从 Broker Leader 拉取来的、能够直接消费的新消息;
- fetchedRecords 方法中从 records 中取出暂存的消息,转化成 Map<TopicPartition, List<ConsumerRecord<K, V>>> 的结构,并调用 SubscriptionState 的 position 方法,维护各个 Partition 目前消费到的 posision;
- sendFetches 方法中对于每一个 fetchable 的 Partition,带上这个 Partition 目前消费到的 position,构形成 FetchRequest,发送到对应的 Leader Broker。对 FetchResponse 的处理中,将每一个 Partition 待消费的消息放到 records 中;
- ConsumerCoordinator:
- 初始化的时候,若是 auto commit = true,则产生一个 AutoCommitTask,该任务将经过 ConsumerNetworkClient 将 SubscriptionState 的 assignment 内的各个Partition 消费的 Offset 提交;
- ensureCoordinatorReady 方法中,选取其中一个低负载的 Broker 咨询当前 Consumer Group 的 coordinator,获取后调用 NetworkClient 的 ready 方法与 coordinator 创建链接;
- ensurePartitionAssignment 方法中,若是刚加入当前 Group 或者 HeartBeat 收到 rebalance 的 response,调用 ConsumerNetworkClient 的 send 方法,向 coordinator 发送 JoinGroupRequest。在 JoinGroupResponseHandler 中若是被 coordinator 指派为 PartitionAssignor,则进行 Partition 的分配,分为 Range 和 RoundRobin 两种,经过 ConsumerNetworkClient 的 send 方法发送 SyncGroupRequest 并将结果上报给 coordinator。从 coordinator 那收到分配结果后,更新 SubscriptionState 内的 assignment;
- refreshCommittedOffsetsIfNeeded 方法中,经过 ConsumerNetworkClient 的 send 方法发送 OffsetFetchRequest,获取 Partition 的 position,并更新 SubscriptionState 内的 assignment;
- SubscriptionState:
- 使用 assignment (Map<TopicPartition, TopicPartitionState>) 维护目前被指派消费的 Partition,以及各个 Partition 消费到的 position;
- ConsumerNetworkClient:
- send 方法中将发往指定 Broker 的各种请求,暂存到 unsent ( Map<Node, List<ClientRequest>> )中,不进行实际发送,直到 poll 方法被调用,才真正经过 NetworkClient 发送请求
- executeDelayedTasks 方法中,同步执行 delayedTasks 内到时的任务,auto commit、heartbeat 等;
- NetworkClient
- 使用 ClusterConnectionStates (Map<String, NodeConnectionState>) 维护着每一个 Broker 节点的链接状态;
- ready 方法中判断是否跟指定的 Broker 节点是 connected 的状态,否的话会经过 Selector 的 connect 方法初始化跟其的链接,创建 SocketChannel 并 register,KafkaChannel 会 attach 在SelectionKey 上;
- poll 方法中调用 Selector 的 poll 方法,处理 Selector 内的 completedSends,completedReceives等,处理 ClientResponse, 遍历 RecordBatch 内的List<Thunk>,完成回调逻辑的处理;
2、KafkaConsumer 线程不安全的问题
官方建议的两种解决方法:
1. 每一个消费一个线程,而且给每一个消费者建立一个实例。
优势:(1)很是容易实现; (2) 很是快,不须要 inter-thread线程协调(3) 很是容易实如今每一个分区顺序处理数据。
缺点: (1) 消费者越多 TCP链接越多 (2) Multiple consumers means more requests being sent to the server and slightly less batching of data which can cause some drop in I/O throughput; (3) The number of total threads across all processes will be limited by the total number of partitions.fetch
2. Decouple Consumption and Processing: have one consumer fetch message from broker and have a pool of processor threads that actually handle the message processing.
优势: (1) This option allows independently scaling the number of consumers and processors, avoiding any limitation on partitions.
缺点: (1) Guaranteeing order across the processors requires particular care as the threads will execute independently an earlier chunk of data may actually be processed after a later chunk of data; (2) Manually committing the position becomes harder as it requires that all threads co-ordinate to ensure that processing is complete for that partition. ui
3、总结: Kafka 的 Consumer 经过 Fetcher 从 Broker 获取能够消费的消息,SubscriptionState 内的 assignment 维护了目前各个 TopicPartition 的消费 position,周而复始运行的 AutoCommitTask 将目前消费的 offset 提交给 Broker。经过 HeartBeat 得知须要 rebalance 的时候,向 Coordinator 发送 JoinGroupRequest,若被选为 Partition Assignor,则将进行 Partition 的分配,经过 SyncGroupRequest 上报结果。最终从 Coordinator 那收到的分配更新进 SubscriptionState 内的 assignment。