Kafka工做流程 - 学习总结

 

1、为何须要消息系统
1)解耦
容许你独立的扩展或修改两边的处理过程,只要确保它们遵照一样的接口约束。
2)冗余
消息队列把数据进行持久化直到它们已经被彻底处理,经过这一方式规避了数据丢失风险。许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除以前,须要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。
3)扩展性
由于消息队列解耦了你的处理过程,因此增大消息入队和处理的频率是很容易的,只要另外增长处理过程便可。
4)灵活性 & 峰值处理能力
在访问量剧增的状况下,应用仍然须要继续发挥做用,可是这样的突发流量并不常见。若是为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列可以使关键组件顶住突发的访问压力,而不会由于突发的超负荷的请求而彻底崩溃。
5)可恢复性
系统的一部分组件失效时,不会影响到整个系统。消息队列下降了进程间的耦合度,因此即便一个处理消息的进程挂掉,加入队列中的消息仍然能够在系统恢复后被处理。
6)顺序保证
在大多使用场景下,数据处理的顺序都很重要。大部分消息队列原本就是排序的,而且能保证数据会按照特定的顺序来处理。(Kafka 保证一个 Partition 内的消息的有序性)
7)缓冲
有助于控制和优化数据流通过系统的速度,解决生产消息和消费消息的处理速度不一致的状况。
8)异步通讯
不少时候,用户不想也不须要当即处理消息。消息队列提供了异步处理机制,容许用户把一个消息放入队列,但并不当即处理它。想向队列中放入多少消息就放多少,而后在须要的时候再去处理它们。java

2、Kafka系统架构
Apache Kafka是分布式发布-订阅消息系统。它最初由LinkedIn公司开发,以后成为Apache项目的一部分。Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。算法

2.1   Kafka拓扑结构安全

2.2   Kafka相关概念bash

1)producer(生产者):
消息生产者,发布消息到 kafka 集群的终端或服务。producer是可以发布消息到话题的任何对象。
2)broker(服务代理):
broker是已发布的消息保存在一组服务器中,它们被称为代理(Broker)或Kafka集群。broker是kafka 集群中包含的服务器。
3)topic(话题):
topic是特定类型的消息流。消息是字节的有效负载(Payload),话题是消息的分类名或种子(Feed)名。每条发布到 kafka 集群的消息属于的类别,即 kafka 是面向 topic 的。
4)consumer(消费者):
consumer是从kafka集群中消费消息的终端或服务。能够订阅一个或多个话题,并从Broker拉数据,从而消费这些已发布的消息。
5)partition:
partition 是物理上的概念,每一个 topic 包含一个或多个 partition。kafka 分配的单位是 partition。
6)Consumer group:
high-level consumer API 中,每一个 consumer 都属于一个 consumer group,每条消息只能被 consumer group 中的一个 Consumer 消费,但能够被多个 consumer group 消费。
7)replica:
partition 的副本,保障 partition 的高可用。
8)leader:
replica 中的一个角色, producer 和 consumer 只跟 leader 交互。
9)follower:
replica 中的一个角色,从 leader 中复制数据。
10)controller:
kafka 集群中的其中一个服务器,用来进行 leader election 以及 各类 failover。
11)zookeeper:
kafka 经过 zookeeper 来存储集群的 meta 信息。服务器

==Kafka存储策略==
1)kafka以topic来进行消息管理,每一个topic包含多个partition,每一个partition对应一个逻辑log,有多个segment组成。
2)每一个segment中存储多条消息(见下图),消息id由其逻辑位置决定,即从消息id可直接定位到消息的存储位置,避免id到位置的额外映射。
3)每一个part在内存中对应一个index,记录每一个segment中的第一条消息偏移。
4)发布者发到某个topic的消息会被均匀的分布到多个partition上(或根据用户指定的路由规则进行分布),broker收到发布消息往对应partition的最后一个segment上添加该消息,当某个segment上的消息条数达到配置值或消息发布时间超过阈值时,segment上的消息会被flush到磁盘,只有flush到磁盘上的消息订阅者才能订阅到,segment达到必定的大小后将不会再往该segment写数据,broker会建立新的segment。网络

==Kafka数据保留策略==
1)N天前的删除。
2)保留最近的多少Size数据。多线程

==Kafka broker==
与其它消息系统不一样,Kafka broker是无状态的。这意味着消费者必须维护已消费的状态信息。这些信息由消费者本身维护,broker彻底无论(有offset managerbroker管理)。
    -  从代理删除消息变得很棘手,由于代理并不知道消费者是否已经使用了该消息。Kafka创新性地解决了这个问题,它将一个简单的基于时间的SLA应用于保留策略。当消息在代理中超过必定时间后,将会被自动删除。
   -  这种创新设计有很大的好处,消费者能够故意倒回到老的偏移量再次消费数据。这违反了队列的常见约定,但被证实是许多消费者的基本特征。架构

==Kafka Design==
目标
1) 高吞吐量来支持高容量的事件流处理
2) 支持从离线系统加载数据
3) 低延迟的消息系统app

持久化
1) 依赖文件系统,持久化到本地
2) 数据持久化到log负载均衡

效率
1) 解决”small IO problem“:
    使用”message set“组合消息。
    server使用”chunks of messages“写到log。
    consumer一次获取大的消息块。
2)解决”byte copying“:
    在producer、broker和consumer之间使用统一的binary message format。
    使用系统的page cache。
    使用sendfile传输log,避免拷贝。

端到端的批量压缩(End-to-end Batch Compression)
Kafka支持GZIP和Snappy压缩协议。

==复制(Replication)==
1)一个partition的复制个数(replication factor)包括这个partition的leader自己。
2)全部对partition的读和写都经过leader。
3)Followers经过pull获取leader上log(message和offset)
4)若是一个follower挂掉、卡住或者同步太慢,leader会把这个follower从"in sync replicas"(ISR)列表中删除。
5)当全部的"in sync replicas"的follower把一个消息写入到本身的log中时,这个消息才被认为是"committed"的。
6)若是针对某个partition的全部复制节点都挂了,Kafka默认选择最早复活的那个节点做为leader(这个节点不必定在ISR里)。

==Leader选举==
Kafka在Zookeeper中为每个partition动态的维护了一个ISR,这个ISR里的全部replica都跟上了leader,只有ISR里的成员才能有被选为leader的可能(unclean.leader.election.enable=false)。
在这种模式下,对于f+1个副本,一个Kafka topic能在保证不丢失已经commit消息的前提下容忍f个副本的失败,在大多数使用场景下,这种模式是十分有利的。事实上,为了容忍f个副本的失败,"少数服从多数"的方式和ISR在commit前须要等待的副本的数量是同样的,可是ISR须要的总的副本的个数几乎是"少数服从多数"的方式的一半。

==The Producer==
发送确认
经过request.required.acks来设置,选择是否等待消息commit(是否等待全部的”in sync replicas“都成功复制了数据)
Producer能够经过acks参数指定最少须要多少个Replica确认收到该消息才视为该消息发送成功。acks的默认值是1,即Leader收到该消息后当即告诉Producer收到该消息,此时若是在ISR中的消息复制完该消息前Leader宕机,那该条消息会丢失。
推荐的作法是,将acks设置为all或者-1,此时只有ISR中的全部Replica都收到该数据(也即该消息被Commit),Leader才会告诉Producer该消息发送成功,从而保证不会有未知的数据丢失。

负载均衡
1)producer能够自定义发送到哪一个partition的路由规则。默认路由规则:hash(key)%numPartitions,若是key为null则随机选择一个partition。
2)自定义路由:若是key是一个user id,能够把同一个user的消息发送到同一个partition,这时consumer就能够从同一个partition读取同一个user的消息。

异步批量发送
批量发送:配置很少于固定消息数目一块儿发送而且等待时间小于一个固定延迟的数据。

==The Consumer==
consumer控制消息的读取。

Push vs Pull
1) producer push data to broker,consumer pull data from broker
2) consumer pull的优势:consumer本身控制消息的读取速度和数量。
3) consumer pull的缺点:若是broker没有数据,则可能要pull屡次忙等待,Kafka能够配置consumer long pull一直等到有数据。

Consumer Position
1) 大部分消息系统由broker记录哪些消息被消费了,但Kafka不是。
2) Kafka由consumer控制消息的消费,consumer甚至能够回到一个old offset的位置再次消费消息。

==Consumer group==
每个consumer实例都属于一个consumer group。
每一条消息只会被同一个consumer group里的一个consumer实例消费。
不一样consumer group能够同时消费同一条消息。

==Consumer Rebalance==
Kafka consumer high level API:
若是某consumer group中consumer数量少于partition数量,则至少有一个consumer会消费多个partition的数据。
若是consumer的数量与partition数量相同,则正好一个consumer消费一个partition的数据。
若是consumer的数量多于partition的数量时,会有部分consumer没法消费该topic下任何一条消息。

==Message Delivery Semantics==
三种:
At most once—Messages may be lost but are never redelivered.
At least once—Messages are never lost but may be redelivered.
Exactly once—this is what people actually want, each message is delivered once and only once.

Producer:有个”acks“配置能够控制接收的leader的在什么状况下就回应producer消息写入成功。

Consumer:
* 读取消息,写log,处理消息。若是处理消息失败,log已经写入,则没法再次处理失败的消息,对应”At most once“。
* 读取消息,处理消息,写log。若是消息处理成功,写log失败,则消息会被处理两次,对应”At least once“。
* 读取消息,同时处理消息并把result和log同时写入。这样保证result和log同时更新或同时失败,对应”Exactly once“。

Kafka默认保证at-least-once delivery,允许用户实现at-most-once语义,exactly-once的实现取决于目的存储系统,kafka提供了读取offset,实现也没有问题。

==Distribution==
Consumer Offset Tracking
1)High-level consumer记录每一个partition所消费的maximum offset,并按期commit到offset manager(broker)。
2)Simple consumer须要手动管理offset。如今的Simple consumer Java API只支持commit offset到zookeeper。

Consumers and Consumer Groups
1)consumer注册到zookeeper
2)属于同一个group的consumer(group id同样)平均分配partition,每一个partition只会被一个consumer消费。
3)当broker或同一个group的其余consumer的状态发生变化的时候,consumer rebalance就会发生。

==Zookeeper协调控制==
1)管理broker与consumer的动态加入与离开。
2)触发负载均衡,当broker或consumer加入或离开时会触发负载均衡算法,使得一个consumer group内的多个consumer的订阅负载平衡。
3)维护消费关系及每一个partition的消费信息。

==日志压缩(Log Compaction)==
1)针对一个topic的partition,压缩使得Kafka至少知道每一个key对应的最后一个值。
2)压缩不会重排序消息。
3)消息的offset是不会变的。
4)消息的offset是顺序的。
5)压缩发送和接收能下降网络负载。
6)以压缩后的形式持久化到磁盘。

2.3   zookeeper 节点
kafka 在 zookeeper 中的存储结构以下图所示:

3、producer 发布消息
3.1   写入方式
producer 采用 push 模式将消息发布到 broker,每条消息都被 append 到 patition 中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障 kafka 吞吐率)。

3.2   消息路由
producer 发送消息到 broker 时,会根据分区算法选择将其存储到哪个 partition。其路由机制为:
-  指定了 patition,则直接使用;
-  未指定 patition 但指定 key,经过对 key 的 value 进行hash 选出一个 patition
-  patition 和 key 都未指定,使用轮询选出一个 patition。

3.3   写入流程
producer 写入消息序列图以下所示:

流程说明:
-  producer 先从 zookeeper 的 "/brokers/.../state" 节点找到该 partition 的 leader
-  producer 将消息发送给该 leader
-  leader 将消息写入本地 log
-  followers 从 leader pull 消息,写入本地 log 后 leader 发送 ACK
-  leader 收到全部 ISR 中的 replica 的 ACK 后,增长 HW(high watermark,最后 commit 的 offset) 并向 producer 发送 ACK

3.4   producer delivery guarantee
通常状况下存在三种状况:
-  At most once 消息可能会丢,但毫不会重复传输
-  At least one 消息毫不会丢,但可能会重复传输
-  Exactly once 每条消息确定会被传输一次且仅传输一次

当 producer 向 broker 发送消息时,一旦这条消息被 commit,因为 replication 的存在,它就不会丢。可是若是 producer 发送数据给 broker 后,遇到网络问题而形成通讯中断,那 Producer 就没法判断该条消息是否已经 commit。虽然 Kafka 没法肯定网络故障期间发生了什么,可是 producer 能够生成一种相似于主键的东西,发生故障时幂等性的重试屡次,这样就作到了 Exactly once,但目前还并未实现。因此目前默认状况下一条消息从 producer 到 broker 是确保了 At least once,可经过设置 producer 异步发送实现At most once。

4、broker 保存消息
4.1   存储方式
物理上把 topic 分红一个或多个 patition(对应 server.properties 中的 num.partitions=3 配置),每一个 patition 物理上对应一个文件夹(该文件夹存储该 patition 的全部消息和索引文件),以下:

4.2   存储策略
不管消息是否被消费,kafka 都会保留全部消息。有两种策略能够删除旧数据:
-  基于时间:log.retention.hours=168
-  基于大小:log.retention.bytes=1073741824
须要注意的是:由于Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,因此这里删除过时文件与提升 Kafka 性能无关。

4.3 topic 建立与删除
4.3.1 建立 topic
建立 topic 的序列图以下所示:

流程说明:
1. controller 在 ZooKeeper 的 /brokers/topics 节点上注册 watcher,当 topic 被建立,则 controller 会经过 watch 获得该 topic 的 partition/replica 分配。
2. controller从 /brokers/ids 读取当前全部可用的 broker 列表,对于 set_p 中的每个 partition:
->  从分配给该 partition 的全部 replica(称为AR)中任选一个可用的 broker 做为新的 leader,并将AR设置为新的 ISR
->  将新的 leader 和 ISR 写入 /brokers/topics/[topic]/partitions/[partition]/state
3. controller 经过 RPC 向相关的 broker 发送 LeaderAndISRRequest。

4.3.2 删除 topic
删除 topic 的序列图以下所示:

流程说明:
1. controller 在 zooKeeper 的 /brokers/topics 节点上注册 watcher,当 topic 被删除,则 controller 会经过 watch 获得该 topic 的 partition/replica 分配。
2. 若 delete.topic.enable=false,结束;不然 controller 注册在 /admin/delete_topics 上的 watch 被 fire,controller 经过回调向对应的 broker 发送 StopReplicaRequest。

5、kafka HA
5.1 replication
如上面拓扑所示,同一个 partition 可能会有多个 replica(对应 server.properties 配置中的 default.replication.factor=N)。没有 replica 的状况下,一旦 broker 宕机,其上全部 patition 的数据都不可被消费,同时 producer 也不能再将数据存于其上的 patition。引入replication 以后,同一个 partition 可能会有多个 replica,而这时须要在这些 replica 之间选出一个 leader,producer 和 consumer 只与这个 leader 交互,其它 replica 做为 follower 从 leader 中复制数据。

Kafka 分配 Replica 的算法以下:
1) 将全部 broker(假设共 n 个 broker)和待分配的 partition 排序
2) 将第 i 个 partition 分配到第(i mod n)个 broker 上
3) 将第 i 个 partition 的第 j 个 replica 分配到第((i + j) mode n)个 broker上

5.2 leader failover
当 partition 对应的 leader 宕机时,须要从 follower 中选举出新 leader。在选举新leader时,一个基本的原则是,新的 leader 必须拥有旧 leader commit 过的全部消息。

kafka 在 zookeeper 中(/brokers/.../state)动态维护了一个 ISR(in-sync replicas),由3.3节的写入流程可知 ISR 里面的全部 replica 都跟上了 leader,只有 ISR 里面的成员才能选为 leader。对于 f+1 个 replica,一个 partition 能够在容忍 f 个 replica 失效的状况下保证消息不丢失。

当全部 replica 都不工做时,有两种可行的方案:
1. 等待 ISR 中的任一个 replica 活过来,并选它做为 leader。可保障数据不丢失,但时间可能相对较长。
2. 选择第一个活过来的 replica(不必定是 ISR 成员)做为 leader。没法保障数据不丢失,但相对不可用时间较短。

kafka 0.8.* 使用第二种方式。
kafka 经过 Controller 来选举 leader,流程请参考5.3节。

5.3 broker failover
kafka broker failover 序列图以下所示:

流程说明:
1)  controller 在 zookeeper 的 /brokers/ids/[brokerId] 节点注册 Watcher,当 broker 宕机时 zookeeper 会 fire watch
2)  controller 从 /brokers/ids 节点读取可用broker
3)  controller决定set_p,该集合包含宕机 broker 上的全部 partition
4)  对 set_p 中的每个 partition
->  从/brokers/topics/[topic]/partitions/[partition]/state 节点读取 ISR
->  决定新 leader(如4.3节所描述)
->  将新 leader、ISR、controller_epoch 和 leader_epoch 等信息写入 state 节点
5)  经过 RPC 向相关 broker 发送 leaderAndISRRequest 命令

5.4 controller failover
当 controller 宕机时会触发 controller failover。每一个 broker 都会在 zookeeper 的 "/controller" 节点注册 watcher,当 controller 宕机时 zookeeper 中的临时节点消失,全部存活的 broker 收到 fire 的通知,每一个 broker 都尝试建立新的 controller path,只有一个竞选成功并当选为 controller。

当新的 controller 当选时,会触发 KafkaController.onControllerFailover 方法,在该方法中完成以下操做:
1)读取并增长 Controller Epoch。
2)在 reassignedPartitions Patch(/admin/reassign_partitions) 上注册 watcher。
3)在 preferredReplicaElection Path(/admin/preferred_replica_election) 上注册 watcher。
4)经过 partitionStateMachine 在 broker Topics Patch(/brokers/topics) 上注册 watcher。
5)若 delete.topic.enable=true(默认值是 false),则 partitionStateMachine 在 Delete Topic Patch(/admin/delete_topics) 上注册 watcher。
6)经过 replicaStateMachine在 Broker Ids Patch(/brokers/ids)上注册Watch。
7)初始化 ControllerContext 对象,设置当前全部 topic,“活”着的 broker 列表,全部 partition 的 leader 及 ISR等。
8)启动 replicaStateMachine 和 partitionStateMachine。
9)将 brokerState 状态设置为 RunningAsController。
10)将每一个 partition 的 Leadership 信息发送给全部“活”着的 broker。
11)若 auto.leader.rebalance.enable=true(默认值是true),则启动 partition-rebalance 线程。
12)若 delete.topic.enable=true 且Delete Topic Patch(/admin/delete_topics)中有值,则删除相应的Topic。

6、consumer 消费消息
6.1 consumer API
kafka 提供了两套 consumer API:
1)The high-level Consumer API
2)The SimpleConsumer API
其中 high-level consumer API 提供了一个从 kafka 消费数据的高层抽象,而 SimpleConsumer API 则须要开发人员更多地关注细节。

6.1.1 The high-level consumer API
high-level consumer API 提供了 consumer group 的语义,一个消息只能被 group 内的一个 consumer 所消费,且 consumer 消费消息时不关注 offset,最后一个 offset 由 zookeeper 保存。

使用 high-level consumer API 能够是多线程的应用,应当注意:
1)若是消费线程大于 patition 数量,则有些线程将收不到消息
2)若是 patition 数量大于线程数,则有些线程多收到多个 patition 的消息
3)若是一个线程消费多个 patition,则没法保证你收到的消息的顺序,而一个 patition 内的消息是有序的

6.1.2 The SimpleConsumer API
若是你想要对 patition 有更多的控制权,那就应该使用 SimpleConsumer API,好比:
1)屡次读取一个消息
2)只消费一个 patition 中的部分消息
3)使用事务来保证一个消息仅被消费一次

可是使用此 API 时,partition、offset、broker、leader 等对你再也不透明,须要本身去管理。你须要作大量的额外工做:
1)必须在应用程序中跟踪 offset,从而肯定下一条应该消费哪条消息
2)应用程序须要经过程序获知每一个 Partition 的 leader 是谁
3)须要处理 leader 的变动

使用 SimpleConsumer API 的通常流程以下:
1)查找到一个“活着”的 broker,而且找出每一个 partition 的 leader
2)找出每一个 partition 的 follower
3)定义好请求,该请求应该能描述应用程序须要哪些数据
4)fetch 数据
5)识别 leader 的变化,并对之做出必要的响应

如下针对 high-level Consumer API 进行说明。

6.2 consumer group
如 2.2 节所说, kafka 的分配单位是 patition。每一个 consumer 都属于一个 group,一个 partition 只能被同一个 group 内的一个 consumer 所消费(也就保障了一个消息只能被 group 内的一个 consuemr 所消费),可是多个 group 能够同时消费这个 partition。

kafka 的设计目标之一就是同时实现离线处理和实时处理,根据这一特性,可使用 spark/Storm 这些实时处理系统对消息在线处理,同时使用 Hadoop 批处理系统进行离线处理,还能够将数据备份到另外一个数据中心,只须要保证这三者属于不一样的 consumer group。以下图所示:

6.3 消费方式
consumer 采用 pull 模式从 broker 中读取数据。

push 模式很难适应消费速率不一样的消费者,由于消息发送速率是由 broker 决定的。它的目标是尽量以最快速度传递消息,可是这样很容易形成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则能够根据 consumer 的消费能力以适当的速率消费消息。

对于 Kafka 而言,pull 模式更合适,它可简化 broker 的设计,consumer 可自主控制消费消息的速率,同时 consumer 能够本身控制消费方式——便可批量消费也可逐条消费,同时还能选择不一样的提交方式从而实现不一样的传输语义。

6.4 consumer delivery guarantee
若是将 consumer 设置为 autocommit,consumer 一旦读到数据当即自动 commit。若是只讨论这一读取消息的过程,那 Kafka 确保了 Exactly once。

但实际使用中应用程序并不是在 consumer 读取完数据就结束了,而是要进行进一步处理,而数据处理与 commit 的顺序在很大程度上决定了consumer delivery guarantee:
1)读完消息先 commit 再处理消息。
这种模式下,若是 consumer 在 commit 后还没来得及处理消息就 crash 了,下次从新开始工做后就没法读到刚刚已提交而未处理的消息,这就对应于 At most once
2)读完消息先处理再 commit。
这种模式下,若是在处理完消息以后 commit 以前 consumer crash 了,下次从新开始工做时还会处理刚刚未 commit 的消息,实际上该消息已经被处理过了。这就对应于 At least once。
3)若是必定要作到 Exactly once,就须要协调 offset 和实际操做的输出。
精典的作法是引入两阶段提交。若是能让 offset 和操做输入存在同一个地方,会更简洁和通用。这种方式可能更好,由于许多输出系统可能不支持两阶段提交。好比,consumer 拿到数据后可能把数据放到 HDFS,若是把最新的 offset 和数据自己一块儿写到 HDFS,那就能够保证数据的输出和 offset 的更新要么都完成,要么都不完成,间接实现 Exactly once。(目前就 high-level API而言,offset 是存于Zookeeper 中的,没法存于HDFS,而SimpleConsuemr API的 offset 是由本身去维护的,能够将之存于 HDFS 中)

总之,Kafka 默认保证 At least once,而且容许经过设置 producer 异步提交来实现 At most once。而 Exactly once 要求与外部存储系统协做,幸运的是 kafka 提供的 offset 能够很是直接很是容易得使用这种方式。

6.5 consumer rebalance
当有 consumer 加入或退出、以及 partition 的改变(如 broker 加入或退出)时会触发 rebalance。consumer rebalance算法以下:
1) 将目标 topic 下的全部 partirtion 排序,存于PT
2) 对某 consumer group 下全部 consumer 排序,存于 CG,第 i 个consumer 记为 Ci
3) N=size(PT)/size(CG),向上取整
4) 解除 Ci 对原来分配的 partition 的消费权(i从0开始)
5) 将第i*N到(i+1)*N-1个 partition 分配给 Ci

在 0.8.*版本,每一个 consumer 都只负责调整本身所消费的 partition,为了保证整个consumer group 的一致性,当一个 consumer 触发了 rebalance 时,该 consumer group 内的其它全部其它 consumer 也应该同时触发 rebalance。这会致使如下几个问题:
1)Herd effect
任何 broker 或者 consumer 的增减都会触发全部的 consumer 的 rebalance
2)Split Brain
每一个 consumer 分别单独经过 zookeeper 判断哪些 broker 和 consumer 宕机了,那么不一样 consumer 在同一时刻从 zookeeper 看到的 view 就可能不同,这是由 zookeeper 的特性决定的,这就会形成不正确的 reblance 尝试。
3)调整结果不可控
全部的 consumer 都并不知道其它 consumer 的 rebalance 是否成功,这可能会致使 kafka 工做在一个不正确的状态。

基于以上问题,kafka考虑在0.9.*版本开始使用中心 coordinator 来控制 consumer rebalance,而后又从简便性和验证要求两方面考虑,计划在 consumer 客户端实现分配方案。

7、注意事项
7.1 producer 没法发送消息的问题
最开始在本机搭建了kafka伪集群,本地 producer 客户端成功发布消息至 broker。随后在服务器上搭建了 kafka 集群,在本机链接该集群,producer 却没法发布消息到 broker(奇怪也没有抛错)。最开始怀疑是 iptables 没开放,因而开放端口,结果还不行(又开始是代码问题、版本问题等等,倒腾了好久)。最后没办法,一项一项查看 server.properties 配置,发现如下两个配置:

# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = security_protocol://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092
 # Hostname and port the broker will advertise to producers and consumers. If not set, 
 # it uses the value for "listeners" if configured. Otherwise, it will use the value
 # returned from java.net.InetAddress.getCanonicalHostName().
 #advertised.listeners=PLAINTEXT://your.host.name:9092

以上说的就是 advertised.listeners 是 broker 给 producer 和 consumer 链接使用的,若是没有设置,就使用 listeners,而若是 host_name 没有设置的话,就使用 java.net.InetAddress.getCanonicalHostName() 方法返回的主机名。

相关文章
相关标签/搜索