Kafka 设计与原理详解

1、Kafka简介

本文综合了我以前写的kafka相关文章,可做为一个全面了解学习kafka的培训学习资料。
  • 1
  • 2

转载请注明出处 : 本文连接java

1.1 背景历史

当今社会各类应用系统诸如商业、社交、搜索、浏览等像信息工厂同样不断的生产出各类信息,在大数据时代,咱们面临以下几个挑战:linux

  1. 如何收集这些巨大的信息
  2. 如何分析它
  3. 如何及时作到如上两点

以上几个挑战造成了一个业务需求模型,即生产者生产(produce)各类信息,消费者消费(consume)(处理分析)这些信息,而在生产者与消费者之间,须要一个沟通二者的桥梁-消息系统。从一个微观层面来讲,这种需求也可理解为不一样的系统之间如何传递消息。web

1.2 Kafka诞生

Kafka由 linked-in 开源 
kafka-便是解决上述这类问题的一个框架,它实现了生产者和消费者之间的无缝链接。 
kafka-高产出的分布式消息系统(A high-throughput distributed messaging system)算法

1.3 Kafka如今

Apache kafka 是一个分布式的基于push-subscribe的消息系统,它具有快速、可扩展、可持久化的特色。它如今是Apache旗下的一个开源系统,做为hadoop生态系统的一部分,被各类商业公司普遍应用。它的最大的特性就是能够实时的处理大量数据以知足各类需求场景:好比基于hadoop的批处理系统、低延迟的实时系统、storm/spark流式处理引擎。apache

2、Kafka技术概览

2.1 Kafka的特性

  • 高吞吐量、低延迟:kafka每秒能够处理几十万条消息,它的延迟最低只有几毫秒
  • 可扩展性:kafka集群支持热扩展
  • 持久性、可靠性:消息被持久化到本地磁盘,而且支持数据备份防止数据丢失
  • 容错性:容许集群中节点失败(若副本数量为n,则容许n-1个节点失败)
  • 高并发:支持数千个客户端同时读写

2.2 Kafka一些重要设计思想

下面介绍先大致介绍一下Kafka的主要设计思想,可让相关人员在短期内了解到kafka相关特性,若是想深刻研究,后面会对其中每个特性都作详细介绍。api

  • Consumergroup:各个consumer能够组成一个组,每一个消息只能被组中的一个consumer消费,若是一个消息能够被多个consumer消费的话,那么这些consumer必须在不一样的组。
  • 消息状态:在Kafka中,消息的状态被保存在consumer中,broker不会关心哪一个消息被消费了被谁消费了,只记录一个offset值(指向partition中下一个要被消费的消息位置),这就意味着若是consumer处理很差的话,broker上的一个消息可能会被消费屡次。
  • 消息持久化:Kafka中会把消息持久化到本地文件系统中,而且保持极高的效率。
  • 消息有效期:Kafka会长久保留其中的消息,以便consumer能够屡次消费,固然其中不少细节是可配置的。
  • 批量发送:Kafka支持以消息集合为单位进行批量发送,以提升push效率。
  • push-and-pull : Kafka中的Producer和consumer采用的是push-and-pull模式,即Producer只管向broker push消息,consumer只管从broker pull消息,二者对消息的生产和消费是异步的。
  • Kafka集群中broker之间的关系:不是主从关系,各个broker在集群中地位同样,咱们能够随意的增长或删除任何一个broker节点。
  • 负载均衡方面: Kafka提供了一个 metadata API来管理broker之间的负载(对Kafka0.8.x而言,对于0.7.x主要靠zookeeper来实现负载均衡)。
  • 同步异步:Producer采用异步push方式,极大提升Kafka系统的吞吐率(能够经过参数控制是采用同步仍是异步方式)。
  • 分区机制partition:Kafka的broker端支持消息分区,Producer能够决定把消息发到哪一个分区,在一个分区中消息的顺序就是Producer发送消息的顺序,一个主题中能够有多个分区,具体分区的数量是可配置的。分区的意义很重大,后面的内容会逐渐体现。
  • 离线数据装载:Kafka因为对可拓展的数据持久化的支持,它也很是适合向Hadoop或者数据仓库中进行数据装载。
  • 插件支持:如今很多活跃的社区已经开发出很多插件来拓展Kafka的功能,如用来配合Storm、Hadoop、flume相关的插件。

2.3 kafka 应用场景

  • 日志收集:一个公司能够用Kafka能够收集各类服务的log,经过kafka以统一接口服务的方式开放给各类consumer,例如hadoop、Hbase、Solr等。
  • 消息系统:解耦和生产者和消费者、缓存消息等。
  • 用户活动跟踪:Kafka常常被用来记录web用户或者app用户的各类活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,而后订阅者经过订阅这些topic来作实时的监控分析,或者装载到hadoop、数据仓库中作离线分析和挖掘。
  • 运营指标:Kafka也常常用来记录运营监控数据。包括收集各类分布式应用的数据,生产各类操做的集中反馈,好比报警和报告。
  • 流式处理:好比spark streaming和storm
  • 事件源

2.4 Kafka架构组件

Kafka中发布订阅的对象是topic。咱们能够为每类数据建立一个topic,把向topic发布消息的客户端称做producer,从topic订阅消息的客户端称做consumer。Producers和consumers能够同时从多个topic读写数据。一个kafka集群由一个或多个broker服务器组成,它负责持久化和备份具体的kafka消息。数组

  • topic:消息存放的目录即主题
  • Producer:生产消息到topic的一方
  • Consumer:订阅topic消费消息的一方
  • Broker:Kafka的服务实例就是一个broker

2.5 Kafka Topic&Partition

消息发送时都被发送到一个topic,其本质就是一个目录,而topic由是由一些Partition Logs(分区日志)组成,其组织结构以下图所示:缓存

咱们能够看到,每一个Partition中的消息都是有序的,生产的消息被不断追加到Partition log上,其中的每个消息都被赋予了一个惟一的offset值。 
Kafka集群会保存全部的消息,无论消息有没有被消费;咱们能够设定消息的过时时间,只有过时的数据才会被自动清除以释放磁盘空间。好比咱们设置消息过时时间为2天,那么这2天内的全部消息都会被保存到集群中,数据只有超过了两天才会被清除。 
Kafka须要维持的元数据只有一个–消费消息在Partition中的offset值,Consumer每消费一个消息,offset就会加1。其实消息的状态彻底是由Consumer控制的,Consumer能够跟踪和重设这个offset值,这样的话Consumer就能够读取任意位置的消息。 
把消息日志以Partition的形式存放有多重考虑,第一,方便在集群中扩展,每一个Partition能够经过调整以适应它所在的机器,而一个topic又能够有多个Partition组成,所以整个集群就能够适应任意大小的数据了;第二就是能够提升并发,由于能够以Partition为单位读写了。服务器

3、Kafka 核心组件

3.1 Replications、Partitions 和Leaders

经过上面介绍的咱们能够知道,kafka中的数据是持久化的而且可以容错的。Kafka容许用户为每一个topic设置副本数量,副本数量决定了有几个broker来存放写入的数据。若是你的副本数量设置为3,那么一份数据就会被存放在3台不一样的机器上,那么就容许有2个机器失败。通常推荐副本数量至少为2,这样就能够保证增减、重启机器时不会影响到数据消费。若是对数据持久化有更高的要求,能够把副本数量设置为3或者更多。 
Kafka中的topic是以partition的形式存放的,每个topic均可以设置它的partition数量,Partition的数量决定了组成topic的log的数量。Producer在生产数据时,会按照必定规则(这个规则是能够自定义的)把消息发布到topic的各个partition中。上面将的副本都是以partition为单位的,不过只有一个partition的副本会被选举成leader做为读写用。 
关于如何设置partition值须要考虑的因素。一个partition只能被一个消费者消费(一个消费者能够同时消费多个partition),所以,若是设置的partition的数量小于consumer的数量,就会有消费者消费不到数据。因此,推荐partition的数量必定要大于同时运行的consumer的数量。另一方面,建议partition的数量大于集群broker的数量,这样leader partition就能够均匀的分布在各个broker中,最终使得集群负载均衡。在Cloudera,每一个topic都有上百个partition。须要注意的是,kafka须要为每一个partition分配一些内存来缓存消息数据,若是partition数量越大,就要为kafka分配更大的heap space。网络

3.2 Producers

Producers直接发送消息到broker上的leader partition,不须要通过任何中介一系列的路由转发。为了实现这个特性,kafka集群中的每一个broker均可以响应producer的请求,并返回topic的一些元信息,这些元信息包括哪些机器是存活的,topic的leader partition都在哪,现阶段哪些leader partition是能够直接被访问的。 
Producer客户端本身控制着消息被推送到哪些partition。实现的方式能够是随机分配、实现一类随机负载均衡算法,或者指定一些分区算法。Kafka提供了接口供用户实现自定义的分区,用户能够为每一个消息指定一个partitionKey,经过这个key来实现一些hash分区算法。好比,把userid做为partitionkey的话,相同userid的消息将会被推送到同一个分区。 
以Batch的方式推送数据能够极大的提升处理效率,kafka Producer 能够将消息在内存中累计到必定数量后做为一个batch发送请求。Batch的数量大小能够经过Producer的参数控制,参数值能够设置为累计的消息的数量(如500条)、累计的时间间隔(如100ms)或者累计的数据大小(64KB)。经过增长batch的大小,能够减小网络请求和磁盘IO的次数,固然具体参数设置须要在效率和时效性方面作一个权衡。 
Producers能够异步的并行的向kafka发送消息,可是一般producer在发送完消息以后会获得一个future响应,返回的是offset值或者发送过程当中遇到的错误。这其中有个很是重要的参数“acks”,这个参数决定了producer要求leader partition 收到确认的副本个数,若是acks设置数量为0,表示producer不会等待broker的响应,因此,producer没法知道消息是否发送成功,这样有可能会致使数据丢失,但同时,acks值为0会获得最大的系统吞吐量。 
若acks设置为1,表示producer会在leader partition收到消息时获得broker的一个确认,这样会有更好的可靠性,由于客户端会等待直到broker确认收到消息。若设置为-1,producer会在全部备份的partition收到消息时获得broker的确认,这个设置能够获得最高的可靠性保证。 
Kafka 消息有一个定长的header和变长的字节数组组成。由于kafka消息支持字节数组,也就使得kafka能够支持任何用户自定义的序列号格式或者其它已有的格式如Apache Avro、protobuf等。Kafka没有限定单个消息的大小,但咱们推荐消息大小不要超过1MB,一般通常消息大小都在1~10kB以前。

3.3 Consumers

Kafka提供了两套consumer api,分为high-level api和sample-api。Sample-api 是一个底层的API,它维持了一个和单一broker的链接,而且这个API是彻底无状态的,每次请求都须要指定offset值,所以,这套API也是最灵活的。 
在kafka中,当前读到消息的offset值是由consumer来维护的,所以,consumer能够本身决定如何读取kafka中的数据。好比,consumer能够经过重设offset值来从新消费已消费过的数据。无论有没有被消费,kafka会保存数据一段时间,这个时间周期是可配置的,只有到了过时时间,kafka才会删除这些数据。 
High-level API封装了对集群中一系列broker的访问,能够透明的消费一个topic。它本身维持了已消费消息的状态,即每次消费的都是下一个消息。 
High-level API还支持以组的形式消费topic,若是consumers有同一个组名,那么kafka就至关于一个队列消息服务,而各个consumer均衡的消费相应partition中的数据。若consumers有不一样的组名,那么此时kafka就至关与一个广播服务,会把topic中的全部消息广播到每一个consumer。 

4、Kafka核心特性

4.1 压缩

咱们上面已经知道了Kafka支持以集合(batch)为单位发送消息,在此基础上,Kafka还支持对消息集合进行压缩,Producer端能够经过GZIP或Snappy格式对消息集合进行压缩。Producer端进行压缩以后,在Consumer端需进行解压。压缩的好处就是减小传输的数据量,减轻对网络传输的压力,在对大数据处理上,瓶颈每每体如今网络上而不是CPU(压缩和解压会耗掉部分CPU资源)。 
那么如何区分消息是压缩的仍是未压缩的呢,Kafka在消息头部添加了一个描述压缩属性字节,这个字节的后两位表示消息的压缩采用的编码,若是后两位为0,则表示消息未被压缩。

4.2消息可靠性

在消息系统中,保证消息在生产和消费过程当中的可靠性是十分重要的,在实际消息传递过程当中,可能会出现以下三中状况:

  • 一个消息发送失败
  • 一个消息被发送屡次
  • 最理想的状况:exactly-once ,一个消息发送成功且仅发送了一次

有许多系统声称它们实现了exactly-once,可是它们其实忽略了生产者或消费者在生产和消费过程当中有可能失败的状况。好比虽然一个Producer成功发送一个消息,可是消息在发送途中丢失,或者成功发送到broker,也被consumer成功取走,可是这个consumer在处理取过来的消息时失败了。 
从Producer端看:Kafka是这么处理的,当一个消息被发送后,Producer会等待broker成功接收到消息的反馈(可经过参数控制等待时间),若是消息在途中丢失或是其中一个broker挂掉,Producer会从新发送(咱们知道Kafka有备份机制,能够经过参数控制是否等待全部备份节点都收到消息)。 
从Consumer端看:前面讲到过partition,broker端记录了partition中的一个offset值,这个值指向Consumer下一个即将消费message。当Consumer收到了消息,但却在处理过程当中挂掉,此时Consumer能够经过这个offset值从新找到上一个消息再进行处理。Consumer还有权限控制这个offset值,对持久化到broker端的消息作任意处理。

4.3 备份机制

备份机制是Kafka0.8版本的新特性,备份机制的出现大大提升了Kafka集群的可靠性、稳定性。有了备份机制后,Kafka容许集群中的节点挂掉后而不影响整个集群工做。一个备份数量为n的集群容许n-1个节点失败。在全部备份节点中,有一个节点做为lead节点,这个节点保存了其它备份节点列表,并维持各个备份间的状体同步。下面这幅图解释了Kafka的备份机制:

这里写图片描述

4.4 Kafka高效性相关设计

4.4.1 消息的持久化

Kafka高度依赖文件系统来存储和缓存消息,通常的人认为磁盘是缓慢的,这致使人们对持久化结构具备竞争性持怀疑态度。其实,磁盘远比你想象的要快或者慢,这决定于咱们如何使用磁盘。 
一个和磁盘性能有关的关键事实是:磁盘驱动器的吞吐量跟寻到延迟是相背离的,也就是所,线性写的速度远远大于随机写。好比:在一个6 7200rpm SATA RAID-5 的磁盘阵列上线性写的速度大概是600M/秒,可是随机写的速度只有100K/秒,二者相差将近6000倍。线性读写在大多数应用场景下是能够预测的,所以,操做系统利用read-ahead和write-behind技术来从大的数据块中预取数据,或者将多个逻辑上的写操做组合成一个大写物理写操做中。更多的讨论能够在ACMQueueArtical中找到,他们发现,对磁盘的线性读在有些状况下能够比内存的随机访问要快一些。 
为了补偿这个性能上的分歧,现代操做系统都会把空闲的内存用做磁盘缓存,尽管在内存回收的时候会有一点性能上的代价。全部的磁盘读写操做会在这个统一的缓存上进行。 
此外,若是咱们是在JVM的基础上构建的,熟悉java内存应用管理的人应该清楚如下两件事情:

  1. 一个对象的内存消耗是很是高的,常常是所存数据的两倍或者更多。
  2. 随着堆内数据的增多,Java的垃圾回收会变得很是昂贵。

基于这些事实,利用文件系统而且依靠页缓存比维护一个内存缓存或者其余结构要好——咱们至少要使得可用的缓存加倍,经过自动访问可用内存,而且经过存储更紧凑的字节结构而不是一个对象,这将有可能再次加倍。这么作的结果就是在一台32GB的机器上,若是不考虑GC惩罚,将最多有28-30GB的缓存。此外,这些缓存将会一直存在即便服务重启,然而进程内缓存须要在内存中重构(10GB缓存须要花费10分钟)或者它须要一个彻底冷缓存启动(很是差的初始化性能)。它同时也简化了代码,由于如今全部的维护缓存和文件系统之间内聚的逻辑都在操做系统内部了,这使得这样作比one-off in-process attempts更加高效与准确。若是你的磁盘应用更加倾向于顺序读取,那么read-ahead在每次磁盘读取中实际上获取到这人缓存中的有用数据。 
以上这些建议了一个简单的设计:不一样于维护尽量多的内存缓存而且在须要的时候刷新到文件系统中,咱们换一种思路。全部的数据不须要调用刷新程序,而是马上将它写到一个持久化的日志中。事实上,这仅仅意味着,数据将被传输到内核页缓存中并稍后被刷新。咱们能够增长一个配置项以让系统的用户来控制数据在何时被刷新到物理硬盘上。

4.4.2 常数时间性能保证

消息系统中持久化数据结构的设计一般是维护者一个和消费队列有关的B树或者其它可以随机存取结构的元数据信息。B树是一个很好的结构,能够用在事务型与非事务型的语义中。可是它须要一个很高的花费,尽管B树的操做须要O(logN)。一般状况下,这被认为与常数时间等价,但这对磁盘操做来讲是不对的。磁盘寻道一次须要10ms,而且一次只能寻一个,所以并行化是受限的。 
直觉上来说,一个持久化的队列能够构建在对一个文件的读和追加上,就像通常状况下的日志解决方案。尽管和B树相比,这种结构不能支持丰富的语义,可是它有一个优势,全部的操做都是常数时间,而且读写之间不会相互阻塞。这种设计具备极大的性能优点:最终系统性能和数据大小彻底无关,服务器能够充分利用廉价的硬盘来提供高效的消息服务。 
事实上还有一点,磁盘空间的无限增大而不影响性能这点,意味着咱们能够提供通常消息系统没法提供的特性。好比说,消息被消费后不是立马被删除,咱们能够将这些消息保留一段相对比较长的时间(好比一个星期)。

4.4.3 进一步提升效率

咱们已经为效率作了很是多的努力。可是有一种很是主要的应用场景是:处理Web活动数据,它的特色是数据量很是大,每一次的网页浏览都会产生大量的写操做。更进一步,咱们假设每个被发布的消息都会被至少一个consumer消费,所以咱们更要怒路让消费变得更廉价。 
经过上面的介绍,咱们已经解决了磁盘方面的效率问题,除此以外,在此类系统中还有两类比较低效的场景:

  • 太多小的I/O操做
  • 过多的字节拷贝

为了减小大量小I/O操做的问题,kafka的协议是围绕消息集合构建的。Producer一次网络请求能够发送一个消息集合,而不是每一次只发一条消息。在server端是以消息块的形式追加消息到log中的,consumer在查询的时候也是一次查询大量的线性数据块。消息集合即MessageSet,实现自己是一个很是简单的API,它将一个字节数组或者文件进行打包。因此对消息的处理,这里没有分开的序列化和反序列化的上步骤,消息的字段能够按需反序列化(若是没有须要,能够不用反序列化)。 
另外一个影响效率的问题就是字节拷贝。为了解决字节拷贝的问题,kafka设计了一种“标准字节消息”,Producer、Broker、Consumer共享这一种消息格式。Kakfa的message log在broker端就是一些目录文件,这些日志文件都是MessageSet按照这种“标准字节消息”格式写入到磁盘的。 
维持这种通用的格式对这些操做的优化尤其重要:持久化log 块的网络传输。流行的unix操做系统提供了一种很是高效的途径来实现页面缓存和socket之间的数据传递。在Linux操做系统中,这种方式被称做:sendfile system call(Java提供了访问这个系统调用的方法:FileChannel.transferTo api)。

为了理解sendfile的影响,须要理解通常的将数据从文件传到socket的路径:

  1. 操做系统将数据从磁盘读到内核空间的页缓存中
  2. 应用将数据从内核空间读到用户空间的缓存中
  3. 应用将数据写回内核空间的socket缓存中
  4. 操做系统将数据从socket缓存写到网卡缓存中,以便将数据经网络发出

这种操做方式明显是很是低效的,这里有四次拷贝,两次系统调用。若是使用sendfile,就能够避免两次拷贝:操做系统将数据直接从页缓存发送到网络上。因此在这个优化的路径中,只有最后一步将数据拷贝到网卡缓存中是须要的。 
咱们指望一个主题上有多个消费者是一种常见的应用场景。利用上述的zero-copy,数据只被拷贝到页缓存一次,而后就能够在每次消费时被重得利用,而不须要将数据存在内存中,而后在每次读的时候拷贝到内核空间中。这使得消息消费速度能够达到网络链接的速度。这样以来,经过页面缓存和sendfile的结合使用,整个kafka集群几乎都已以缓存的方式提供服务,并且即便下游的consumer不少,也不会对整个集群服务形成压力。 
关于sendfile和zero-copy,请参考:zero-copy

5、Kafka集群部署

5.1 集群部署

为了提升性能,推荐采用专用的服务器来部署kafka集群,尽可能与hadoop集群分开,由于kafka依赖磁盘读写和大的页面缓存,若是和hadoop共享节点的话会影响其使用页面缓存的性能。 
Kafka集群的大小须要根据硬件的配置、生产者消费者的并发数量、数据的副本个数、数据的保存时长综合肯定。 
磁盘的吞吐量尤其重要,由于一般kafka的瓶颈就在磁盘上。 
Kafka依赖于zookeeper,建议采用专用服务器来部署zookeeper集群,zookeeper集群的节点采用偶数个,通常建议用三、五、7个。注意zookeeper集群越大其读写性能越慢,由于zookeeper须要在节点之间同步数据。一个3节点的zookeeper集群容许一个节点失败,一个5节点集群容许2个几点失败。

5.2 集群大小

有不少因素决定着kafka集群须要具有存储能力的大小,最准确的衡量办法就是模拟负载来测算一下,Kafka自己也提供了负载测试的工具。 
若是不想经过模拟实验来评估集群大小,最好的办法就是根据硬盘的空间需求来推算。下面我就根据网络和磁盘吞吐量需求来作一下估算。 
咱们作以下假设:

  • W:每秒写多少MB
  • R :副本数
  • C :Consumer的数量

通常的来讲,kafka集群瓶颈在于网络和磁盘吞吐量,因此咱们先评估一下集群的网络和磁盘需求。 
对于每条消息,每一个副本都要写一遍,因此总体写的速度是W*R。读数据的部分主要是集群内部各个副本从leader同步消息读和集群外部的consumer读,因此集群内部读的速率是(R-1)*W,同时,外部consumer读的速度是C*W,所以:

  • Write:W*R
  • Read:(R-1)*W+C*W

须要注意的是,咱们能够在读的时候缓存部分数据来减小IO操做,若是一个集群有M MB内存,写的速度是W MB/sec,则容许M/(W*R) 秒的写能够被缓存。若是集群有32GB内存,写的速度是50MB/s的话,则能够至少缓存10分钟的数据。

5.3 Kafka性能测试

Performance testing

5.4 Kafka在zookeeper中的数据结构

Kafka data structures in Zookeeper

6、Kafka主要配置

6.1 Broker Config

属性 默认值 描述
broker.id   必填参数,broker的惟一标识
log.dirs /tmp/kafka-logs Kafka数据存放的目录。能够指定多个目录,中间用逗号分隔,当新partition被建立的时会被存放到当前存放partition最少的目录。
port 9092 BrokerServer接受客户端链接的端口号
zookeeper.connect null Zookeeper的链接串,格式为:hostname1:port1,hostname2:port2,hostname3:port3。能够填一个或多个,为了提升可靠性,建议都填上。注意,此配置容许咱们指定一个zookeeper路径来存放此kafka集群的全部数据,为了与其余应用集群区分开,建议在此配置中指定本集群存放目录,格式为:hostname1:port1,hostname2:port2,hostname3:port3/chroot/path 。须要注意的是,消费者的参数要和此参数一致。
message.max.bytes 1000000 服务器能够接收到的最大的消息大小。注意此参数要和consumer的maximum.message.size大小一致,不然会由于生产者生产的消息太大致使消费者没法消费。
num.io.threads 8 服务器用来执行读写请求的IO线程数,此参数的数量至少要等于服务器上磁盘的数量。
queued.max.requests 500 I/O线程能够处理请求的队列大小,若实际请求数超过此大小,网络线程将中止接收新的请求。
socket.send.buffer.bytes 100 * 1024 The SO_SNDBUFF buffer the server prefers for socket connections.
socket.receive.buffer.bytes 100 * 1024 The SO_RCVBUFF buffer the server prefers for socket connections.
socket.request.max.bytes 100 * 1024 * 1024 服务器容许请求的最大值, 用来防止内存溢出,其值应该小于 Java heap size.
num.partitions 1 默认partition数量,若是topic在建立时没有指定partition数量,默认使用此值,建议改成5
log.segment.bytes 1024 * 1024 * 1024 Segment文件的大小,超过此值将会自动新建一个segment,此值能够被topic级别的参数覆盖。
log.roll.{ms,hours} 24 * 7 hours 新建segment文件的时间,此值能够被topic级别的参数覆盖。
log.retention.{ms,minutes,hours} 7 days Kafka segment log的保存周期,保存周期超过此时间日志就会被删除。此参数能够被topic级别参数覆盖。数据量大时,建议减少此值。
log.retention.bytes -1 每一个partition的最大容量,若数据量超过此值,partition数据将会被删除。注意这个参数控制的是每一个partition而不是topic。此参数能够被log级别参数覆盖。
log.retention.check.interval.ms 5 minutes 删除策略的检查周期
auto.create.topics.enable true 自动建立topic参数,建议此值设置为false,严格控制topic管理,防止生产者错写topic。
default.replication.factor 1 默认副本数量,建议改成2。
replica.lag.time.max.ms 10000 在此窗口时间内没有收到follower的fetch请求,leader会将其从ISR(in-sync replicas)中移除。
replica.lag.max.messages 4000 若是replica节点落后leader节点此值大小的消息数量,leader节点就会将其从ISR中移除。
replica.socket.timeout.ms 30 * 1000 replica向leader发送请求的超时时间。
replica.socket.receive.buffer.bytes 64 * 1024 The socket receive buffer for network requests to the leader for replicating data.
replica.fetch.max.bytes 1024 * 1024 The number of byes of messages to attempt to fetch for each partition in the fetch requests the replicas send to the leader.
replica.fetch.wait.max.ms 500 The maximum amount of time to wait time for data to arrive on the leader in the fetch requests sent by the replicas to the leader.
num.replica.fetchers 1 Number of threads used to replicate messages from leaders. Increasing this value can increase the degree of I/O parallelism in the follower broker.
fetch.purgatory.purge.interval.requests 1000 The purge interval (in number of requests) of the fetch request purgatory.
zookeeper.session.timeout.ms 6000 ZooKeeper session 超时时间。若是在此时间内server没有向zookeeper发送心跳,zookeeper就会认为此节点已挂掉。 此值过低致使节点容易被标记死亡;若过高,.会致使太迟发现节点死亡。
zookeeper.connection.timeout.ms 6000 客户端链接zookeeper的超时时间。
zookeeper.sync.time.ms 2000 H ZK follower落后 ZK leader的时间。
controlled.shutdown.enable true 容许broker shutdown。若是启用,broker在关闭本身以前会把它上面的全部leaders转移到其它brokers上,建议启用,增长集群稳定性。
auto.leader.rebalance.enable true If this is enabled the controller will automatically try to balance leadership for partitions among the brokers by periodically returning leadership to the “preferred” replica for each partition if it is available.
leader.imbalance.per.broker.percentage 10 The percentage of leader imbalance allowed per broker. The controller will rebalance leadership if this ratio goes above the configured value per broker.
leader.imbalance.check.interval.seconds 300 The frequency with which to check for leader imbalance.
offset.metadata.max.bytes 4096 The maximum amount of metadata to allow clients to save with their offsets.
connections.max.idle.ms 600000 Idle connections timeout: the server socket processor threads close the connections that idle more than this.
num.recovery.threads.per.data.dir 1 The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
unclean.leader.election.enable true Indicates whether to enable replicas not in the ISR set to be elected as leader as a last resort, even though doing so may result in data loss.
delete.topic.enable false 启用deletetopic参数,建议设置为true。
offsets.topic.num.partitions 50 The number of partitions for the offset commit topic. Since changing this after deployment is currently unsupported, we recommend using a higher setting for production (e.g., 100-200).
offsets.topic.retention.minutes 1440 Offsets that are older than this age will be marked for deletion. The actual purge will occur when the log cleaner compacts the offsets topic.
offsets.retention.check.interval.ms 600000 The frequency at which the offset manager checks for stale offsets.
offsets.topic.replication.factor 3 The replication factor for the offset commit topic. A higher setting (e.g., three or four) is recommended in order to ensure higher availability. If the offsets topic is created when fewer brokers than the replication factor then the offsets topic will be created with fewer replicas.
offsets.topic.segment.bytes 104857600 Segment size for the offsets topic. Since it uses a compacted topic, this should be kept relatively low in order to facilitate faster log compaction and loads.
offsets.load.buffer.size 5242880 An offset load occurs when a broker becomes the offset manager for a set of consumer groups (i.e., when it becomes a leader for an offsets topic partition). This setting corresponds to the batch size (in bytes) to use when reading from the offsets segments when loading offsets into the offset manager’s cache.
offsets.commit.required.acks -1 The number of acknowledgements that are required before the offset commit can be accepted. This is similar to the producer’s acknowledgement setting. In general, the default should not be overridden.
offsets.commit.timeout.ms 5000 The offset commit will be delayed until this timeout or the required number of replicas have received the offset commit. This is similar to the producer request timeout.

6.2 Producer Config

属性 默认值 描述
metadata.broker.list   启动时producer查询brokers的列表,能够是集群中全部brokers的一个子集。注意,这个参数只是用来获取topic的元信息用,producer会从元信息中挑选合适的broker并与之创建socket链接。格式是:host1:port1,host2:port2。
request.required.acks 0 参见3.2节介绍
request.timeout.ms 10000 Broker等待ack的超时时间,若等待时间超过此值,会返回客户端错误信息。
producer.type sync 同步异步模式。async表示异步,sync表示同步。若是设置成异步模式,能够容许生产者以batch的形式push数据,这样会极大的提升broker性能,推荐设置为异步。
serializer.class kafka.serializer.DefaultEncoder 序列号类,.默认序列化成 byte[] 。
key.serializer.class   Key的序列化类,默认同上。
partitioner.class kafka.producer.DefaultPartitioner Partition类,默认对key进行hash。
compression.codec none 指定producer消息的压缩格式,可选参数为: “none”, “gzip” and “snappy”。关于压缩参见4.1节
compressed.topics null 启用压缩的topic名称。若上面参数选择了一个压缩格式,那么压缩仅对本参数指定的topic有效,若本参数为空,则对全部topic有效。
message.send.max.retries 3 Producer发送失败时重试次数。若网络出现问题,可能会致使不断重试。
retry.backoff.ms 100 Before each retry, the producer refreshes the metadata of relevant topics to see if a new leader has been elected. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata.
topic.metadata.refresh.interval.ms 600 * 1000 The producer generally refreshes the topic metadata from brokers when there is a failure (partition missing, leader not available…). It will also poll regularly (default: every 10min so 600000ms). If you set this to a negative value, metadata will only get refreshed on failure. If you set this to zero, the metadata will get refreshed after each message sent (not recommended). Important note: the refresh happen only AFTER the message is sent, so if the producer never sends a message the metadata is never refreshed
queue.buffering.max.ms 5000 启用异步模式时,producer缓存消息的时间。好比咱们设置成1000时,它会缓存1秒的数据再一次发送出去,这样能够极大的增长broker吞吐量,但也会形成时效性的下降。
queue.buffering.max.messages 10000 采用异步模式时producer buffer 队列里最大缓存的消息数量,若是超过这个数值,producer就会阻塞或者丢掉消息。
queue.enqueue.timeout.ms -1 当达到上面参数值时producer阻塞等待的时间。若是值设置为0,buffer队列满时producer不会阻塞,消息直接被丢掉。若值设置为-1,producer会被阻塞,不会丢消息。
batch.num.messages 200 采用异步模式时,一个batch缓存的消息数量。达到这个数量值时producer才会发送消息。
send.buffer.bytes 100 * 1024 Socket write buffer size
client.id “” The client id is a user-specified string sent in each request to help trace calls. It should logically identify the application making the request.

6.3 Consumer Config

属性 默认值 描述
group.id   Consumer的组ID,相同goup.id的consumer属于同一个组。
zookeeper.connect   Consumer的zookeeper链接串,要和broker的配置一致。
consumer.id null 若是不设置会自动生成。
socket.timeout.ms 30 * 1000 网络请求的socket超时时间。实际超时时间由max.fetch.wait + socket.timeout.ms 肯定。
socket.receive.buffer.bytes 64 * 1024 The socket receive buffer for network requests.
fetch.message.max.bytes 1024 * 1024 查询topic-partition时容许的最大消息大小。consumer会为每一个partition缓存此大小的消息到内存,所以,这个参数能够控制consumer的内存使用量。这个值应该至少比server容许的最大消息大小大,以避免producer发送的消息大于consumer容许的消息。
num.consumer.fetchers 1 The number fetcher threads used to fetch data.
auto.commit.enable true 若是此值设置为true,consumer会周期性的把当前消费的offset值保存到zookeeper。当consumer失败重启以后将会使用此值做为新开始消费的值。
auto.commit.interval.ms 60 * 1000 Consumer提交offset值到zookeeper的周期。
queued.max.message.chunks 2 用来被consumer消费的message chunks 数量, 每一个chunk能够缓存fetch.message.max.bytes大小的数据量。
rebalance.max.retries 4 When a new consumer joins a consumer group the set of consumers attempt to “rebalance” the load to assign partitions to each consumer. If the set of consumers changes while this assignment is taking place the rebalance will fail and retry. This setting controls the maximum number of attempts before giving up.
fetch.min.bytes 1 The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will wait for that much data to accumulate before answering the request.
fetch.wait.max.ms 100 The maximum amount of time the server will block before answering the fetch request if there isn’t sufficient data to immediately satisfy fetch.min.bytes.
rebalance.backoff.ms 2000 Backoff time between retries during rebalance.
refresh.leader.backoff.ms 200 Backoff time to wait before trying to determine the leader of a partition that has just lost its leader.
auto.offset.reset largest What to do when there is no initial offset in ZooKeeper or if an offset is out of range ;smallest : automatically reset the offset to the smallest offset; largest : automatically reset the offset to the largest offset;anything else: throw exception to the consumer
consumer.timeout.ms -1 若在指定时间内没有消息消费,consumer将会抛出异常。
exclude.internal.topics true Whether messages from internal topics (such as offsets) should be exposed to the consumer.
zookeeper.session.timeout.ms 6000 ZooKeeper session timeout. If the consumer fails to heartbeat to ZooKeeper for this period of time it is considered dead and a rebalance will occur.
zookeeper.connection.timeout.ms 6000 The max time that the client waits while establishing a connection to zookeeper.
zookeeper.sync.time.ms 2000 How far a ZK follower can be behind a ZK leader
相关文章
相关标签/搜索