Kfaka学习笔记整理
1. Produce:生产者,将消息发送到Kafka中
2. Broker:集群中包含的服务器,kafka存储数据的角色
3. Consumer:消费者,读kafka中的消息
4. Topic:特指Kafka处理的消息源的不同分类
5. Partition:Topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。
6. Message:消息
7. Replica:分区的副本
8. Leader:replica中的一个角色,producer和 consumer 只跟 leader 交互。
9. Follower:replica中一个角色,从leader中复制数据。
10. Controller:kafka集群中的一个服务器,用来进行分区leader election 以及各种 failover。也负责增删Topic以及Replica的重新分配。
11. zookeeper:kafka 通过 zookeeper 来存储集群的 meta 信息。
实现:
1. 数据磁盘持久化:消息不放在内存中,直接放入磁盘,利用磁盘的顺序读写性能。
2. zero-copy:减少IO操作步骤(使用了sendfile这个高级系统函数)
3. 数据批量发送
4. 数据压缩
5. Topic划分为多个partition,提高并行度
实现:
1. producer根据用户指定的算法,将消息发送到指定的partition
2. 存在多个partiiton,每个partition有自己的replica,每个replica分布在不同的Broker节点上
好处:consumer可以自主控制消费情况。
增加broker节点
Producer采用push模式将消息发布到broker,每条消息通过分区规则追加到对应分区文件
1. 指定分区id,直接使用,
2. 自定义分区算法,通过自定义分区算法进行分区确定
定义分区类实现kafka.producer.Partitioner接口
设置“partitioner.class"”属性为自定义分区类
3. 未指定分区id和自定义分区但指定 key,通过对 key 的 value 进行hash 选出一个 patition
4. 分区id 和 key 和自定义分区都未指定,使用轮询选出一个 patition
1. Producer从zookeeper的/brokers/…/state找到该分区的leader
2. Producer将消息发送给leader
3. Leader将消息写入本地log
4. Follower同步leader的消息,同步完成之后发送ack
5. Leader收到所有ISR中的replica的ack后,向produer发送ack
ACK:确认机制
ISR:Kafka在Zookeeper中动态维护了一个ISR(副本同步列表)
1. 物理上把 topic分成一个或多个patition,每个patition物理上对应一个目录,目录下包含:多个*.index(索引文件)和*.log(具体数据),一对index和log称为segment文件
2. 文件命名规则:partition全局的第一个segment从0开始,后续的每一个segment文件名是上一个segment文件中最后一条消息的offset值。
3. segment内容:
比如:要查找绝对offset为7的Message:
首先是用二分查找确定它是在哪个LogSegment中,自然是在第一个Segment中。
打开这个Segment的index文件,也是用二分查找找到offset小于或者等于指定offset的索引条目中最大的那个offset。自然offset为6的那个索引是我们要找的,通过索引文件我们知道offset为6的Message在数据文件中的位置为9807。
打开数据文件,从位置为9807的那个地方开始顺序扫描直到找到offset为7的那条Message。
无论消息是否被消费,kafka 都会保留所有消息。有两种策略可以删除旧数据:
1. 基于时间:log.retention.hours
2. 基于大小:log.retention.byte
1. controller 在 ZooKeeper 的 /brokers/topics 节点上注册 watcher,当 topic 被创建,则 controller 会通过 watch 得到该 topic 的 partition/replica 分配。
2. controller从 /brokers/ids 读取当前所有可用的 broker 列表,对于 set_p 中的每一个 partition:
2.1 从分配给该 partition 的所有 replica(称为AR)中任选一个可用的 broker 作为新的 leader,并将AR设置为新的 ISR
2.2 将新的 leader 和 ISR 写入 /brokers/topics/[topic]/partitions/[partition]/state
3. controller 通过 RPC 向相关的 broker 发送 LeaderAndISRRequest。
1. controller 在 zooKeeper 的 /brokers/topics 节点上注册 watcher,当 topic 被删除,则 controller 会通过 watch 得到该 topic 的 partition/replica 分配。
2. 若 delete.topic.enable=false,结束;否则 controller 注册在 /admin/delete_topics 上的 watch 被 fire,controller 通过回调向对应的 broker 发送 StopReplicaRequest。
broker 宕机,会在其他broker副本之间选出一个 leader,producer 和 consumer 只与这个 leader 交互,其它 replica 作为 follower 从 leader 中复制数据。
Kafka 分配 Replica的算法如下:
1. 将所有 broker(假设共 n 个 broker)和待分配的 partition 排序
2. 将第 i 个 partition 分配到第(i mod n)个 broker 上
3. 将第 i 个 partition 的第 j 个 replica 分配到第((i + j) mode n)个 broker上
kafka 在 zookeeper (/brokers/.../state)中维护了一个ISR,leader宕机是,从ISR中选出新的leader(kafka 通过 Controller来选举 leader)
1. controller在zookeeper的/brokers/ids/[brokerid]节点注册watcher,当broker宕机是,zookeeper会fire watch
2. controller从/brokers/ids中获取可用broker
3. controller计算宕机broker上的partition的集合set_p
4. 对set_p的每个partition
从/brokers/topics/[topic]/partitions/[partition]/state节点读取ISR
决定新的leader
将新的leader、ISR写入state节点
5. 通过RPC向相关broker发送新的leader、ISR等更新请求
每个broker都会在zookeeper的/controller节点注册watcher,当controller宕机时,zookeeper中的/controller节点会消失,所有存活的broker收到fire通知,每个broker都尝试创建新的controller patch,Zookeeper一个节点只能被一个客户端创建成功,只有一个竞选成功,即先到先得原则。
At most once 消息可能会丢,但绝不会重复传输
At least one 消息绝不会丢,但可能会重复传输
Exactly once 每条消息肯定会被传输一次且仅传输一次,很多时候这是用户所想要的
producer 的deliver guarantee 可以通过request.required.acks参数的设置来进行调整:
0 ,相当于异步发送,消息发送完毕即offset增加,继续生产;相当于Atmost once
1,leader收到leaderreplica 对一个消息的接受ack才增加offset,然后继续生产;
-1,leader收到所有replica 对一个消息的接受ack才增加offset,然后继续生产
读完消息先commit再处理消息。这种模式下,如果consumer在commit后还没来得及处理消息就crash了,下次重新开始工作后就无法读到刚刚已提交而未处理的消息,这就对应于At most once
读完消息先处理再commit。这种模式下,如果处理完了消息在commit之前consumer crash了,下次重新开始工作时还会处理刚刚未commit的消息,实际上该消息已经被处理过了。这就对应于At least once(默认)
1. 节点必须可以维护和ZooKeeper的连接,Zookeeper通过心跳机制检查每个节点的连接
2. 如果节点是个follower,他必须能及时的同步leader的写操作,延时不能太久
1. 等待 ISR 中的任一个 replica活过来,并选它作为 leader。可保障数据不丢失,但时间可能相对较长。
2. 选择第一个活过来的 replica(不一定是 ISR 成员)作为 leader。无法保障数据不丢失,但相对不可用时间较短。
1. 一条消息太大
#单条消息最大大小控制,消费端的最大拉取大小需要略大于该值
message.max.bytes 1000000
replica.fetch.max.bytes (默认: 1MB) – broker可复制的消息的最大字节数。这个值应该比message.max.bytes大,否则broker会接收此消息,但无法将此消息复制出去,从而造成数据丢失。
fetch.message.max.bytes (默认 1MB) – 消费者能读取的最大消息。这个值应该大于或等于message.max.bytes。
2. 缓存队列配置问题
#异步发送模式下,缓存数据的最长时间,之后便会被发送到broker
queue.buffering.max.ms 5000
#producer端异步模式下最多缓存的消息条数
queue.buffering.max.messages 10000
#0代表队列没满的时候直接入队,满了立即扔弃,-1代表无条件阻塞且不丢弃
queue.enqueue.timeout.ms -1
3. 安装kafka
原因:因为之前安装过kafak,log.dirs属性配置的目录下有'.gnome2',启动kafka时,会加载这个目录,
处理:删除log.dirs属性配置的目录(因为还会处理目录下其他的东西,所以就把整个目录删掉了),启动kafak,成功。
http://www.javashuo.com/article/p-zyhzdmxo-cy.html
https://www.cnblogs.com/jun1019/p/6256514.html
http://blog.csdn.net/caisini_vc/article/details/48007297
http://www.cnblogs.com/likehua/p/3999538.html
https://www.cnblogs.com/byrhuangqiang/p/6369176.html
https://www.cnblogs.com/cynchanpin/p/7339537.html