Kafka整理

Kfaka学习笔记整理

1      架构

1.1    结构图


1.2    名词解释

1.       Produce:生产者,将消息发送到Kafka中

2.       Broker:集群中包含的服务器,kafka存储数据的角色

3.       Consumer:消费者,读kafka中的消息

4.       Topic:特指Kafka处理的消息源的不同分类

5.       Partition:Topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。

6.       Message:消息

7.       Replica:分区的副本

8.       Leader:replica中的一个角色,producer和 consumer 只跟 leader 交互。

9.       Follower:replica中一个角色,从leader中复制数据。

10.   Controller:kafka集群中的一个服务器,用来进行分区leader election 以及各种 failover。也负责增删Topic以及Replica的重新分配。

11.   zookeeper:kafka 通过 zookeeper 来存储集群的 meta 信息。

2      特性

2.1    高吞吐量

实现:

1.       数据磁盘持久化:消息不放在内存中,直接放入磁盘,利用磁盘的顺序读写性能。

2.       zero-copy:减少IO操作步骤(使用了sendfile这个高级系统函数

3.       数据批量发送

4.       数据压缩

5.       Topic划分为多个partition,提高并行度

2.2    负载均衡

实现:

1.       producer根据用户指定的算法,将消息发送到指定的partition

2.       存在多个partiiton,每个partition有自己的replica,每个replica分布在不同的Broker节点上

2.3    拉取系统

好处:consumer可以自主控制消费情况。

2.4    可扩展性

增加broker节点 

3      producer 流程

3.1    写入方式

Producer采用push模式将消息发布到broker,每条消息通过分区规则追加到对应分区文件

3.2    分区机制

1.       指定分区id,直接使用,

2.       自定义分区算法,通过自定义分区算法进行分区确定

              定义分区类实现kafka.producer.Partitioner接口

              设置“partitioner.class"”属性为自定义分区类

3.       未指定分区id和自定义分区但指定 key,通过对 key 的 value 进行hash 选出一个 patition

4.       分区id 和 key 和自定义分区都未指定,使用轮询选出一个 patition

3.3    写入流程

1.       Producer从zookeeper的/brokers/…/state找到该分区的leader

2.       Producer将消息发送给leader

3.       Leader将消息写入本地log

4.       Follower同步leader的消息,同步完成之后发送ack

5.       Leader收到所有ISR中的replica的ack后,向produer发送ack

 

ACK:确认机制

ISR:Kafka在Zookeeper中动态维护了一个ISR(副本同步列表)

4      broker 机制

4.1    存储方式

1.    物理上把 topic分成一个或多个patition,每个patition物理上对应一个目录,目录下包含:多个*.index(索引文件)和*.log(具体数据),一对indexlog称为segment文件

2.       文件命名规则:partition全局的第一个segment0开始,后续的每一个segment文件名是上一个segment文件中最后一条消息的offset值。

3.       segment内容:


比如:要查找绝对offset为7的Message:

首先是用二分查找确定它是在哪个LogSegment中,自然是在第一个Segment中。

打开这个Segment的index文件,也是用二分查找找到offset小于或者等于指定offset的索引条目中最大的那个offset。自然offset为6的那个索引是我们要找的,通过索引文件我们知道offset为6的Message在数据文件中的位置为9807。

打开数据文件,从位置为9807的那个地方开始顺序扫描直到找到offset为7的那条Message。

4.2    存储策略

无论消息是否被消费,kafka 都会保留所有消息。有两种策略可以删除旧数据:

1. 基于时间:log.retention.hours

2. 基于大小:log.retention.byte

4.3    topic 创建与删除

1. controller 在 ZooKeeper 的 /brokers/topics 节点上注册 watcher,当 topic 被创建,则 controller 会通过 watch 得到该 topic 的 partition/replica 分配。
2. controller从 /brokers/ids 读取当前所有可用的 broker 列表,对于 set_p 中的每一个 partition:
               2.1 从分配给该 partition 的所有 replica(称为AR)中任选一个可用的 broker 作为新的 leader,并将AR设置为新的 ISR
               2.2 将新的 leader 和 ISR 写入 /brokers/topics/[topic]/partitions/[partition]/state
3. controller 通过 RPC 向相关的 broker 发送 LeaderAndISRRequest。

4.4    删除 topic

1. controller 在 zooKeeper 的 /brokers/topics 节点上注册 watcher,当 topic 被删除,则 controller 会通过 watch 得到该 topic 的 partition/replica 分配。
2. 若 delete.topic.enable=false,结束;否则 controller 注册在 /admin/delete_topics 上的 watch 被 fire,controller 通过回调向对应的 broker 发送 StopReplicaRequest。

5      kafka HA

5.1    replication

broker 宕机,会在其他broker副本之间选出一个 leader,producer 和 consumer 只与这个 leader 交互,其它 replica 作为 follower 从 leader 中复制数据。

Kafka 分配 Replica的算法如下:

1. 将所有 broker(假设共 n 个 broker)和待分配的 partition 排序
2. 将第 i 个 partition 分配到第(i mod n)个 broker 上
3. 将第 i 个 partition 的第 j 个 replica 分配到第((i + j) mode n)个 broker上

5.2    leader failover

kafka 在 zookeeper (/brokers/.../state)中维护了一个ISR,leader宕机是,从ISR中选出新的leader(kafka 通过 Controller来选举 leader)

5.3    broker failover

1.       controller在zookeeper的/brokers/ids/[brokerid]节点注册watcher,当broker宕机是,zookeeper会fire watch

2.       controller从/brokers/ids中获取可用broker

3.       controller计算宕机broker上的partition的集合set_p

4.       对set_p的每个partition

从/brokers/topics/[topic]/partitions/[partition]/state节点读取ISR

决定新的leader

将新的leader、ISR写入state节点

5.       通过RPC向相关broker发送新的leader、ISR等更新请求

5.4    controller failover

每个broker都会在zookeeper的/controller节点注册watcher,当controller宕机时,zookeeper中的/controller节点会消失,所有存活的broker收到fire通知,每个broker都尝试创建新的controller patch,Zookeeper一个节点只能被一个客户端创建成功,只有一个竞选成功,即先到先得原则。

 

6      消息投递机制

At most once 消息可能会丢,但绝不会重复传输

At least one 消息绝不会丢,但可能会重复传输

Exactly once 每条消息肯定会被传输一次且仅传输一次,很多时候这是用户所想要的

6.1    Producer

producer 的deliver guarantee 可以通过request.required.acks参数的设置来进行调整:

0 ,相当于异步发送,消息发送完毕即offset增加,继续生产;相当于Atmost once

1,leader收到leaderreplica 对一个消息的接受ack才增加offset,然后继续生产;

-1,leader收到所有replica 对一个消息的接受ack才增加offset,然后继续生产

6.2    Consumer

读完消息先commit再处理消息。这种模式下,如果consumer在commit后还没来得及处理消息就crash了,下次重新开始工作后就无法读到刚刚已提交而未处理的消息,这就对应于At most once

读完消息先处理再commit。这种模式下,如果处理完了消息在commit之前consumer crash了,下次重新开始工作时还会处理刚刚未commit的消息,实际上该消息已经被处理过了。这就对应于At least once(默认)

7      Others

7.1    in sync 条件

1.       节点必须可以维护和ZooKeeper的连接,Zookeeper通过心跳机制检查每个节点的连接

2.       如果节点是个follower,他必须能及时的同步leader的写操作,延时不能太久

7.2    副本都不工作方案

1. 等待 ISR 中的任一个 replica活过来,并选它作为 leader。可保障数据不丢失,但时间可能相对较长。

2. 选择第一个活过来的 replica(不一定是 ISR 成员)作为 leader。无法保障数据不丢失,但相对不可用时间较短。

 

8      遇到的问题

1.       一条消息太大

#单条消息最大大小控制,消费端的最大拉取大小需要略大于该值

message.max.bytes 1000000

replica.fetch.max.bytes (默认: 1MB) – broker可复制的消息的最大字节数。这个值应该比message.max.bytes大,否则broker会接收此消息,但无法将此消息复制出去,从而造成数据丢失。

fetch.message.max.bytes (默认 1MB) – 消费者能读取的最大消息。这个值应该大于或等于message.max.bytes。

2.       缓存队列配置问题

#异步发送模式下,缓存数据的最长时间,之后便会被发送到broker

queue.buffering.max.ms 5000

#producer端异步模式下最多缓存的消息条数

queue.buffering.max.messages 10000

#0代表队列没满的时候直接入队,满了立即扔弃,-1代表无条件阻塞且不丢弃

queue.enqueue.timeout.ms -1

3.       安装kafka

原因:因为之前安装过kafak,log.dirs属性配置的目录下有'.gnome2',启动kafka时,会加载这个目录,

处理:删除log.dirs属性配置的目录(因为还会处理目录下其他的东西,所以就把整个目录删掉了),启动kafak,成功。

9    参考文章