1.producer:
消息生产者,发布消息到 kafka 集群的终端或服务。
2.broker:
kafka 集群中包含的服务器。
3.topic:
每条发布到 kafka 集群的消息属于的类别,即 kafka 是面向 topic 的。
4.partition:
partition 是物理上的概念,每一个 topic 包含一个或多个 partition。kafka 分配的单位是 partition。
5.consumer:
从 kafka 集群中消费消息的终端或服务。
6.Consumer group:
high-level consumer API 中,每一个 consumer 都属于一个 consumer group,每条消息只能被 consumer group 中的一个 Consumer 消费,但能够被多个 consumer group 消费。
7.replica:
partition 的副本,保障 partition 的高可用。
8.leader:
replica 中的一个角色, producer 和 consumer 只跟 leader 交互。
9.follower:
replica 中的一个角色,从 leader 中复制数据。
10.controller:
kafka 集群中的其中一个服务器,用来进行 leader selection。
12.zookeeper:
kafka 经过 zookeeper 来存储集群的信息。java
为了解决kafka查找效率问题,kafka的message在存储的时候采起了分段+索引的存储方式。
Kafka中的Message是以topic为基本单位组织的,不一样的topic之间是相互独立的。每一个topic又能够分红几个不一样的partition(每一个topic有几个partition是在建立topic时指定的),每一个partition存储一部分Message。
partition是以文件的形式存储在文件系统中,好比,建立了一个名为iss_fw_state_callback的topic,其有2个partition,那么在Kafka的数据目录中(由配置文件中的log.dirs指定的)中就有这样2个目录: iss_fw_state_callback-0,iss_email_send_topic-1,其命名规则为<topic_name>-<partition_id>,里面存储的分别就是这2个partition的数据
partition是分段的,每一个段叫LogSegment,包括了一个数据文件和一个索引文件,下图是某个partition目录下的文件
能够看到,这个partition有4个LogSegment。partion全局的第一个segment从0开始,后续每一个segment文件名为上一个 segment文件最后一条消息的offset值进行递增算法
在partition中如何经过offset查找message?查找的算法是:
1.索引文件:根据offset的值,查找segment段中的index索引文件。
注:因为索引文件命名是以上一个文件的最后一个offset进行命名的,因此,使用二分查找算法可以根据offset快速定位到指定的索引文件。
2.近似position:找到索引文件后,根据offset进行定位,找到索引文件中的符合范围的索引。
注:因为是稀疏索引,因此可能只能定位到一条相近的消息
3.具体消息:获得position之后,再到对应的log文件中,从position出开始查找offset对应的消息,将每条消息的offset与目标offset进行比较,直到找到消息。缓存
示例:咱们要查找offset=368776这条消息(参照上面的图)
那么先找到00000000000000368769.index,
在索引文件中,找到索引6(对应的offset=368775)的具体位置position=1407(注:此次只能找到近似offset)
到log文件中,根据1407这个position开始查找,比较每条消息的offset是否大于等于368776。最后查找到对应的消息之后返回。服务器
查看kafka生产者源码:markdown
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) { this(topic, partition, timestamp, key, value, (Iterable)null); } public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) { this(topic, partition, (Long)null, key, value, headers); } public ProducerRecord(String topic, Integer partition, K key, V value) { this(topic, partition, (Long)null, key, value, (Iterable)null); } public ProducerRecord(String topic, K key, V value) { this(topic, (Integer)null, (Long)null, key, value, (Iterable)null); } public ProducerRecord(String topic, V value) { this(topic, (Integer)null, (Long)null, (Object)null, value, (Iterable)null); }
partition分为三种状况:网络
1.副本数据同步策略: |
方案 | 优势 | 缺点 |
---|---|---|---|
半数以上完成同步,就发送ack | 延迟低 | 选举新的leader时,容忍n台节点的故障,须要2n+1个副本 | |
所有完成同步,才发送ack | 选举新的leader时,容忍n台节点的故障,须要n+1个副本 | 延迟高 |
Kafka采用的是第二种方案,由于网络延迟对kafka的延迟小。第一种方案会形成大量数据冗余,浪费资源。架构
ISR(同步副本In-sync replicas):
试想,若是有一个follower故障,长时间没有从leader同步数据,那么leader就会一直等下去,直到该follower同步完消息才发送ack。因此,这才有了ISR。
In-sync replica(ISR)称之为同步副本,ISR中的副本都是与Leader进行同步的副本,因此不在该列表的follower会被认为与Leader是不一样步的. 那么,ISR中存在是什么副本呢?首先能够明确的是:Leader副本老是存在于ISR中. 而follower副本是否在ISR中,取决于该follower副本是否与Leader副本保持了“同步”。ide
对于"follower副本是否与Leader副本保持了同步"的理解以下:
(1) 上面所说的同步不是指彻底的同步,即并非说一旦follower副本同步滞后与Leader副本,就会被踢出ISR列表.
(2) Kafka的broker端有一个参数replica.lag.time.max.ms, 该参数表示follower副本滞后与Leader副本的最长时间间隔,默认是10秒。这就意味着,只要follower副本落后于leader副本的时间间隔不超过10秒,就能够认为该follower副本与leader副本是同步的,因此哪怕当前follower副本落后于Leader副本几条消息,只要在10秒以内遇上Leader副本,就不会被踢出出局。
(3) 若是follower副本被踢出ISR列表,等到该副本追上了Leader副本的进度,该副本会被再次加入到ISR列表中,因此ISR是一个动态列表,并非静态不变的.this
ack应答机制:
acks参数指定了必需要有多少个分区副本收到消息,生产者才认为该消息是写入成功的,这个参数对于消息是否丢失起着重要做用,该参数的配置具体以下:
1).acks=0,生产者不须要等待leader的响应,leader一接收到还没写入磁盘时leader故障,就会形成消息丢失
2).acks=1,leader把消息写入磁盘后返回ack。若是在follower同步以前leader发生故障,会形成消息丢失
3).acks=-1,leader和全部follower把消息写入磁盘后才返回ack。在follower同步完成后,leader发送ack以前,leader发生故障,会致使消息重复发送。虽然-1时基本上不会丢失数据,可是在某种极端状况下仍是会的,即全部isr的副本都宕机,而另外一些由于一些问题没有宕机也不在isr中的follower恢复了,就从新加入isr了,并从新选出leader,(退化到acks=1的状况了)。。。此时本来isr中的一些最新数据没有被同步到新的isr的副本节点中,那么那些数据就丢失了3d
LEO(Log End Offset) & HW(High Watermark)
LEO:指的是每一个副本中最大的offset
HW:指的是ISR队列中最小的LEO
(1) follower故障:
follower发生故障后会被临时踢出ISR,待该follower恢复后,follower会从磁盘中读取上次的HW,并将log中高于HW的部分截取掉,从HW开始向leader进行同步。等该follower的LEO大于等于该partition的HW,即follower追上leader以后,就能够从新加入ISR了。
(2) leader故障:
leader发生故障后,会从ISR中选出一个新的leader,以后,为了保证多个副本之间的数据一致性,其他的follower会先将各自log文件中高于HW的部分截取掉,而后重新的leader同步数据。
注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复
Exactly Once
· at-least-once:若是producer收到来自Kafka broker的确认(ack)或者acks = all,则表示该消息已经写入到Kafka。但若是producer ack超时或收到错误,则可能会重试发送消息,客户端会认为该消息未写入Kafka。若是broker在发送Ack以前失败,但在消息成功写入Kafka以后,此重试将致使该消息被写入两次,所以消息会被不止一次地传递给最终consumer,这种策略可能致使重复的工做和不正确的结果。
· at-most-once:若是在ack超时或返回错误时producer不重试,则该消息可能最终不会写入Kafka,所以不会传递给consumer。在大多数状况下,这样作是为了不重复的可能性,业务上必须接收数据传递可能的丢失。
· exactly-once:即便producer重试发送消息,消息也会保证最多一次地传递给最终consumer。该语义是最理想的,但也难以实现,这是由于它须要消息系统自己与生产和消费消息的应用程序进行协做。例如若是在消费消息成功后,将Kafka consumer的偏移量rollback,咱们将会再次从该偏移量开始接收消息。这代表消息传递系统和客户端应用程序必须配合调整才能实现excactly-once。
at-least-once + 幂等性 = exactly-once要启用幂等性,只须要将Producer的参数中enable.idempotence设置为true。开启幂等性的Producer会在初始化的时候分配一个PID,发往同一个partition的消息会附带一个sequence number,而broker端会对<PID,Partition,SeqNumber>作缓存,当具备相同主键的消息提交时,broker只会持久化一条