topic :主题名,用于对消息进行分类,是一个逻辑上的概念html
partition :是物理上的一个概念,一个topic能够对应多个partition,消息实际存储在partition正则表达式
partition 是一个有序,不可变的记录序列。partition中的每一条消息都有一个序列号,称之为offset,offset 在一个partition内惟一,用于区别消息shell
segment :partition被分为多个segment文件进行存储apache
partition 的物理存储结构编程
建立topic:test.show.log, 副本数2,分区数3,segment文件大小512bytebootstrap
# 建立topic bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 2 --partitions 3 --topic test.show.log --config segment.bytes=512 # 查看topic 状态 bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test.show.log Topic:test.show.log PartitionCount:3 ReplicationFactor:2 Configs:segment.bytes=512 Topic: test.show.log Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1,0 Topic: test.show.log Partition: 1 Leader: 0 Replicas: 0,2 Isr: 0,2 Topic: test.show.log Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1
能够看到broker.0 服务持有partition 0,1的副本,而且为1 partition的leader数组
打开其日志保存目录能够看到缓存
test.show.log-0 test.show.log-1
broker 对于topic 的每个 partition 使用单独的目录保存,每一个目录下初始有服务器
00000000000000000000.index 00000000000000000000.log 00000000000000000000.timeindex leader-epoch-checkpoint
.log文件: segment 日志文件网络
.index文件: segment offset索引文件
.timeindex文件:segment timestamp索引文件
leader-epoch-checkpoint:用于副本备份机制
每个分区的日志文件被分为多个segment文件,segment文件的命名规则:
segment 文件记录的信息
.index 文件:偏移量->消息在.log文件中的物理位置.timeindex 文件:时间戳T->.index文件中的偏移量offset,表示表示比T晚的全部消息偏移量都比offset大
根据offset查找文件的步骤
.index 使用 稀疏索引,kafka会在内存中维护一份索引,经过二分查找定位到消息所在的.log文件,以及在.log文件中的位置。
分区的意义
分段的意义
timeindex 的意义
消息格式通过了几个版本的变动
具体见:
https://www.cnblogs.com/qwang...
https://kafka.apache.org/docu...
挑几个字段来讲明
key: 每一个消息均可以指定一个key。能够经过给多个消息指定同一个key分发到同一个partition中,从而保证消息的有序性。headers:可变长消息头,能够不须要解析payload而拿到一些消息的属性信息。
timestamp: 时间戳。时间戳有两种类型:CreateTime,LogAppendTime。
能够经过 broker 配置 log.message.timestamp.type来指定全局topic时间戳类型;
也能够经过命令行建立topic时单独指定该topic的时间戳类型。
生产者发送消息时,能够指定消息的事件戳,若是未指定,则使用生产者客户端当前时间。
kafka client 对 partition的读写都是直接访问leader。那么客户端是如何找到leader的?
主要是经过发送一个称之为TopicMetadataRequest的请求来获取。
TopicMetadataRequest => [TopicName] TopicName => string
Field Type Description TopicName []string 想要获取的topic的元数组,若是为空,则下发全部topic的元数据 MetadataResponse => [Broker][TopicMetadata] Broker => NodeId Host Port (any number of brokers may be returned) NodeId => int32 Host => string Port => int32 TopicMetadata => TopicErrorCode TopicName [PartitionMetadata] TopicErrorCode => int16 PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr PartitionErrorCode => int16 PartitionId => int32 Leader => int32 Replicas => [int32] Isr => [int32]
Field Description Broker kafka broker的信息,包括broker id, hostname, port Isr 与leader 保持同步的broker id Leader leader的broker id,若是当前不存在leader(leader正在选举中),则为-1 Replicas 全部当前存活的follower
服务器MetaData的存储后续研究
kafka 协议文档地址:https://cwiki.apache.org/conf...
push :broker 控制传输速度,若是消费者处理速度较慢,会积压大量消息,最终致使消费者拒绝提供服务
pull :适用于向消费者批量发送大量数据
pull 方式的不足:
消费者可能须要轮训等待消息的到达,为避免这种状况,拉取请求中能够指示等待给定数量的数据到达。
auto.offset.reset
consumer group && offset commit
consumer 做为组进行消费时,须要记录每一个分区消费的offset,以便于进行重平衡后,新的消费者能够从上一个位置继续消费。
这个offset是由consumer主动提交的,broker会记录在称之为__consumer_offsets的topic中,其对应的partition为 hash(group.id)%mode
提交方式
- 自动提交
enable.auto.commit=true
- 手动提交
一个partition,同一时间只会被同一group内的consumer消费,若是consumer数量大于partition数量,则多余的consumer一直空闲
rebalance
consumer 的离开加入,partition数量的变化,以及订阅topic数发生变动(能够经过正则表达式订阅多个topic)都会致使rebalance,rebalance 是由一个称之为coordinator的broker来负责的。
coordinator 的选取:
- 看offset保存在哪一个partition中
- 该partition的leader做为该group的coordinator
rebalance 大体流程:
- coordinator 随机选取一个consumer做为leader,并将角色信息发给consumers,还会把follower的信息发送给leader。
- consumer leader 根据获得的信息分配partition
- consumers 发送同步请求至coordinator,consumer leader 发送的请求中包含分配状况
- coordinator 将分配状况告诉全部consumer
replica
kafka 默认使用副本机制,将不须要备份的topic看作副本数为1。kafka 备份的单元是topic partition。
follower 做为 consumer 从 leader 拉取数据,而后应用到本身的log中。拉取方式可使follower 批量写入本身的log。
Isr (in sync)状态
需同时知足如下两个条件
- 节点的session必须在zookeeper中(必须和zk保持链接,经过zookeeper的心跳机制)
- 若是是一个follwer,必须同步leader的写入,而且不能太落后
leader 跟踪 Isr 节点集,若是follower 死亡,卡住,或落后,leader 就会将他从 Isr 列表中删除。卡住或滞后副本的肯定由replica.lag.time.max.ms配置项控制。
replica.lag.time.max.ms
If a follower hasn't sent any fetch requests or hasn't consumed up to the leaders log end offset for at least this time, the leader will remove the follower from isrlong
default 10000
定义:Isr集合内的全部节点将其写入本身的log中
只有确认commited的消息才会被consumer 消费,消费者没必要担忧会获得一条会丢失的消息,也就是说只要消费者获得了一条消息,那么即便leader挂掉,该消息也不会丢失(由于已同步至Isr集合)。
对于producer来讲,能够在发送请求的即时性和消息的持久化之间进行权衡,来选择是否等待消息commited。
这个选项能够经过acks配置。
acks=-1|all 与min.insync.replicas 配合能够最大程度保证消息的可靠性。
acks=-1|all 表示消息被追加到Isr集合内全部节点后才返回,若是当前Isr只有一个leader(follower因为某些缘由掉线),那么也会返回成功,随后leader挂掉,其余follower被选举为leader,那么该条消息就会丢失。min.insync.replicas 能够指定,若是当前Isr内节点数量小于min.insync.replicas指定数量,则producer直接抛出异常。
leader election
kafka controller.brokers中的一个节点会担当controller的角色(听说是经过zk建立节点,建立成功则为controller,失败则监听该节点,若是controller挂掉,则再次竞争)
controller 负责管理整个集群中分区和副本的状态。
leader选举
若是Isr中有至少一个replica幸存,则选择其中一个为leader。不然选择该partition中的任意一个幸存的replica为leader。
replica 都不工做
- 等待ISR中的任一个replica 活过来,而且选他做为leader。
- 选择第一个活过来的replica做为leader。
第一种等待的时间可能会比较长,或者不可用。第二种不保证包含了全部已commited的消息。
须要在可用性和一致性当中作出折中。
unclean.leader.election.enable 配置指定使用哪一种方案,默认是true,使用第2种。
数据的一致性
待研究
日志删除策略
kafka对于过时日志的删除有两种策略
delete
直接删除过时的segment文件
compact
经过建立新的segment文件将相同key的最新一条消息保留下来(缩容,合并)
compact 效果图:
应用场景
这个特性能够保证日志包含每一个key的最终值的完整快照,消费者就能够从这个topic中恢复本身的状态,而不须要保留全部更改的完整日志。
待研究
首先确保 broker 配置项 log.cleaner.enable 为 true.
能够经过设置broker 配置项 log.cleanup.policy 为 compact,默认是 delete.
能够经过设置topic 配置项c leanup.policy 为 compact,默认是delete.
topic 配置项能够覆盖broker全局配置.
log.cleanup.policy 和 cleanup.policy 取值能够是compact或delete或compact,delete
线程模型
一个线程做为acceptor,接受tcp链接,n个处理线程,每一个处理线程处理固定数量的链接。
协议
定长编码int8, int16, int32, int64。大端序
变长编码
length+content
字符串 length 用int16表示,二进制数组length用int32表示。
content为空用length=-1表示
数组
sizeof(array)+array[0]+array[1]....+array[n]
数组大小使用int32表示
增长broker,增长partition,增长replication
待研究
producer请求的幂等性
幂等:
在编程中一个幂等操做的特色是其任意屡次执行所产生的影响均与一次执行的影响相同
producer 引入幂等性的意义:
防止生产者重复生产消息。生产者进行retry时重复产生消息,有了幂等性以后,在进行retry重试时,只会有一条消息commited。
实现方式:
- PID。每一个producer在初始化时会被分配一个惟一的PID,改PID对用户不可见
- Sequence Number。producer 在向每一个paritition发送的每条数据都会携带一个seq。其从0递增
- broker 会缓存PID及其seq。若是收到的消息seq比缓存seq大1(highwater mark)则接受,不然丢弃。(tcp ack)
非幂等
幂等
PID 的生成:
- producer 从任意一个broker获取事务协调者(Transaction Coordinator)的信息
- producer 向 Transaction Coordinator 请求PID
Transaction Coordinator 的选择
PID的生成规则及保存失效规则
注意事项:
若是使用kafka的幂等性,则必须开启 topic 配置 enable.idempotenceWhen set to 'true', the producer will ensure that exactly one copy of each message is written in the stream. If 'false', producer retries due to broker failures, etc., may write duplicates of the retried message in the stream. Note that enabling idempotence requires
max.in.flight.requests.per.connection
to be less than or equal to 5,retries
to be greater than 0 andacks
must be 'all'. If these values are not explicitly set by the user, suitable values will be chosen. If incompatible values are set, aConfigException
will be thrown.前提条件 acks必须是all
待研究
待研究