Apache Kafka 是一款流行的分布式数据流平台,它已经普遍地被诸如 New Relic(数据智能平台)、Uber、Square(移动支付公司)等大型公司用来构建可扩展的、高吞吐量的、且高可靠的实时数据流系统。git
例如,在 New Relic 的生产环境中,Kafka 群集每秒可以处理超过 1500 万条消息,并且其数据聚合率接近 1Tbps。github
可见,Kafka 大幅简化了对于数据流的处理,所以它也得到了众多应用开发人员和数据管理专家的青睐。算法
然而,在大型系统中 Kafka 的应用会比较复杂。若是您的 Consumers 没法跟上数据流的话,各类消息每每在未被查看以前就已经消失掉了。sql
同时,它在自动化数据保留方面的限制,高流量的发布+订阅(publish-subscribe,pub/sub)模式等,可能都会影响到您系统的性能。apache
能够绝不夸张地说,若是那些存放着数据流的系统没法按需扩容、或稳定性不可靠的话,估计您常常会寝食难安。缓存
为了减小上述复杂性,我在此分享 New Relic 公司为 Kafka 集群在应对高吞吐量方面的 20 项最佳实践。网络
我将从以下四个方面进行展开:架构
Kafka 是一种高效的分布式消息系统。在性能上,它具备内置的数据冗余度与弹性,也具备高吞吐能力和可扩展性。并发
在功能上,它支持自动化的数据保存限制,可以以“流”的方式为应用提供数据转换,以及按照“键-值(key-value)”的建模关系“压缩”数据流。负载均衡
要了解各类最佳实践,您须要首先熟悉以下关键术语:
Kafka 中的一条记录或数据单位。每条消息都有一个键和对应的一个值,有时还会有可选的消息头。
Producer 将消息发布到 Kafka 的 topics 上。Producer 决定向 topic 分区的发布方式,如:轮询的随机方法、或基于消息键(key)的分区算法。
Kafka 以分布式系统或集群的方式运行。那么群集中的每一个节点称为一个 Broker。
Topic 是那些被发布的数据记录或消息的一种类别。消费者经过订阅Topic,来读取写给它们的数据。
不一样的 Topic 被分为不一样的分区,而每一条消息都会被分配一个 Offset,一般每一个分区都会被复制至少一到两次。
每一个分区都有一个 Leader 和存放在各个 Follower 上的一到多个副本(即:数据的副本),此法可防止某个 Broker 的失效。
群集中的全部 Broker 均可以做为 Leader 和 Follower,可是一个 Broker 最多只能有一个 Topic Partition 的副本。Leader 可被用来进行全部的读写操做。
单个分区中的每一条消息都被分配一个 Offset,它是一个单调递增的整型数,可用来做为分区中消息的惟一标识符。
Consumer 经过订阅 Topic partition,来读取 Kafka 的各类 Topic 消息。而后,消费类应用处理会收到消息,以完成指定的工做。
Consumer 能够按照 Consumer group 进行逻辑划分。Topic Partition 被均衡地分配给组中的全部 Consumers。
所以,在同一个 Consumer group 中,全部的 Consumer 都以负载均衡的方式运做。
换言之,同一组中的每个 Consumer 都能看到每一条消息。若是某个 Consumer 处于“离线”状态的话,那么该分区将会被分配给同组中的另外一个 Consumer。这就是所谓的“再均衡(rebalance)”。
固然,若是组中的 Consumer 多于分区数,则某些 Consumer 将会处于闲置的状态。
相反,若是组中的 Consumer 少于分区数,则某些 Consumer 会得到来自一个以上分区的消息。
当 Consumer 的速度跟不上消息的产生速度时,Consumer 就会由于没法从分区中读取消息,而产生延迟。
延迟表示为分区头后面的 Offset 数量。从延迟状态(到“追遇上来”)恢复正常所须要的时间,取决于 Consumer 每秒可以应对的消息速度。
其公式以下:time = messages / (consume rate per second - produce rate per second)
此处所谓“分区的数据速率”是指数据的生成速率。换言之,它是由“平均消息大小”乘以“每秒消息数”得出的数据速率决定了在给定时间内,所能保证的数据保存空间的大小(以字节为单位)。
若是您不知道数据速率的话,则没法正确地计算出知足基于给定时间跨度的数据,所须要保存的空间大小。
同时,数据速率也可以标识出单个 Consumer 在不产生延时的状况下,所须要支持的最低性能值。
在您进行大型操做时,各个分区在数据速率上的良莠不齐是很是难以管理的。
其缘由来自于以下三个方面:
有关 Topic Partition 的使用,能够参阅《Kafka Topic Partition的各类有效策略》https://blog.newrelic.com/engineering/effective-strategies-kafka-topic-partitioning/。
在 0.8.x 版中,Consumer 使用 Apache ZooKeeper 来协调 Consumer group,而许多已知的 Bug 会致使其长期处于再均衡状态,或是直接致使再均衡算法的失败(咱们称之为“再均衡风暴”)。
所以在再均衡期间,一个或多个分区会被分配给同一组中的每一个 Consumer。
而在再均衡风暴中,分区的全部权会持续在各个 Consumers 之间流转,这反而阻碍了任何一个 Consumer 去真正获取分区的全部权。
在 Kafka 的 0.10.x 版本中,参数 receive.buffer.bytes 的默认值为 64KB。而在 Kafka 的 0.8.x 版本中,参数 socket.receive.buffer.bytes 的默认值为 100KB。
这两个默认值对于高吞吐量的环境而言都过小了,特别是若是 Broker 和 Consumer 之间的网络带宽延迟积(bandwidth-delay product)大于局域网(local areanetwork,LAN)时。
对于延迟为 1 毫秒或更多的高带宽的网络(如 10Gbps 或更高),请考虑将套接字缓冲区设置为 8 或 16MB。
若是您的内存不足,也至少考虑设置为 1MB。固然,您也能够设置为 -1,它会让底层操做系统根据网络的实际状况,去调整缓冲区的大小。
可是,对于须要启动“热”分区的 Consumers 来讲,自动调整可能不会那么快。
一般,咱们应该保证系统只去处理其能力范围内的数据,而不要超负荷“消费”,进而致使进程中断“挂起”,或出现 Consume group 的溢出。
若是是在 Java 虚拟机(JVM)中运行,Consumers 应当使用固定大小的缓冲区,并且最好是使用堆外内存(off-heap)。请参见 Disruptor 模式:http://lmax-exchange.github.io/disruptor/files/Disruptor-1.0.pdf
固定大小的缓冲区可以阻止 Consumer 将过多的数据拉到堆栈上,以致于 JVM 花费掉其全部的时间去执行垃圾回收,进而没法履行其处理消息的本质工做。
例如,长时间垃圾回收的停滞,可能致使 ZooKeeper 的会话被丢弃、或 Consumer group 处于再均衡状态。
对于 Broker 来讲也如此,若是垃圾回收停滞的时间太长,则会产生集群掉线的风险。
籍此 Producer 可以获知消息是否真正被发送到了 Broker 的分区上。在 Kafka 的 0.10.x 版本上,其设置是 Acks;而在 0.8.x 版本上,则为 request.required.acks。
Kafka 经过复制,来提供容错功能,所以单个节点的故障、或分区 Leader 关系的更改不会影响到系统的可用性。
若是您没有用 Acks 来配置 Producer(或称“fireand forget”)的话,则消息可能会悄然丢失。
其默认值为 3,固然是很是低的。不过,正确的设定值取决于您的应用程序,即:就那些对于数据丢失零容忍的应用而言,请考虑设置为 Integer.MAX_VALUE(有效且最大)。
这样将可以应对 Broker 的 Leader 分区出现没法马上响应 Produce 请求的状况。
特别是 buffer.memory 和 batch.size(以字节为单位)。因为 batch.size 是按照分区设定的,而 Producer 的性能和内存的使用量,均可以与 Topic 中的分区数量相关联。
所以,此处的设定值将取决于以下几个因素:
请记住,将缓冲区调大并不老是好事,若是 Producer 因为某种缘由而失效了(例如,某个 Leader 的响应速度比确认还要慢),那么在堆内内存(on-heap)中的缓冲的数据量越多,其须要回收的垃圾也就越多。
日志压缩(请参见https://kafka.apache.org/documentation/#compaction)须要各个 Broker 上的堆栈(内存)和 CPU 周期都能成功地配合实现而若是让那些失败的日志压缩数据持续增加的话,则会给 Brokers 分区带来风险。
您能够在 Broker 上调整 log.cleaner.dedupe.buffer.size 和 log.cleaner.threads 这两个参数,可是请记住,这两个值都会影响到各个 Brokers 上的堆栈使用。
若是某个 Broker 抛出 OutOfMemoryError 异常,那么它将会被关闭、并可能形成数据的丢失。
而缓冲区的大小和线程的计数,则取决于须要被清除的 Topic Partition 数量、以及这些分区中消息的数据速率与密钥的大小。
对于 Kafka 的 0.10.2.1 版本而言,经过 ERROR 条目来监控日志清理程序的日志文件,是检测其线程可能出现问题的最可靠方法。
请监控发向(transmit,TX)和收向(receive,RX)的流量,以及磁盘的 I/O、磁盘的空间、以及 CPU 的使用率,并且容量规划是维护群集总体性能的关键步骤。
Leader 一般会须要大量的网络 I/O 资源。例如,当咱们将复制因子(replication factor)配置为 三、并运行起来时。
Leader 必须首先获取分区的数据,而后将两套副本发送给另两个 Followers,进而再传输到多个须要该数据的 Consumers 上。
所以在该例子中,单个 Leader 所使用的网络 I/O,至少是 Follower 的四倍。并且,Leader 还可能须要对磁盘进行读操做,而 Follower 只需进行写操做。
这些都是集群中潜在问题的迹象。例如,单个分区频繁出现 ISR 收缩,则暗示着该分区的数据速率超过了 Leader 的能力,已没法为 Consumer 和其余副本线程提供服务了。
详细内容能够参考:https://github.com/apache/kafka/blob/trunk/config/log4j.properties
Kafka 的 Broker 日志记录会耗费大量的磁盘空间,可是咱们却不能彻底关闭它。
由于有时在发生事故以后,须要重建事件序列,那么 Broker 日志就会是咱们最好的、甚至是惟一的方法。
例如,在设定的 x 天内,若是未出现新的消息,您应该考虑该 Topic 是否已经失效,并将其从群集中予以删除。此举可避免您花时间去管理群集中被额外建立的元数据。
咱们应尽量地直接从操做系统的缓存中直接获取分区的数据。然而,这就意味着您必须确保本身的 Consumers 可以跟得上“节奏”,而对于那些延迟的 Consumer 就只能强制 Broker 从磁盘中读取了。
至于如何肯定须要隔离的 Topics,则彻底取决于您本身的业务须要。例如,您有一些使用相同群集的联机事务处理(multipleonline transaction processing,OLTP)系统。
那么将每一个系统的 Topics 隔离到不一样 Brokers 子集中,则可以有助于限制潜在事件的影响半径。
固然,最好仍是要尽可能避免这种状况的发生。
要知道,若是使用复制因子为 1,并在环回接口上对分区所作的测试,是与大多数生产环境大相径庭的。
在环回接口上网络延迟几乎能够被忽略的,而在不涉及到复制的状况下,接收 Leader 确认所需的时间则一样会出现巨大的差别。
欢迎工做一到五年的Java工程师朋友们加入Java架构开发:855801563
群内提供免费的Java架构学习资料(里面有高可用、高并发、高性能及分布式、Jvm性能调优、Spring源码,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多个知识点的架构资料)合理利用本身每一分每一秒的时间来学习提高本身,不要再用"没有时间“来掩饰本身思想上的懒惰!趁年轻,使劲拼,给将来的本身一个交