Kafka中的事务可使应用程序将消费消息、生产消息、提交消费位移看成原子操做来处理,同时成功或失败,即便该生产或消费会跨多个分区。linux
生产者必须提供惟一的transactionalId,启动后请求事务协调器获取一个PID,transactionalId与PID一一对应。算法
每次发送数据给<Topic, Partition>前,须要先向事务协调器发送AddPartitionsToTxnRequest,事务协调器会将该<Transaction, Topic, Partition>存于__transaction_state内,并将其状态置为BEGIN。缓存
在处理完 AddOffsetsToTxnRequest 以后,生产者还会发送 TxnOffsetCommitRequest 请求给 GroupCoordinator,从而将本次事务中包含的消费位移信息 offsets 存储到主题 __consumer_offsets 中服务器
一旦上述数据写入操做完成,应用程序必须调用KafkaProducer的commitTransaction方法或者abortTransaction方法以结束当前事务。不管调用 commitTransaction() 方法仍是 abortTransaction() 方法,生产者都会向 TransactionCoordinator 发送 EndTxnRequest 请求。
TransactionCoordinator 在收到 EndTxnRequest 请求后会执行以下操做:网络
在消费端有一个参数isolation.level,设置为“read_committed”,表示消费端应用不能够看到还没有提交的事务内的消息。若是生产者开启事务并向某个分区值发送3条消息 msg一、msg2 和 msg3,在执行 commitTransaction() 或 abortTransaction() 方法前,设置为“read_committed”的消费端应用是消费不到这些消息的,不过在 KafkaConsumer 内部会缓存这些消息,直到生产者执行 commitTransaction() 方法以后它才能将这些消息推送给消费端应用。反之,若是生产者执行了 abortTransaction() 方法,那么 KafkaConsumer 会将这些缓存的消息丢弃而不推送给消费端应用。负载均衡
正常状况下,分区的全部副本都处于 ISR 集合中,可是不免会有异常状况发生,从而某些副本被剥离出 ISR 集合中。在 ISR 集合以外,也就是处于同步失效或功能失效(好比副本处于非存活状态)的副本统称为失效副本,失效副本对应的分区也就称为同步失效分区,即 under-replicated 分区。socket
Kafka 从 0.9.x 版本开始就经过惟一的 broker 端参数 replica.lag.time.max.ms 来抉择,当 ISR 集合中的一个 follower 副本滞后 leader 副本的时间超过此参数指定的值时则断定为同步失败,须要将此 follower 副本剔除出 ISR 集合。replica.lag.time.max.ms 参数的默认值为10000。分布式
在 0.9.x 版本以前,Kafka 中还有另外一个参数 replica.lag.max.messages(默认值为4000),它也是用来断定失效副本的,当一个 follower 副本滞后 leader 副本的消息数超过 replica.lag.max.messages 的大小时,则断定它处于同步失效的状态。它与 replica.lag.time.max.ms 参数断定出的失效副本取并集组成一个失效副本的集合,从而进一步剥离出分区的 ISR 集合。工具
Kafka 源码注释中说明了通常有这几种状况会致使副本失效:性能
咱们用UnderReplicatedPartitions表明leader副本在当前Broker上且具备失效副本的分区的个数。
若是集群中有多个Broker的UnderReplicatedPartitions保持一个大于0的稳定值时,通常暗示着集群中有Broker已经处于下线状态。这种状况下,这个Broker中的分区个数与集群中的全部UnderReplicatedPartitions(处于下线的Broker是不会上报任何指标值的)之和是相等的。一般这类问题是因为机器硬件缘由引发的,但也有多是因为操做系统或者JVM引发的 。
若是集群中存在Broker的UnderReplicatedPartitions频繁变更,或者处于一个稳定的大于0的值(这里特指没有Broker下线的状况)时,通常暗示着集群出现了性能问题,一般这类问题很难诊断,不过咱们能够一步一步的将问题的范围缩小,好比先尝试肯定这个性能问题是否只存在于集群的某个Broker中,仍是整个集群之上。若是肯定集群中全部的under-replicated分区都是在单个Broker上,那么能够看出这个Broker出现了问题,进而能够针对这单一的Broker作专项调查,好比:操做系统、GC、网络状态或者磁盘状态(好比:iowait、ioutil等指标)。
某个分区有3个副本分别位于 broker0、broker1 和 broker2 节点中,假设 broker0 上的副本1为当前分区的 leader 副本,那么副本2和副本3就是 follower 副本,整个消息追加的过程能够归纳以下:
某一时刻,leader 副本的 LEO 增长至5,而且全部副本的 HW 还都为0。
以后 follower 副本(不带阴影的方框)向 leader 副本拉取消息,在拉取的请求中会带有自身的 LEO 信息,这个 LEO 信息对应的是 FetchRequest 请求中的 fetch_offset。leader 副本返回给 follower 副本相应的消息,而且还带有自身的 HW 信息,如上图(右)所示,这个 HW 信息对应的是 FetchResponse 中的 high_watermark。
此时两个 follower 副本各自拉取到了消息,并更新各自的 LEO 为3和4。与此同时,follower 副本还会更新本身的 HW,更新 HW 的算法是比较当前 LEO 和 leader 副本中传送过来的HW的值,取较小值做为本身的 HW 值。当前两个 follower 副本的 HW 都等于0(min(0,0) = 0)。
接下来 follower 副本再次请求拉取 leader 副本中的消息,以下图(左)所示。
此时 leader 副本收到来自 follower 副本的 FetchRequest 请求,其中带有 LEO 的相关信息,选取其中的最小值做为新的 HW,即 min(15,3,4)=3。而后连同消息和 HW 一块儿返回 FetchResponse 给 follower 副本,如上图(右)所示。注意 leader 副本的 HW 是一个很重要的东西,由于它直接影响了分区数据对消费者的可见性。
两个 follower 副本在收到新的消息以后更新 LEO 而且更新本身的 HW 为3(min(LEO,3)=3)。
HW 是 High Watermark 的缩写,俗称高水位,它标识了一个特定的消息偏移量(offset),消费者只能拉取到这个 offset 以前的消息。
分区 ISR 集合中的每一个副本都会维护自身的 LEO,而 ISR 集合中最小的 LEO 即为分区的 HW,对消费者而言只能消费 HW 以前的消息。
leader epoch 表明 leader 的纪元信息(epoch),初始值为0。每当 leader 变动一次,leader epoch 的值就会加1,至关于为 leader 增设了一个版本号。
每一个副本中还会增设一个矢量 <LeaderEpoch => StartOffset>,其中 StartOffset 表示当前 LeaderEpoch 下写入的第一条消息的偏移量。
假设有两个节点A和B,B是leader节点,里面的数据如图:
A发生重启,以后A不是先忙着截断日志而是先发送OffsetsForLeaderEpochRequest请求给B,B做为目前的leader在收到请求以后会返回当前的LEO(LogEndOffset,注意图中LE0和LEO的不一样),与请求对应的响应为OffsetsForLeaderEpochResponse。若是 A 中的 LeaderEpoch(假设为 LE_A)和 B 中的不相同,那么 B 此时会查找 LeaderEpoch 为 LE_A+1 对应的 StartOffset 并返回给 A
如上图所示,A 在收到2以后发现和目前的 LEO 相同,也就不须要截断日志了,以此来保护数据的完整性。
再如,以后 B 发生了宕机,A 成为新的 leader,那么对应的 LE=0 也变成了 LE=1,对应的消息 m2 此时就获得了保留。后续的消息均可以以 LE1 为 LeaderEpoch 陆续追加到 A 中。这个时候A就会有两个LE,第二LE所记录的Offset从2开始。若是B恢复了,那么就会从A中获取到LE+1的Offset为2的值返回给B。
再来看看LE如何解决数据不一致的问题:
当前 A 为 leader,B 为 follower,A 中有2条消息 m1 和 m2,而 B 中有1条消息 m1。假设 A 和 B 同时“挂掉”,而后 B 第一个恢复过来并成为新的 leader。
以后 B 写入消息 m3,并将 LEO 和 HW 更新至2,以下图所示。注意此时的 LeaderEpoch 已经从 LE0 增至 LE1 了。
紧接着 A 也恢复过来成为 follower 并向 B 发送 OffsetsForLeaderEpochRequest 请求,此时 A 的 LeaderEpoch 为 LE0。B 根据 LE0 查询到对应的 offset 为1并返回给 A,A 就截断日志并删除了消息 m2,以下图所示。以后 A 发送 FetchRequest 至 B 请求来同步数据,最终A和B中都有两条消息 m1 和 m3,HW 和 LEO都为2,而且 LeaderEpoch 都为 LE1,如此便解决了数据不一致的问题。
由于这样有两个明显的缺点:
对于Kafka来讲,必要性不是很高,由于在Kafka集群中,若是存在多个副本,通过合理的配置,可让leader副本均匀的分布在各个broker上面,使每一个 broker 上的读写负载都是同样的。
在发送延时消息的时候并非先投递到要发送的真实主题(real_topic)中,而是先投递到一些 Kafka 内部的主题(delay_topic)中,这些内部主题对用户不可见,而后经过一个自定义的服务拉取这些内部主题中的消息,并将知足条件的消息再投递到要发送的真实的主题中,消费者所订阅的仍是真实的主题。
若是采用这种方案,那么通常是按照不一样的延时等级来划分的,好比设定5s、10s、30s、1min、2min、5min、10min、20min、30min、45min、1hour、2hour这些按延时时间递增的延时等级,延时的消息按照延时时间投递到不一样等级的主题中,投递到同一主题中的消息的延时时间会被强转为与此主题延时等级一致的延时时间,这样延时偏差控制在两个延时等级的时间差范围以内(好比延时时间为17s的消息投递到30s的延时主题中,以后按照延时时间为30s进行计算,延时偏差为13s)。虽然有必定的延时偏差,可是偏差可控,而且这样只需增长少量的主题就能实现延时队列的功能。
发送到内部主题(delay_topic_*)中的消息会被一个独立的 DelayService 进程消费,这个 DelayService 进程和 Kafka broker 进程以一对一的配比进行同机部署(参考下图),以保证服务的可用性。
针对不一样延时级别的主题,在 DelayService 的内部都会有单独的线程来进行消息的拉取,以及单独的 DelayQueue(这里用的是 JUC 中 DelayQueue)进行消息的暂存。与此同时,在 DelayService 内部还会有专门的消息发送线程来获取 DelayQueue 的消息并转发到真实的主题中。从消费、暂存再到转发,线程之间都是一一对应的关系。以下图所示,DelayService 的设计应当尽可能保持简单,避免锁机制产生的隐患。
为了保障内部 DelayQueue 不会由于未处理的消息过多而致使内存的占用过大,DelayService 会对主题中的每一个分区进行计数,当达到必定的阈值以后,就会暂停拉取该分区中的消息。
由于一个主题中通常不止一个分区,分区之间的消息并不会按照投递时间进行排序,DelayQueue的做用是将消息按照再次投递时间进行有序排序,这样下游的消息发送线程就可以按照前后顺序获取最早知足投递条件的消息。
死信能够看做消费者不能处理收到的消息,也能够看做消费者不想处理收到的消息,还能够看做不符合处理要求的消息。好比消息内包含的消息内容没法被消费者解析,为了确保消息的可靠性而不被随意丢弃,故将其投递到死信队列中,这里的死信就能够看做消费者不能处理的消息。再好比超过既定的重试次数以后将消息投入死信队列,这里就能够将死信看做不符合处理要求的消息。
重试队列其实能够看做一种回退队列,具体指消费端消费消息失败时,为了防止消息无端丢失而从新将消息回滚到 broker 中。与回退队列不一样的是,重试队列通常分红多个重试等级,每一个重试等级通常也会设置从新投递延时,重试次数越多投递延时就越大。
理解了他们的概念以后咱们就能够为每一个主题设置重试队列,消息第一次消费失败入重试队列 Q1,Q1 的从新投递延时为5s,5s事后从新投递该消息;若是消息再次消费失败则入重试队列 Q2,Q2 的从新投递延时为10s,10s事后再次投递该消息。
而后再设置一个主题做为死信队列,重试越屡次从新投递的时间就越久,而且须要设置一个上限,超过投递次数就进入死信队列。重试队列与延时队列有相同的地方,都须要设置延时级别。
消息审计是指在消息生产、存储和消费的整个过程之间对消息个数及延迟的审计,以此来检测是否有数据丢失、是否有数据重复、端到端的延迟又是多少等内容。
目前与消息审计有关的产品也有多个,好比 Chaperone(Uber)、Confluent Control Center、Kafka Monitor(LinkedIn),它们主要经过在消息体(value 字段)或在消息头(headers 字段)中内嵌消息对应的时间戳 timestamp 或全局的惟一标识 ID(或者是二者兼备)来实现消息的审计功能。
内嵌 timestamp 的方式主要是设置一个审计的时间间隔 time_bucket_interval(能够自定义设置几秒或几分钟),根据这个 time_bucket_interval 和消息所属的 timestamp 来计算相应的时间桶(time_bucket)。
内嵌 ID 的方式就更加容易理解了,对于每一条消息都会被分配一个全局惟一标识 ID。若是主题和相应的分区固定,则能够为每一个分区设置一个全局的 ID。当有消息发送时,首先获取对应的 ID,而后内嵌到消息中,最后才将它发送到 broker 中。消费者进行消费审计时,能够判断出哪条消息丢失、哪条消息重复。
消息轨迹指的是一条消息从生产者发出,经由 broker 存储,再到消费者消费的整个过程当中,各个相关节点的状态、时间、地点等数据汇聚而成的完整链路信息。生产者、broker、消费者这3个角色在处理消息的过程当中都会在链路中增长相应的信息,将这些信息汇聚、处理以后就能够查询任意消息的状态,进而为生产环境中的故障排除提供强有力的数据支持。
对消息轨迹而言,最多见的实现方式是封装客户端,在保证正常生产消费的同时添加相应的轨迹信息埋点逻辑。不管生产,仍是消费,在执行以后都会有相应的轨迹信息,咱们须要将这些信息保存起来。
咱们一样能够将轨迹信息保存到 Kafka 的某个主题中,好比下图中的主题 trace_topic。
生产者在将消息正常发送到用户主题 real_topic 以后(或者消费者在拉取到消息消费以后)会将轨迹信息发送到主题 trace_topic 中。
若是消费者客户端的 isolation.level 参数配置为“read_uncommitted”(默认),它对应的 Lag 等于HW – ConsumerOffset 的值,其中 ConsumerOffset 表示当前的消费位移。
若是这个参数配置为“read_committed”,那么就要引入 LSO 来进行计算了。LSO 是 LastStableOffset 的缩写,它对应的 Lag 等于 LSO – ConsumerOffset 的值。
比较重要的 Broker 端 JMX 指标:
一个Consumer Group中能够有多个consumer,多个consumer能够同时消费不一样分区的消息,大大的提升了消费者的并行消费能力。可是一个分区中的消息只能被一个Consumer Group中的一个consumer消费。
网络传输上减小开销
批量发送:
在发送消息的时候,kafka不会直接将少许数据发送出去,不然每次发送少许的数据会增长网络传输频率,下降网络传输效率。kafka会先将消息缓存在内存中,当超过一个的大小或者超过必定的时间,那么会将这些消息进行批量发送。
端到端压缩:
固然网络传输时数据量小也能够减少网络负载,kafaka会将这些批量的数据进行压缩,将一批消息打包后进行压缩,发送broker服务器后,最终这些数据仍是提供给消费者用,因此数据在服务器上仍是保持压缩状态,不会进行解压,并且频繁的压缩和解压也会下降性能,最终仍是以压缩的方式传递到消费者的手上。
顺序读写
kafka将消息追加到日志文件中,利用了磁盘的顺序读写,来提升读写效率。
零拷贝技术
零拷贝将文件内容从磁盘经过DMA引擎复制到内核缓冲区,并且没有把数据复制到socket缓冲区,只是将数据位置和长度信息的描述符复制到了socket缓存区,而后直接将数据传输到网络接口,最后发送。这样大大减少了拷贝的次数,提升了效率。kafka正是调用linux系统给出的sendfile系统调用来使用零拷贝。Java中的系统调用给出的是FileChannel.transferTo接口。
Kafka 中的索引文件以稀疏索引(sparse index)的方式构造消息的索引,它并不保证每一个消息在索引文件中都有对应的索引项。每当写入必定量(由 broker 端参数 log.index.interval.bytes 指定,默认值为4096,即 4KB)的消息时,偏移量索引文件和时间戳索引文件分别增长一个偏移量索引项和时间戳索引项,增大或减少 log.index.interval.bytes 的值,对应地能够增长或缩小索引项的密度。