Kafka笔记5

Kafka使用zookeeper来维护集群成员的信息。每一个broker都有一个惟一标识符,这个标识符能够在配置文件指定,也能够自动生成。算法

在broker停机,出现网络分区或者长时间垃圾回收停顿时,broker会从zookeeper上断开链接,此时broker在启动时建立的临时节点会自动从zookeeper上移除。监听broker列表的Kafka组件会被告知该broker已移除。数据库

在彻底关闭一个broker以后,若是使用相同的ID启动一个全新的broker,它会当即加入集群,并拥有与旧broker相同的分区和主题。缓存

Kafka使用主题来组织数据,每一个主题被分为若干个分区,每一个分区有多个副本,副本被保存在broker上,每一个broker均可以保存上千个属于不一样主题和分区的副本。安全

副本有两种类型:服务器

首领副本,每一个分区都有一个首领副本,为了保证一致性,因此生产者和消费者请求都会通过这个副本。网络

跟随者副本,首领之外的副本都是跟随副本,跟随副本不处理来自客户端的请求,它们惟一的任务时从首领那里复制消息,保持与首领一致的状态。若是首领发生崩溃,其中一个跟随者就会被提高为首领。并发

首领的另外一个任务时搞清楚哪一个跟随者的状态与本身时一致的,若是跟随者10s内没有请求任何消息,或者10s内没有请求最新的数据,它们会被认为是不一样步的,若是一个副本没法与首领保持一致,首领失效时他就不能称为新的首领,毕竟他没有包含所有的消息。tcp

除了当前首领外,每一个分区都有一个首选首领,——建立主题时选的首领就是分区的首选首领,之因此叫作首选首领,是由于建立分区时,须要在broker之间均衡首领,所以咱们但愿首选首领在称为真正的首领时,broker间的负载最终会获得平衡。默认状况下Kafka的auto.leader.rebalance.enable=true,它会检查首选首领是否是当前首领,若是不是,而且该副本时同步的,那就会触发首领选举,让首选首领成为当前首领。工具

broker的大部分工做时处理客户端,分区副本和控制器发送给分区首领的请求。Kafka提供了一个二机制协议(基于tcp),指定了请求消息的格式以及broker如何对请求做出响应——包括成功处理请求或在处理请求中遇到错误。客户端发起链接并发送请求,broker处理请求并做出响应。broker按照请求到达的顺序处理它们——这种顺序保证了Kafka具备了消息队列的特性,同时保证保存的消息时有序的。性能

全部的请求消息包含一个标准消息头:

1.request type

2.request version

3.correlation ID——一个具备惟一性的数字,用于标识请求消息,同时也会出如今响应消息和错误日志中。

4.client ID——用于标识发送的请求的客户端

broker会在它监听的每个端口上运行一个acceptor,这个线程会建立一个链接,并把它交给processor线程去处理。processor线程(网络线程)的数量时可配置的,网络线程负责从客户端获取请求消息,把他们放进请求队列,而后从响应队列获取响应消息,把他们发送给客户端。

请求消息被放到请求队列后,IO线程会负责处理它们,下面是几种最多见的请求类型:

1.生产请求:生产者发送的请求,它包含客户端要写入broker的消息

2.获取请求:在消费者和跟随者副本须要从broker读取消息时发送的请求

生产请求和获取请求都必须发送给分区的首领副本,若是broker收到一个针对特定分区的请求,而该分区的首领在另外一个broker上,那么发送请求的客户端会收到一个“非分区首领”的错误响应。Kafka客户端要本身负责把生产请求和获取请求发送到正确的broker上。

那么客户端怎么知道往哪里发送请求呢?

客户端使用了另外一种请求类型,也就是元数据请求,这种请求包含了客户端感兴趣的主题列表。服务器端响应消息里指明了这些主题所包含的分区,每一个分区都有哪些副本,以及哪一个副本是首领。元数据请求能够发送给任意一个broker,由于全部broker都缓存了这些信息。

通常状况下,客户端会把这些信息缓存起来,并直接往目标broker上发送生产和获取请求。它们须要时不时地发送元数据请求来刷新这些信息(刷新时间间隔经过meta.max.age.ms配置)从而知道元数据是否发生了变动——好比在新的broker加入集群时,部分副本被移动到新broker上。另外若是客户端收到非首领错误,它会尝试重发请求以前先刷新元数据,由于这个错误说明客户端正在使用过时的元数据信息。

5.4.1生产请求

Acks配置参数——该参数指定了须要多少个broker确认才能够任务一个消息写入时成功的。不一样的配置对“写入成功”的界定不一样;

若是acks=1,那么只要首领收到消息就认为写入成功;

若是acks=all,那么须要全部的同步副本收到消息才算写入成功;

若是acks=0,那么生产者把消息发送以后,彻底不须要等待broker的响应。

包含首领副本的broker在收到生产请求时,会对请求作一些验证。

1.发送数据的用户是否有主题写入权限?

2.请求里包含的acks值是否有效(只能出现0,1或all)

3.若是acks=all,是否有足够多同步副本保证消息已经被安全写入?(若是同步副本不足,broker能够拒绝处理新消息。)

以后消息被写入本地磁盘。在Linux系统上,消息会被写到文件系统缓存里,并不保证什么时候会被刷新到磁盘上,Kafka不会一直等待数据被写到磁盘上——它依赖复制功能来保证消息的持久性。

在消息被写入分区首领以后,broker开始检查acks配置参数——若是acks被设为0或1,那么broker当即返回响应;若是acks被设为all,那么请求会被保存在一个叫作炼狱的缓冲区,直到首领发现全部跟随者副本都复制了消息,响应才会返回给客户端。

5.4.2获取请求

broker处理获取请求的方式与处理生产请求类似。客户端发送请求,想broker请求主题分区具备特定偏移量的消息。客户端能够指定broker最多从一个分区返回多少数据。这个限定很是重要,由于客户端须要为broker返回的数据分配足够的内存。若是没有这个限制,broker返回的大量数据可能耗尽客户端内存。若是请求的偏移量存在,broker将按照客户端指定的数量上限从分区读取消息,再把消息返回客户端。Kafka使用零复制技术想客户端发送消息——也就是说Kafka直接把消息发送到网络通道,不须要通过任何中间缓冲区。这种技术避免了字节复制,也不须要管理内存缓存区,从而得到更好性能。

客户端除了能够设置返回数据的上限,也能够设置下限。在主题消息流量不是很大的状况下,这样能够减小CPU和网络开销。客户端发送一个请求,broker有足够数据才把它们返回客户端。固然客户端能够设定一个超时时间,当到达时间后,即使broker没有足够数据,也会发送到消费者。

并不时保存在分区首领上的数据都能被客户端读取,大部分客户端只能读取已经被写入全部同步副本的消息,分区首领知道每一个消息会被复制到哪一个副本上,在消息被写入全部同步副本以前,是不会发给消费者的——尝试获取这些消息的请求会获得空的响应而不是错误。

这意味着若是broker间的消息复制变慢,那么消息到达消费者的时间也会变长,延迟时间能够经过replica.lag.time.max.ms来配置,它指定来副本在复制消息时可被容许的最大延迟时间。

以前的Kafka消费者使用zookeeper跟踪偏移量,以后决定把偏移量保存在特定的Kafka主题上。为了达到这个目的,咱们不得不往协议里增长几种请求类型在:offsetCommitRequest,offsetFetchRequest和listOffsetRequest,如今应用程序调用commitOffset方法时客户端再也不把偏移量写入zookeeper,而是向Kafka发送offsetCommitRequest请求。

主题的建立仍然须要经过命令行来完成,命令行工具会直接更新zookeeper里的主题列表,broker监听这些主题列表,在有新主题加入时,它们会收到通知。咱们正在改进Kafka,增长createTopicRequest,这样客户端就可直接向broker请求建立新主题了。

咱们在0.10.0版本增长了APIVersionRequest,客户端能够循环broker支持哪些版本的请求,而后使用正确的版本与broker通讯。

5.5物理存储

Kafka的基本存储单元时分区,分区没法在多个broker间在细分。

在建立主题时,Kafka首选会决定如何在broker间分配分区,咱们要达到以下目标:

1.在broker间平均分配分区副本,

2.确保每一个分区的每一个副本分布在不一样的broker上

3.若是broker指定了机架信息,那么尽量把每一个分区分配到不一样机架的broker上。

为分区和副本选好合适的broker以后,接下来要以为这些分区应该使用哪一个目录。咱们单独为每一个分区分配目录,规则:计算每一个目录的分区数量,新的分区老是被添加到数量最小的目录里。

5.5.2文件管理

保留数据时Kafka的一个基本特性,Kafka不会一直保留数据,也不会等到全部消费者都读取了消息以后才能删除消息。相反,Kafka管理员为每一个主题配置了数据保留期限,规定了数据被删除以前能够保留多长时间,或者清理数据以前能够保留的数据量大小。

由于在一个大文件里查找和删除消息时很费时的,也容易出错,因此把分区分红若干个片断,默认状况下,每一个片断包含1G或一周的数据,比较小的为准,在broker往分区写入数据时,若是达到片断上限,就关闭当前文件,并打开一个新文件。

当前正在写入数据的片断叫作活跃片断,活跃片断永远不会被删除。

5.5.3文件格式

咱们把Kafka消息和偏移量保存在文件里,保存在磁盘上的数据格式与从生产者发送过来货发送给消费者的消息格式时同样的。由于使用了相同的消息格式进行磁盘存储和网络传输,Kafka可使用零复制技术给消费者发送消息,同时避免了对生产者压缩过的消息进行解压和再压缩。

除了键值和偏移量外,消息还包含了消息大小,校验和,消息格式版本号,压缩算法和时间戳。

时间戳能够时生产者发送消息的时间,也能够是消息到达broker的时间,这个能够配置。

Kafka附带了一个叫作DumpLogSegment的工具,能够查看片断内容,显示每一个消息的偏移量,校验和,魔术数字节,消息大小和压缩算法。

5.5.4索引

为了帮助broker更快定位到指定偏移量,Kafka为每一个分区维护了一个索引。索引把偏移量映射到片断文件和偏移量在文件的位置。

5.5.5清理

Kafka经过改变主题的保留策略来知足这些场景(只关心最新的数据),早于保留时间的旧事件会被删除,为每一个键保留最新值,只有当应用程序的事件里包含键值对时,为这些主题设置compact策略才有意义。

若是Kafka启动时启动了清理功能(经过设置log.cleaner.enabled)每一个broker会启动一个清理管理器线程和多个清理线程,它们负责执行清理任务,这些线程会选择污浊率较高的分区进行清理。

5.5.7被删除的事件

若是为了把一个键从系统删除,应用程序必须发送一个包含该键且值为null的消息。清理线程发现该消息时,会先进行常规清理,只保留值为null的消息,若是消费者往数据库复制Kafka数据,当看到这个消息时,就知道要把相关信息从数据库里删除。过一段时间后,清理线程就会移除这个消息。键也会从Kafka消失。

5.5.8什么时候会清理主题

就像delete策略不删除当前活跃片断同样,compact策略也不会对当前片断进行清理,只有旧片断的消息才会被清理。Kafka会在包含脏记录的主题达到50%时进行清理。

相关文章
相关标签/搜索