Kafka快速入门(五)——Kafka管理

Kafka快速入门(五)——Kafka管理

1、Kafka工具脚本简介

一、Kafka工具脚本简介

Kafka默认提供了不少个命令行脚本,用于实现各类各样的功能和运维管理。默认状况下,不加任何参数或携带--help运行Kafka shell脚本根据,会获得脚本的使用方法说明。
connect-standalone.sh用于启动单节点的Standalone模式的Kafka Connect组件。
connect-distributed.sh用于启动多节点的Distributed模式的Kafka Connect组件。
kafka-acls.sh脚本用于设置Kafka权限,好比设置哪些用户能够访问Kafka的哪些TOPIC的权限。
kafka-delegation-tokens.sh用于管理Delegation Token。基于Delegation Token的认证是一种轻量级的认证机制,是对SASL认证机制的补充。
kafka-topics.sh用于管理全部TOPIC。
kafka-console-producer.sh用于生产消息。
kafka-console-consumer.sh用于消费消息。
kafka-producer-perf-test.sh用于生产者性能测试。
kafka-consumer-perf-test.sh用于消费者性能测试。
kafka-delete-records.sh用于删除Kafka的分区消息,因为Kafka有本身的自动消息删除策略,使用率不高。
kafka-dump-log.sh用于查看Kafka消息文件的内容,包括消息的各类元数据信息、消息体数据。
kafka-log-dirs.sh用于查询各个Broker上的各个日志路径的磁盘占用状况。
kafka-mirror-maker.sh用于在Kafka集群间实现数据镜像。
kafka-preferred-replica-election.sh用于执行Preferred Leader选举,能够为指定的主题执行更换Leader的操做。
kafka-reassign-partitions.sh用于执行分区副本迁移以及副本文件路径迁移。
kafka-run-class.sh用于执行任何带main方法的Kafka类。
kafka-server-start.sh用于启动Broker进程。
kafka-server-stop.sh用于中止Broker进程。
kafka-streams-application-reset.sh用于给Kafka Streams应用程序重设位移,以便从新消费数据。
kafka-verifiable-producer.sh用于测试验证生产者的功能。
kafka-verifiable-consumer.sh用于测试验证消费者功能。
trogdor.sh是Kafka的测试框架,用于执行各类基准测试和负载测试。
kafka-broker-api-versions.sh脚本主要用于验证不一样Kafka版本之间服务器和客户端的适配性。前端

二、Kafka API兼容性测试

本文使用Kafka 2.4版本,kafka-broker-api-versions.sh须要指定Kafka Broker,测试相应版本的API兼容性。
kafka-broker-api-versions.sh --bootstrap-server kafka-test:9092
Kafka快速入门(五)——Kafka管理
Produce表示Produce请求,Produce请求是Kafka全部请求类型中的第一号请求,序号是0。0 to 8表示Produce请求在Kafka 2.4中总共有9个版本,序号分别是0到8。usable:8表示当前连入本Kafka Broker的客户端API可以使用的版本号是8,即最新的版本。
当使用低版本的kafka-broker-api-versions.sh脚本链接高版本的Kafka Broker时,Produce请求信息的usable表示低版本的客户端API只能发送版本号是usable值的Produce请求类型。
在Kafka 0.10.2.0版本前,Kafka是单向兼容的,高版本的Broker可以处理低版本Client发送的请求,低版本的Broker不能处理高版本的Client请求。Kafka 0.10.2.0版本开始,Kafka正式支持双向兼容,低版本的Broker也能处理高版本Client的请求。java

三、Kafka性能测试

Java生产者性能测试脚本:
kafka-producer-perf-test.sh --topic test-topic --num-records 10000000 --throughput -1 --record-size 1024 --producer-props bootstrap.servers=kafka-host:port acks=-1 linger.ms=2000 compression.type=lz4
Java消费者性能测试脚本:
kafka-consumer-perf-test.sh --broker-list kafka-host:port --messages 10000000 --topic testnode

2、TOPIC管理

一、建立TOPIC

kafka-topics.sh --bootstrap-server broker_host:port --create --topic my_topic_name --partitions 1 --replication-factor 1
--create表示建立TOPIC,--partitions表示TOPIC的分区数量,--replication-factor表示每一个分区下的副本数。从Kafka 2.2版本开始,Kafka社区推荐用--bootstrap-server参数替换--zookeeper参数用于指定Kafka Broker。
Kafka社区推荐使用--bootstrap-server的缘由主要有:
(1)使用--zookeeper会绕过Kafka的安全体系。即便为Kafka集群设置安全认证,限制TOPIC的建立,若是使用--zookeeper的命令,依然能成功建立任意TOPIC,不受认证体系的约束。
(2)使用--bootstrap-server与Kafka集群进行交互,愈来愈成为使用Kafka的标准操做。将来,愈来愈少的命令和API须要与ZooKeeper进行链接。正则表达式

二、查询TOPIC

查询Kafka集群全部TOPIC的命令:
kafka-topics.sh --bootstrap-server broker_host:port --list
查询单个TOPIC详细数据的命令以下:
kafka-topics.sh --bootstrap-server broker_host:port --describe --topic topic_name
若是describe命令不指定具体的TOPIC名称,那么Kafka默认会返回当前用户的全部可见TOPIC的详细数据。shell

三、生产消息

kafka-console-producer.sh --bootstrap-server kafka:9092 --topic test
启动生产者,并将Terminal输入写入TOPIC。apache

四、消费消息

kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic test --from-beginning
启动消费者,从TOPIC消费消息数据。json

五、修改TOPIC分区数量

Kafka目前不容许减小某个TOPIC的分区数,所以修改TOPIC的分区数量就是增长分区数量,可使用kafka-topics脚本,结合--alter参数来增长某个TOPIC的分区数。
kafka-topics.sh --bootstrap-server broker_host:port --alter --topic topic_name --partitions 新分区数
指定的分区数必定要比原有分区数大,不然Kafka会抛出InvalidPartitionsException异常。bootstrap

六、修改TOPIC级别参数

设置TOPIC级别参数max.message.bytes。
kafka-configs.sh --zookeeper zookeeper_host:port --entity-type topics --entity-name topic_name --alter --add-config max.message.bytes=10485760
设置常规的TOPIC级别参数使用--zookeeper,设置动态参数使用--bootstrap-server。后端

七、修改TOPIC限速

当某个TOPIC的副本在执行副本同步机制时,为了避免消耗过多的带宽,能够设置Leader副本和Follower副本使用的带宽。设置TOPIC各个分区的Leader副本和Follower副本在处理副本同步时,不得占用超过100MBps(104857600)的带宽,必须先设置Broker端参数leader.replication.throttled.rate 和 follower.replication.throttled.rate,命令以下:
kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.rate=104857600,follower.replication.throttled.rate=104857600' --entity-type brokers --entity-name 0
--entity-name参数用于指定Broker ID。若是TOPIC的副本分别在多个Broker上,须要依次为相应Broker执行。为TOPIC的全部副本都设置限速,能够统一使用通配符*来表示,命令以下:
kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.replicas=*,follower.replication.throttled.replicas=*' --entity-type topics --entity-name testapi

八、删除TOPIC

删除TOPIC命令以下:
kafka-topics.sh --bootstrap-server broker_host:port --delete --topic topic_name
删除TOPIC操做是异步的,执行完删除命令不表明TOPIC当即就被删除,仅仅是被标记成“已删除”状态而已。Kafka会在后台默默地开启TOPIC删除操做,一般须要耐心地等待一段时间。

九、Kafka内置TOPIC

Kafka有两个内TOPIC:__consumer_offsets__transaction_state,内置TOPIC默认都有50个分区。
Kafka 0.11前,当Kafka自动建立__consumer_offsets时,会综合考虑当前运行的Broker台数和Broker端参数offsets.topic.replication.factor值,而后取二者的较小值做为__consumer_offsets的副本数,__consumer_offsets一般在只有一台Broker启动时被建立,所以副本数为1。Kafka 0.11版本后,Kafka会严格遵照offsets.topic.replication.factor值。若是当前运行的Broker数量小于offsets.topic.replication.factor值,Kafka会建立主题失败,并显式抛出异常。
直接查看消费者组提交的位移数据:
kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
直接读取TOPIC消息,查看消费者组的状态信息:
kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$GroupMetadataMessageFormatter" --from-beginning
__transaction_state内置TOPIC的信息查看须要指定kafka.coordinator.transaction.TransactionLog\$TransactionLogMessageFormatter。

十、修改TOPIC副本数

建立内置__consumer_offsets的副本配置json 文件,显式提供50个分区对应的副本数。replicas中的3台Broker排列顺序不一样,目的是将Leader副本均匀地分散在Broker上。文件具体格式以下:

{"version":1, "partitions":[
 {"topic":"__consumer_offsets","partition":0,"replicas":[0,1,2]}, 
  {"topic":"__consumer_offsets","partition":1,"replicas":[0,2,1]},
  {"topic":"__consumer_offsets","partition":2,"replicas":[1,0,2]},
  {"topic":"__consumer_offsets","partition":3,"replicas":[1,2,0]},
  ...
  {"topic":"__consumer_offsets","partition":49,"replicas":[0,1,2]}
]}

执行kafka-reassign-partitions脚本:
kafka-reassign-partitions.sh --zookeeper zookeeper_host:port --reassignment-json-file reassign.json --execute

十一、错误处理

(1)删除TOPIC失败
形成TOPIC删除失败最多见的缘由有两个:副本所在的Broker宕机;待删除TOPIC的部分分区依然在执行迁移过程。
强制删除TOPIC的方法:
A、手动删除ZooKeeper节点/admin/delete_topics下以待删除TOPIC为名的znode。
B、手动删除TOPIC在磁盘上的分区目录。
C、在ZooKeeper中执行rmr /controller,触发Controller重选举,刷新Controller缓存。可能会形成大面积的分区Leader重选举。能够不执行,只是Controller缓存中没有清空待删除TOPIC,不影响使用。
(2)内置__consumer_offsets占用太多磁盘空间
若是__consumer_offsets消耗过多的磁盘空间,须要显式地用jstack 命令查看一下kafka-log-cleaner-thread前缀的线程状态。一般都是由于线程挂掉没法及时清理__consumer_offsets,此时须要重启相应的 Broker。

3、Kafka动态配置

一、Kafka动态配置简介

Kafka安装目录的config/server.properties文件能够用于配置静态参数(Static Configs)。一般会指定server.properties文件的路径来启动Broker。若是要设置Broker端的任何参数,必须在server.properties文件中显式地增长一行对应的配置,启动Broker进程,令参数生效。一般会一次性设置好全部参数后,再启动Broker,但若是须要变动任何参数时,必须重启Broker。
但生产环境中若是每次修改Broker端参数都须要重启Broker,Kafka集群的可用性必然受到限制,所以Kafka 1.1.0版本中正式引入动态Broker参数(Dynamic Broker Configs),即修改参数值后,无需重启Broker就能当即生效。Kafka 2.3版本中Broker端参数有200多个,但Kafka社区并无将每一个参数都升级成动态参数,仅把一部分参数升级为可动态调整。
在Kafka 1.1后官方文档的Broker Configs表中增长了Dynamic Update Mode列,有3类值,分别是read-only、per-broker和cluster-wide。
(1)read-only。read-only类型参数是静态参数,只有重启Broker,才能令修改生效。
(2)per-broker。per-broker类型参数属于动态参数,修改后,只会在对应的Broker上生效。
(3)cluster-wide。cluster-wide类型参数也属于动态参数,修改后,会在整个集群范围内生效,对全部Broker都生效。
经过运行无参数的kafka-configs.sh脚本,其说明文档会列出当前动态Broker参数。

二、Kafka动态参数应用场景

Broker动态参数使用场景很是普遍,包括但不限于如下几种:
(1)动态调整Broker端各类线程池大小,实时应对突发流量。
(2)动态调整Broker端链接信息或安全配置信息。
(3)动态更新SSL Keystore有效期。
(4)动态调整Broker端Compact操做性能。
(5)实时变动JMX指标收集器(JMX Metrics Reporter)。
一般,当Kafka Broker入站流量(inbound data)激增时,会形成Broker端请求积压(Backlog)。此时能够动态增长网络线程数和I/O线程数,快速处理积压请求。当流量洪峰事后,能够将线程数调整回原值,减小对资源的浪费。整个过程不须要重启Broker,而且能够将调整线程数的操做,封装进定时任务中,以实现自动扩缩容。

三、Kafka动态参数保存

Kafka将Broker动态参数保存在ZooKeeper中。
Kafka快速入门(五)——Kafka管理
changes是用来实时监测动态参数变动的,不会保存参数值。
topics是用来保存Kafka TOPIC级别参数的,不属于Broker动态参数,但可以动态变动。
users和 clients则是用于动态调整客户端配额(Quota)的znode节点。所谓配额,是指Kafka运维人员限制连入集群的客户端的吞吐量或者是限定使用的CPU资源。
/config/brokers znode才是保存动态Broker参数的地方。znode有两大类子节点,第一类子节点就只有一个,即< default >,保存cluster-wide范围的动态参数;第二类以broker.id为名,保存特定Broker的per-broker范围参数。

四、Kafka动态参数配置

配置动态Broker参数的工具行命令是Kafka自带的kafka-configs脚本。
设置cluster-wide范围动态参数,须要显式指定--entity-default。
kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-default --alter --add-config unclean.leader.election.enable=true
可使用--describe命令查看cluster-wide范围动态Broker参数设置:
kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-default --describe
设置per-broker范围参数命令以下:
kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-name 1 --alter --add-config unclean.leader.election.enable=false
可使用--describe命令查看per-broker范围动态Broker参数设置:
kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-name 1 --describe
删除cluster-wide范围参数命令以下:
kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-default --alter --delete-config unclean.leader.election.enable
删除per-broker范围参数命令以下:
kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-name 1 --alter --delete-config unclean.leader.election.enable

五、经常使用Broker动态参数

(1)log.retention.ms
修改日志留存时间,能够在Kafka集群层面动态变动日志留存时间。(2)num.io.threads和num.network.threads。
在实际生产环境中,Broker端请求处理能力常常要按需扩容。
(3)SSL相关的参数。
SSL相关的主要4个参数:ssl.keystore.type、ssl.keystore.location、ssl.keystore.password和ssl.key.password。动态实时调整SSL参数,能够建立过时时间很短的SSL证书。每当动态调整SSL参数时,Kafka底层会从新配置Socket链接通道并更新Keystore。
(4)num.replica.fetchers
一般,Follower副本拉取速度比较慢,常见的作法是增长num.replica.fetchers参数值,确保有充足的线程能够执行Follower副本向Leader 副本的拉取。经过调整动态参数,不须要再重启Broker,就能当即在 Follower端生效。

4、Kafka消费者组位移

一、Kafka消费者组位移

传统消息中间件(RabbitMQ 或 ActiveMQ)处理和响应消息的方式是破坏性的(destructive),即一旦消息被成功处理,就会被从Broker上删除。而Kafka是基于日志结构(log-based)的消息引擎,消费者在消费消息时,仅仅从磁盘文件上读取数据,不会删除消息数据,而且位移数据是由消费者控制的,所以可以很容易地修改位移,实现重复消费历史数据的功能。

二、位移策略

位移策略根据位移维度和时间维度分为七种。位移维度是指根据位移值来重设,即直接把消费者的位移值重设成指定的位移值;时间维度是能够给定一个基准时间,让消费者把位移调整成大于基准时间的最小位移。
Kafka快速入门(五)——Kafka管理
Earliest策略表示将位移调整到TOPIC当前最先位移处。最先位移不必定是0,由于在生产环境中,太太久远的消息一般会被Kafka自动删除,因此当前最先位移极可能是一个大于0的值。若是想要从新消费主题的全部消息,那么可使用Earliest策略。
Latest策略表示把位移重设成最新末端位移。若是想跳过全部历史消息,从最新的消息处开始消费,可使用Latest策略。
Current策略表示将位移调整成消费者当前提交的最新位移。
Specified-Offset策略则是比较通用的策略,表示消费者把位移值调整到指定的位移处。如消费者程序在处理某条错误消息时,能够手动地跳过此消息的处理。
Shift-By-N策略表示跳过的位移的相对数值,能够是负数,负数表示把位移重设成当前位移的前N条位移处。
DateTime策略表示将位移重置到指定时间后的最先位移处。如想从新消费昨天的数据,可使用DateTime策略重设位移到昨天0点。
Duration策略表示指定相对的时间间隔,而后将位移调整到距离当前给定时间间隔的位移处,具体格式是 PnDTnHnMnS,是一个符合ISO-8601规范的Duration格式,以字母P开头,后面由4部分组成,即 D、H、M和S,分别表示天、小时、分钟和秒。若是想将位移调回到15分钟前,能够指定PT0H15M0S。

三、重设消费者组位移

重设消费者组位移的方式有两种,一种是经过消费者API来实现,一种是经过kafka-consumer-groups.sh命令行脚原本实现。
(1)消费者API方式
经过Java API重设位移,须要调用KafkaConsumer的seek系列方法。

void seek(TopicPartition partition, long offset);
void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata);
void seekToBeginning(Collection<TopicPartition> partitions);
void seekToEnd(Collection<TopicPartition> partitions);

每次调用seek方法只能重设一个分区的位移。OffsetAndMetadata是一个封装Lon 型的位移和自定义元数据的复合类。seekToBeginning和seekToEnd拥有一次重设多个分区的能力。
设置Earliest位移策略的Java代码以下:

Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);

String topic = "test";  // 要重设位移的Kafka主题 
try (final KafkaConsumer<String, String> consumer = 
  new KafkaConsumer<>(consumerProperties)) {
         consumer.subscribe(Collections.singleton(topic));
         consumer.poll(0);
         consumer.seekToBeginning(
  consumer.partitionsFor(topic).stream().map(partitionInfo ->          
  new TopicPartition(topic, partitionInfo.partition()))
  .collect(Collectors.toList()));
}

消费者程序必需要禁止自动提交位移。
组ID要设置成要重设的消费者组的组ID。
调用seekToBeginning方法时,须要一次性构造主题的全部分区对象。
必需要调用带长整型的poll方法,而不要调用consumer.poll(Duration.ofSecond(0))。
设置Latest位移策略的Java代码以下:

consumer.seekToEnd(consumer.partitionsFor(topic).stream().map(partitionInfo->new TopicPartition(topic, partitionInfo.partition())).collect(Collectors.toList()));
设置Current 位移策略的Java代码以下:
consumer.partitionsFor(topic).stream().map(info->new TopicPartition(topic,info.partition())).forEach(tp->{long committedOffset = consumer.committed(tp).offset();consumer.seek(tp, committedOffset);
});

partitionsFor方法获取给定主题的全部分区,而后依次获取对应分区上的已提交位移,最后经过seek方法重设位移到已提交位移处。
(2)命令行脚本方式
Kafka 0.11版本中,引入了经过kafka-consumer-groups.sh脚本重设消费者位移的功能。
Earliest策略直接指定–to-earliest:
kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-earliest –execute
Latest策略直接指定–to-latest:
kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-latest --execute
Current策略直接指定–to-current:
kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-current --execute
Specified-Offset策略直接指定–to-offset:
kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-offset &lt;offset&gt; --execute
Shift-By-N策略直接指定–shift-by N:
kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --shift-by &lt;offset_N&gt; --execute
DateTime策略直接指定–to-datetime:
kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --to-datetime 2020-03-08T12:00:00.000 --execute
Duration策略直接指定–by-duration:
kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --by-duration PT0H60M0S --execute

5、Kafka AdminClient工具

一、AdminClient引入

Kafka管理工具脚本的缺陷以下:
(1)只能运行在控制台,集成度差,不能集成到应用程序、运维框架、监控平台。
(2)经过链接ZooKeeper来提供服务,会绕过Kafka的用户认证机制,不安全。
(3)须要使用Kafka服务器端的代码。
基于上述缺陷,Kafka 0.11版本正式推出Java客户端版的AdminClient。

二、AdminClient功能特性

(1)主题管理:包括主题的建立、删除和查询。
(2)权限管理:包括具体权限的配置与删除。
(3)配置参数管理:包括Kafka各类资源的参数设置、详情查询。Kafka资源包括Broker、主题、用户、Client-id等。
(4)副本日志管理:包括副本底层日志路径的变动和详情查询。
(5)分区管理:即建立额外的主题分区。
(6)消息删除:即删除指定位移前的分区消息。
(7)Delegation Token管理:包括Delegation Token建立、更新、过时和详情查询。
(8)消费者组管理:包括消费者组的查询、位移查询和删除。
(9)Preferred领导者选举:推选指定主题分区的Preferred Broker为领导者。

三、AdminClient工做原理

AdminClient采用双线程设计:前端主线程和后端I/O线程,前端线程负责将用户要执行的操做转换成对应的请求,而后再将请求发送到后端I/O线程的队列中;然后端I/O线程从队列中读取相应的请求,而后发送到对应的Broker节点上,而后把执行结果保存起来,以便等待前端线程的获取。AdminClient内部大量使用生产者-消费者模式将请求生成与处理解耦。
Kafka快速入门(五)——Kafka管理
前端主线程会建立名为Call的请求对象实例,Call主要任务由两个:
(1)构建对应的请求对象。若是要建立主题,建立CreateTopicsRequest;若是查询消费者组位移,建立OffsetFetchRequest。
(2)指定响应的回调逻辑。如从Broker端接收到CreateTopicsResponse后要执行的动做。一旦建立好Call实例,前端主线程会将其放入到新请求队列(New Call Queue)中,此时前端主线程的任务完成,只须要等待结果返回便可。
后端I/O线程使用3个队列来承载不一样时期的请求对象,分别是新请求队列、待发送请求队列和处理中请求队列。新请求队列的线程安全由Java的monitor锁来保证。为了确保前端主线程不会由于monitor锁被阻塞,后端I/O线程会按期地将新请求队列中的全部Call实例所有搬移到待发送请求队列中进行处理。待发送请求队列和处理中请求队列只由后端I/O线程处理,所以无需任何锁机制来保证线程安全。
当I/O线程在处理某个请求时,会显式地将请求保存在处理中请求队列。一旦处理完成,I/O线程会自动地调用Call对象中的回调逻辑完成最后的处理。最后,I/O线程会通知前端主线程结果已经准备完毕,前端主线程可以及时获取到执行操做的结果。AdminClient使用Java Object对象的wait和notify实现的通知机制。
后端I/O线程名字的前缀是kafka-admin-client-thread。若是AdminClient程序貌似在正常工做,但执行的操做没有返回结果或者hang住,多是由于I/O线程出现问题致使的。

四、构造和销毁AdminClient实例

若是正确地引入kafka-clients依赖,那么应该能够在编写Java程序时看到AdminClient对象,完整类路径是org.apache.kafka.clients.admin.AdminClient。建立AdminClient实例须要手动构造一个Properties对象或Map对象,而后传给对应的方法。最多见并且必需要指定的参数是bootstrap.servers参数。

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-host:port");
props.put("request.timeout.ms", 600000);
AdminClient client = AdminClient.create(props);

AdminClient实例销毁须要显式调用close方法。

五、AdminClient实例

(1)建立Topic

String newTopicName = "test-topic";
try (AdminClient client = AdminClient.create(props)) {
         NewTopic newTopic = new NewTopic(newTopicName, 10, (short) 3);
         CreateTopicsResult result = client.createTopics(Arrays.asList(newTopic));
         result.all().get(10, TimeUnit.SECONDS);
}

(2)消费者组位移查询

String groupID = "test-group";
try (AdminClient client = AdminClient.create(props)) {
         ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID);
         Map<TopicPartition, OffsetAndMetadata> offsets = 
                  result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
         System.out.println(offsets);
}

(3)Broker磁盘占用查询

try (AdminClient client = AdminClient.create(props)) {
     DescribeLogDirsResult ret = client.describeLogDirs(Collections.singletonList(targetBrokerId)); // 指定Broker id
     long size = 0L;
     for (Map<String, DescribeLogDirsResponse.LogDirInfo> logDirInfoMap : ret.all().get().values()) {
              size += logDirInfoMap.values().stream().map(logDirInfo -> logDirInfo.replicaInfos).flatMap(
                       topicPartitionReplicaInfoMap ->
                       topicPartitionReplicaInfoMap.values().stream().map(replicaInfo -> replicaInfo.size))
                       .mapToLong(Long::longValue).sum();
     }
     System.out.println(size);
}

AdminClient的describeLogDirs方法获取指定Broker上全部分区主题的日志路径信息。

6、Kafka认证机制

一、Kafka认证机制简介

Kafka 0.9.0.0版本开始,Kafka正式引入认证机制,实现基础的安全用户认证,用于将Kafka上云或进行多租户管理。Kafka 2.3版本支持基于SSL和基于SASL的安全认证机制。
SSL认证主要是指Broker和客户端的双路认证(2-way authentication)。一般,SSL加密(Encryption)已经启用了单向认证,即客户端认证Broker的证书(Certificate)。若是要作SSL认证,须要启用双路认证,即Broker也要认证客户端的证书。SSL一般用于通讯加密,使用SASL来作Kafka认证明现。
Kafka支持经过SASL作客户端认证,SASL是提供认证和数据安全服务的框架。Kafka支持的SASL机制有5种,在不一样版本中被引入,所以须要根据Kafka版本选择其所支持的认证机制。
(1)GSSAPI:基于Kerberos认证机制使用的安全接口,在0.9版本中被引入,用于已经实现Kerberos认证机制的场景。
(2)PLAIN:基于简单的用户名/密码认证的机制,在0.10版本中被引入,与SSL加密搭配使用。PLAIN是一种认证机制,而PLAINTEXT是未使用SSL时的明文传输。PLAIN认证机制的配置和运维成本相对较小,适合于小型公司Kafka集群。但PLAIN认证机制不能动态地增减认证用户,必须重启Kafka集群才能令变动生效。因为全部认证用户信息所有保存在静态文件中,因此只能重启Broker,才能从新加载变动后的静态文件。
(3)SCRAM:主要用于解决PLAIN机制安全问题的新机制,在0.10.2版本中被引入。经过将认证用户信息保存在ZooKeeper的方式,避免了动态修改须要重启Broker的弊端。可使用Kafka提供的命令动态地建立和删除用户,无需重启整个集群。
(4)OAUTHBEARER:基于OAuth 2认证框架的新机制,在2.0版本中被引进。OAuth是一个开发标准,容许用户受权第三方应用访问用户在某网站上的资源,而无需将用户名和密码提供给第三方应用。Kafka不提倡单纯使用OAUTHBEARER,由于其生成的JSON Web Token不安全,必须配以SSL加密才能用在生产环境中。
(5)Delegation Token:用于补充SASL认证机制的轻量级认证机制,在1.1.0版本被引入。若是要使用Delegation Token,须要先配置好SASL 认证,而后再利用Kafka提供的API获取对应的Delegation Token。Broker和客户端在作认证时能够直接使用token,不用每次都去KDC 获取对应的ticket(Kerberos 认证)或传输Keystore文件(SSL认证)。

二、Kafka认证机制比较

Kafka快速入门(五)——Kafka管理

三、SASL/SCRAM-SHA-256 配置实例

(1)建立用户
配置SASL/SCRAM首先须要建立可否链接Kafka集群的用户。建立3个用户,分别是admin用户、producer用户和consumer用户。admin用户用于实现Broker间通讯,producer用于生产消息,consumer用于消费消息。
建立admin:
kafka-configs.sh --zookeeper zookeeper-test:2181 --alter --add-config 'SCRAM-SHA-256=[password=123456],SCRAM-SHA-512=[password=123456]' --entity-type users --entity-name admin
建立producer:
kafka-configs.sh --zookeeper zookeeper-test:2181 --alter --add-config 'SCRAM-SHA-256=[password=123456],SCRAM-SHA-512=[password=123456]' --entity-type users --entity-name producer
建立consumer:
kafka-configs.sh --zookeeper zookeeper-test:2181 --alter --add-config 'SCRAM-SHA-256=[password=123456],SCRAM-SHA-512=[password=123456]' --entity-type users --entity-name consumer
用户查看:
kafka-configs.sh --zookeeper zookeeper-test:2181 --describe --entity-type users --entity-name consumer
(2)建立JAAS文件
配置用户后,须要为每一个Broker建立一个对应的JAAS文件。

KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="123456";
};

配置Broker的server.properties文件:

sasl.enabled.mechanisms=SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
security.inter.broker.protocol=SASL_PLAINTEXT
listeners=SASL_PLAINTEXT://localhost:9092

(3)启动Broker
KAFKA_OPTS=-Djava.security.auth.login.config=/opt/kafka/config/kafka-broker.jaas kafka-server-start.sh config/server.properties
(4)生产消息
建立Topic
kafka-topics.sh --bootstrap-server kafka-test:9092 --create --topic test --partitions 1 --replication-factor 1
建立Producer客户端配置:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="producer" password="123456";

生产消息:
kafka-console-producer.sh --broker-list kafka-test:9092 --topic test --producer.config producer.conf
(5)消费消息
建立Consumer客户端配置consumer.conf:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="consumer" password="123456";

(6)动态增减用户
删除用户:
kafka-configs.sh --zookeeper zookeeper-test:2181 --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name producer
建立用户:
kafka-configs.sh --zookeeper zookeeper:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=123456]' --entity-type users --entity-name test
修改Producer客户端配置:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="test" password="123456";

建立每个客户端,Producer或Consumer都须要指定相应的配置文件。

7、Kafka受权机制

一、Kafka受权机制简介

Kafka 受权机制(Authorization)采用ACL(Access-Control List,访问控制列表)权限模型。Kafka提供了一个可插拔的受权实现机制,会将配置的全部ACL项保存在ZooKeeper下的/kafka-acl节点中。能够经过Kafka自带的kafka-acls.sh脚本动态地对ACL项进行增删改查,并让其当即生效。

二、ACL开启

Kafka开启ACL的方法只须要在Broker端的配置文件中增长一行设置便可,即在server.properties文件中配置authorizer.class.name参数。
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
authorizer.class.name参数指定了ACL受权机制的实现类。Kafka提供了Authorizer接口,容许开发者实现本身的受权机制,但一般是直接使用Kafka自带的SimpleAclAuthorizer实现类。设置authorizer.class.name参数,而且启动Broker后,Broker默认会开启ACL受权验证。

三、超级用户设置

开启ACL受权后,必须显式地为不一样用户设置访问某项资源的权限,不然,在默认状况下,没有配置任何ACL的资源是不能被访问的。但超级用户可以访问全部的资源,即便没有为超级用户设置任何ACL项。超级用户的设置只须要在Broker端的配置文件server.properties中设置super.users参数。
super.users=User:superuser1;User:superuser2
若是须要一次性指定多个超级用户,使用分号做为分隔符。
Kafka支持将全部用户都配置成超级用户的用法。若是在server.properties文件中设置allow.everyone.if.no.acl.found=true,那么全部用户均可以访问没有设置任何ACL的资源。但在生产环境中,特别是对安全有较高要求的环境中,采用白名单机制要比黑名单机制更安全。

四、ACL受权配置

在Kafka中,配置受权的方法是经过kafka-acls.sh脚本。
kafka-acls.sh --authorizer-properties zookeeper.connect=zookeeper-test:2181 --add --allow-principal User:test --operation All --topic '*' --cluster
All表示全部操做,topic中的星号则表示全部主题,指定--cluster则说明要为用户test设置的是集群权限。
kafka-acls.sh --authorizer-properties zookeeper.connect=zookeeper-test:2181 --add --allow-principal User:'*' --allow-host '*' --deny-principal User:BadUser --deny-host 192.168.1.120 --operation Read --topic test
User的星号表示全部用户,allow-host的星号则表示全部IP地址。容许全部的用户使用任意的IP地址读取名为test的TOPIC数据,禁止BadUser用户和192.168.1.120的IP地址访问test下的消息。

五、Kafka ACL权限列表

Kafka提供了细粒度的受权机制,kafka-acls.sh直接指定--producer能同时得到Producer所需权限,指定--consumer能够得到 Consumer所需权限。Kafka受权机制能够不配置认证机制而单独使用,如只为IP地址设置权限。
kafka-acls.sh --authorizer-properties zookeeper.connect=zookeeper-test:2181 --add --deny-principal User:* --deny-host 192.168.1.120 --operation Write --topic test
禁止Producer在192.168.1.120 IP上向test发送数据。

六、云环境多租户权限设置

若是Kafka集群部署在云上,对于多租户须要设置合理的认证机制,并为每一个链接Kafka集群的客户端授予合适权限。
(1)开启ACL
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
(2)开启白名单机制
在Kafka的server.properties文件中,不要设置allow.everyone.if.no.acl.found=true。
(3)为SSL用户受权
使用kafka-acls.sh为SSL用户授予集群的权限。咱们之前面的例子来进行一下说明。在配置 SSL 时,指定用户的 Distinguished Name 为“CN=Xi Hu, OU=YourDept, O=YourCompany, L=Beijing, ST=Beijing, C=CN”。在设置 Broker 端参数时,指定 security.inter.broker.protocol=SSL,即强制指定 Broker 间的通信也采用 SSL 加密。
kafka-acls.sh --authorizer-properties zookeeper.connect=zookeeper-test:2181 --add --allow-principal User:"CN=Xi Hu,OU=YourDept,O=YourCompany,L=Beijing,ST=Beijing,C=CN" --operation All --cluster
(4)为客户端受权
为生产者授予producer权限:
kafka-acls.sh --authorizer-properties zookeeper.connect==zookeeper-test:2181 --add --allow-principal User:"CN=Xi Hu,OU=YourDept,O=YourCompany,L=Beijing,ST=Beijing,C=CN" --producer --topic 'test'
为消费者授予consumer权限:
kafka-acls.sh --authorizer-properties zookeeper.connect=zookeeper-test:2181 --add --allow-principal User:"CN=Xi Hu,OU=YourDept,O=YourCompany,L=Beijing,ST=Beijing,C=CN" --consumer --topic 'test' --group '*'

8、MirrorMaker

一、MirrorMaker简介

一般,大多数业务需求使用一个Kafka集群便可知足,但有些场景确实会须要多个Kafka集群同时工做,好比为了便于实现灾难恢复,能够在两个机房分别部署单独的Kafka集群。若是其中一个机房出现故障,能够很容易地把流量切换到另外一个正常运转的机房。若是要为地理相近的客户提供低延时的消息服务,而主机房离客户很远时,能够在靠近客户的机房部署一套Kafka集群,服务于客户,从而提供低延时的服务。
为了在多个Kafka集群间实现数据同步,Kafka提供了跨集群数据镜像工具MirrorMaker。一般,数据在单个集群下不一样节点之间的拷贝称为备份,而数据在集群间的拷贝称为镜像(Mirroring)。
MirrorMaker本质是一个消费者+生产者的程序。消费者负责从源集群(Source Cluster)消费数据,生产者负责向目标集群(Target Cluster)发送消息。整个镜像流程以下:
Kafka快速入门(五)——Kafka管理
MirrorMaker链接的源集群和目标集群,会实时同步消息。实际场景中,用户会部署多套Kafka集群,用于实现不一样的目的。
Kafka快速入门(五)——Kafka管理
源集群负责主要的业务处理,目标集群1能够用于执行数据分析,目标集群2则充当源集群的热备份。

二、MirrorMaker脚本工具

Kafka默认提供MirrorMaker命令行工具kafka-mirror-maker脚本,其常见用法是指定生产者配置文件、消费者配置文件、线程数以及要执行数据镜像的TOPIC正则表达式。
kafka-mirror-maker.sh --consumer.config config/consumer.properties --producer.config config/producer.properties --num.streams 8 --whitelist ".*"
--consumer.config:指定MirrorMaker中消费者的配置文件地址,最主要的配置项是bootstrap.servers,即MirrorMaker从哪一个Kafka集群读取消息。因为MirrorMaker有可能在内部建立多个消费者实例并使用消费者组机制,所以还须要设置group.id参数。另外,须要配置auto.offset.reset=earliest,不然MirrorMaker只会拷贝在其启动后到达源集群的消息。
--producer.config:指定MirrorMaker内部生产者组件的配置文件地址。必须显式地指定参数bootstrap.servers,配置拷贝的消息要发送到的目标集群。
--num.streams:MirrorMaker要建立多少个KafkaConsumer实例,会在后台建立并启动多个线程,每一个线程维护专属的消费者实例。
--whitelist:接收一个正则表达式。全部匹配该正则表达式的TOPIC都会被自动地执行镜像。“.*”表示要同步源集群上的全部TOPIC。

三、跨集群数据镜像实例

(1)配置生产者和消费者
生产者配置文件producer.config:

producer.properties:
bootstrap.servers=kafka-test1:9092

消费者配置文件consumer.config:

consumer.properties:
bootstrap.servers=kafka-test2:9092
group.id=mirrormaker
auto.offset.reset=earliest

(2)MirrorMaker工具启动
kafka-mirror-maker.sh --producer.config producer.config --consumer.config consumer.config --num.streams 4 --whitelist ".*"
若是MirrorMaker内部消费者会使用轮询策略(Round-robin)来为消费者实例分配分区,须要consumer.properties文件中增长配置:
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
(3)结果验证
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka-test:9092 --topic test --time -2
-1和-2分别表示获取某分区最新的位移和最先的位移,两个位移值的差值就是分区当前的消息数。
MirrorMaker在执行消息镜像的过程当中,若是发现要同步的Topic在目标集群上不存在,会根据目标集群的Kafka Broker端参数num.partitions和default.replication.factor默认值,自动将Topic建立出来。在生产环境中,推荐提早把要同步的全部主题按照源集群上的规格在目标集群上地建立出来,避免在源集群某个分区的消息同步到目标集群后位于其它分区中。MirrorMaker默认会同步内置Topic,在镜像位移主题时,若是发现目标集群还没有建立位移主题,会根据Broker端参数offsets.topic.num.partitions和offsets.topic.replication.factor来建立位移Topic,默认配置是50个分区,每一个分区3个副本。在Kafka 0.11.0.0版本前,Kafka不会严格依照offsets.topic.replication.factor参数值,即若是设置offsets.topic.replication.factor参数值为3,而当前存活的Broker数量少于3,位移主题依然能被成功建立,副本数会取offsets.topic.replication.factor参数值和存活Broker数之间的较小值。从Kafka 0.11.0.0版本开始,Kafka会严格遵照设定offsets.topic.replication.factor参数值,若是发现存活Broker数量小于参数值,就会直接抛出异常,通知主题建立失败。

四、其它跨集群数据镜像工具

(1)uReplicatorUber公司在使用MirrorMaker过程当中发现了一些缺陷,好比MirrorMaker中的消费者使用的是消费者组机制,会不可避免地会碰到不少Rebalance问题。所以,Uber研发了uReplicator,使用Apache Helix做为集中式的TOPIC分区管理组件来管理分区的分配,而且重写消费者程序,替代MirrorMaker下的消费者,从而避免Rebalance各类问题。(2)Brooklin Mirror Maker针对MirrorMaker工具不易实现管道化的缺陷,LinkedIn进行针对性的改进并对性能进行优化研发了Brooklin Mirror Maker。(3)Confluent Replicator Replicator是Confluent提供的企业级的跨集群镜像方案,能够便捷地提供Kafka TOPIC在不一样集群间的迁移,同时还能自动在目标集群上建立与源集群上配置如出一辙的TOPIC,极大地方便运维管理。