kafka_0.10.1.0监控及管理

kafka_0.10.1.0监控及管理

1. kafka监控

kafka自身没有监控管理页面,不管是进行一些管理操做仍是状态的监控都要命令加一大堆记不住的参数,实在是很不方便,不过好在在github上开源了一些工具,在kafka的生态系统中也有说起到:java

  • Kafka Manager: 都是以表格的形式展示数据,比较方便用来管理kafka,例如topic的建立、删除以及分区的管理等。
  • Kafka Offset Monitor: 监控消费者以及所在分区的offset,帮助分析当前的消费以及生产是否顺畅,功能比较单调但界面还能够。
  • Kafka Web Console:github上已经说明了再也不更新了,且建议使用Kafka manager
  • kafkat: 一个简化了的命令行工具,用来管理kafka的broker,partition,topic.
  • Capillary: 若是是kafka+storm集成使用,能够选择该工具,是一个web应用

除了前面两个,其它几个都没试用过,就算在网上查也是推荐前两个而已,kafka manager基于jmx功能比较强大,利用它作管理方面;而KafkaOffsetMonitor从它的启动参数来看应该是定时从zookeeper上获取消费者的offset,以图的形式展现,比较直观(对于一些实现Exactly once的系统,offset并不保存在zookeeper上面,它将不能使用),二者结合使用,相得益彰。linux

2.1. Kafka-manager

kafka manager的源码:https://github.com/yahoo/kafka-manager 
官方要求的kafka版本:Kafka 0.8.1.1 或者 0.8.2.x 或者 0.9.0.x,不过使用kafka_0.10.1.0时也能正常。 
java版本要求:Java 8+
git

2.1.1 编译源码

为了获得部署包kafka-manager-xxxx.zip,先根据上面的地址下载源码再编译(不想这么麻烦的话,能够去一些kafka的qq群,通常群共享里都会有这个包)。kafka-manager工程是利用SBT进行构建的,因此编译以前还须要安装SBT,安装Java 8。最后执行命令编译:github

sbt clean dist
  • 1

网络很差的话可能须要重复编译,成功后在target/universal目录下能够看到kafka-manager-1.3.2.1.zipweb

2.1.2 修改配置

把编译获得的zip包解压,在conf目录中有一个application.conf文件,最小化的配置只须要在该文件中修改kafka-manager.zkhosts参数便可:apache

kafka-manager.zkhosts="master:2181,slave1:2181,slave2:2181"
  • 1

2.1.3 kafka启动jmx

kafka服务必须要开启JMX,不然在下一步启动kafka-manager时会出现: 
java.lang.IllegalArgumentException: requirement failed: No jmx port but jmx polling enabled! 
启动kafka服务时指定JMX_PORT值:
json

JMX_PORT=9999 bin/kafka-server-start.sh -daemon config/server.properties
  • 1

或者修改kafka-server-start.sh,在前面加上:bootstrap

export JMX_PORT=9999
  • 1

2.1.4 启动kafka-manager

以运行在linux上为例,启动脚本为bin/kafkak-manager,该脚本会默认占用9000端口,也能够经过参数修改端口以及指定java版本运行:api

nohup bin/kafka-manager -java-home /usr/java/jdk1.8.0_101/ -Dhttp.port=8081 &
  • 1

启动成功后,便可以经过http://ip:8081来访问网络

2.2. KafkaOffsetMonitor

源码地址:https://github.com/quantifind/KafkaOffsetMonitor 
jar包下载:https://github.com/quantifind/KafkaOffsetMonitor/releases/tag/v0.2.1 
KafkaOffsetMonitor使用比较方便,将会被打成一个jar包,直接运行便可,做者已经把打好的包上传到github上面,执行如下命令启动:

java -cp KafkaOffsetMonitor-assembly-0.2.1.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk master,slave1,slave2:2181 --port 8082 --refresh 10.seconds --retain 2.days
  • 1

启动成功后,便可以经过http://ip:8082来访问

3. kafka自带脚本工具

基本全部脚本都是调用kafka-run-class.sh脚本去执行一个命令模式的class.建议使用脚本时参考脚本的使用说明。

3.1. 主题管理:kafka-topics.sh

建立、删除、查看或者改变一个topic.

3.1.1. 建立topic

bin/kafka-topics.sh --zookeeper master:2181 --create --topic test --partitions 3 --replication-factor 2 --config flush.ms=1000 --config flush.messages=1
  • 1

建立一个名称为test的topic,它有3个分区,每一个分区两个replica,经过–config给topic设置属性,格式为key=value,若是有多个配置属性则如上命令。这种建立方式kafka会自动把各个分区的replica分配到相应的broker,也能够在建立topic时手动指定哪一个分区的哪一个replica落在指定的broker,示例命令以下:

bin/kafka-topics.sh --zookeeper master:2181 --create --topic test --config flush.ms=1000 --config flush.messages=1 --replica-assignment 0:1,1:2
  • 1

关键的配置参数为–replica-assignment,该参数不能与–partitions和–replication-factor同时出现,参数的使用格式以下:

broker_id_for_part0_replica1: broker_id_for_part0_replica2, 
broker_id_for_part1_replica1: broker_id_for_part1_replica2, 
broker_id_for_part2_replica1: broker_id_for_part2_replica2

–replica-assignment 0:1,1:2表示有两个分区,分区0的replica1在broker.id=0的kafka服务上,分区0的replica2在broker.id=1的kafka服务上;分区1的replica1在broker.id=1的kafka服务上,分区1的replica2在broker.id=2的kafka服务上。

3.1.2. 删除topic

当使用delete命令删除topic,默认只是进行标记,并无真正的删除

这里写图片描述

Note: This will have no impact if delete.topic.enable is not set to true. 
须要在config/server.properties配置文件中开启delete.topic.enable=true

3.1.3. 查看topic

bin/kafka-topics.sh --zookeeper master:2181 --describe --topic test
  • 1

describe名称为test的topic,将会显示topic的分区数、replica因子、配置参数以及各分区的分布状况(leader,replica,isr),以下图:

这里写图片描述 
使用–describe时还能够结合其它一些参数对结果进行过滤:

  • topics-with-overrides:加上该参数时,只显示config被覆盖过的topic(例如使用下面的–alter修改config,或者建立topic时指定–config也算)。
  • unavailable-partitions:加上该参数时,只显示leader不可用的分区。
  • under-replicated-partitions:加上该参数时,只显示replica不足的分区。

3.1.4. 修改topic

根据–alter参数的说明,能够修改topic的分区数(目前只能是增长),修改配置config,以及修改replica(这里貌似不许确,根据官网的文档说明,若是想要增长replication factor,应该使用kafka-reassign-partitions.sh脚本)。

  • 增长分区:从2个分区增长到3个
bin/kafka-topics.sh --zookeeper master:2181 --alter --topic test --partitions 3
  • 1

成功后describe一下topic: 
这里写图片描述

添加分区不能改变现有的数据

  • 添加修改配置 
    根据上图test主题的Configs:flush.messages=1,flush.ms=1000,尝试把flush.ms修改成2000,命令以下:
bin/kafka-topics.sh --zookeeper master:2181 --alter --topic test --config flush.ms=2000
  • 1

成功后: 
这里写图片描述

  • 删除配置 
    –alter结合–delete-config一块儿使用能够删除某项配置,尝试删除flush.ms配置项目:
bin/kafka-topics.sh --zookeeper master:2181 --alter --topic test --delete-config flush.ms
  • 1

成功后: 
这里写图片描述

PS:对于使用–alter增长、修改和删除config,从0.9.0.0版本后建议使用kafka-configs.sh脚本。

3.2. 配置管理:kafka-configs.sh

这个脚本专门是用来添加,修改和删除实体的config的,其中操做的实体对象有:topic, client, user 和 broker。

3.2.1. 更新配置

添加和修改均可以统一说成更新,没有则添加,存在即修改。结合alter,add-config以及其它一些配置,例如修改broker的某个config:

bin/kafka-configs.sh --zookeeper master:2181 --entity-type brokers --entity-name 0 --alter --add-config 'leader.replication.throttled.rate=700000000'
  • 1
  • entity-type:实体类型,必须为(topics/clients/users/brokers)其中之一。
  • entity-name:实体名称(topic的名称,client的id,user的principal名称,broker的id)。
  • add-config:格式为’ key1:value1,key2:[v21,v22],key3:value3’,多个配置能够写在一块儿,比kafka-topic.sh的修改更人性化。

3.2.2. 查看配置

执行上面命令给id=0的broker添加配置leader.replication.throttled.rate后,接着查看一下该broker的config:

bin/kafka-configs.sh --zookeeper master:2181 --entity-type brokers --entity-name 0 --describe
  • 1

结果: 
这里写图片描述

3.2.3. 删除配置

接上,删除id=0的broker的配置leader.replication.throttled.rate

bin/kafka-configs.sh --zookeeper master:2181 --entity-type brokers --entity-name 0 --alter --delete-config 'leader.replication.throttled.rate'
  • 1

结果: 
这里写图片描述

3.3. 消费者组管理:kafka-consumer-groups.sh

能够列出全部消费者组,查看某个消费者组的详细状况以及删除消费者组的信息(删除只适用于旧版本基于zookeeper的消费都组)。

3.3.1. 列出消费者组

Kafka默认一直会有一个“KafkaManagerOffsetCache”的消费者组,为了更加直观,先用kafka-console-consumer.sh启动一个消费都,并加入一个叫作“test_group”的组:

bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic test --consumer-property group.id=test_group
  • 1

接着使用如下命令列出全部的消费都组:

bin/kafka-consumer-groups.sh --bootstrap-server master:9092 --list
  • 1

已经能够看到“test_group”的消费都组了: 
这里写图片描述

3.3.2. 查看消费者组

查看消费者组的具体消费情况,结合来分析目前集群的稳健程度,执行如下命令:

bin/kafka-consumer-groups.sh --bootstrap-server master:9092 --describe --group test_group
  • 1

结果: 
这里写图片描述

3.4. 均衡leader:kafka-preferred-replica-election.sh

每一个分区的全部replicas叫作”assigned replicas”,”assigned replicas”中的第一个replicas叫”preferred replica”,刚建立的topic通常”preferred replica”(优先副本)是leader。 
各分区的读取写请求都是由leader来接收处理的,那么固然但愿各分区的leader能够均衡地分布在各个broker之上,作到均衡负载,提升集群稳定性以及充分利用资源。通常在建立topic时,kafka都会默认把leader平均分配,但当某个broker宕掉后,会致使该broker上的leader转移到其它的broker上去,致使机群的负载不均衡,就算宕掉的broker恢复正常,它上面已经没有leader。可使用kafka-preferred-replica-election.sh工具令到恢复后的broker上的优先副本从新选举成为leader,这样又恢复到了宕掉以前的状态。

下面来模拟一下整个过程,首先建立一个topic,3个分区,每一个分区的leader分别在3个broker上面: 
这里写图片描述

分区0的leader已经从broker0转移到了broker1了,在Isr中也看不到本来broker0的两个replica。最后从新启动broker0并执行如下命令:

bin/kafka-preferred-replica-election.sh --zookeeper master:2181
  • 1

再观察test的分区状况: 
这里写图片描述

能够看到test已经恢复到最初的leader分布状况了。默认是对全部分区进行优先副本选举(preferred replica election),若是想指定操做某些分区,则须要配合–path-to-json-file参数,例如test有0,1,2三个分区,只想操做1,2分区,首先编译test_election.json文件,内容以下:

{“partitions”:[{“topic”: “test”, “partition”: 1}, {“topic”: “test”, 
“partition”: 2}]}

而后执行如下命令:

bin/kafka-preferred-replica-election.sh --zookeeper master:2181 --path-to-json-file test_election.json
  • 1

PS:其实能够配置kafka自动平衡leader的,在server.properties文件中设置:auto.leader.rebalance.enable=true便可,而该配置默认是已经打开的,想验证的话能够重启一个broker,隔一段时间后会发现leader会自动恢复。

3.5. 扩容重分区:kafka-reassign-partitions.sh

当有新的broker节点加入到已经在使用的集群,kafka是不会自动均衡本来的数据到新节点的,须要手动对分区进行迁移,使得新节点能够对外提供服务。(对于后来建立和topic则不须要)。

3.5.1. 生成迁移计划

首先确定要知道须要对哪些topic进行迁移,且明确哪一个分区须要迁移到哪一个broker节点。现有一个分区test,具体以下图: 
这里写图片描述

手动编辑一个json文件(例如命名为topics-to-move.json),表示哪些topic是须要迁移的,内容以下(能够指定多个topics):

{“topics”: [{“topic”: “test”}], “version”:1 }

想要把test的全部分区迁移到broker1,2,执行如下命令生成迁移计划:

bin/kafka-reassign-partitions.sh --zookeeper master:2181 --topics-to-move-json-file topics-to-move.json --broker-list "1,2" --generate
  • 1
  • topics-to-move-json-file:上面建立的topics-to-move.js文件
  • broker-list:分区须要迁移到的broker,格式如命令
  • generate:表示生成分配规则,内容是json串 
    执行结果以下: 
    这里写图片描述

显示了当前的分配规则(能够用做回滚)以及新生成的分配规则,把内容保存到文件(expand-cluster-reassignment.json),固然,也能够手动修改里面的内容,只要符合格式便可:

{“version”:1,”partitions”:[{“topic”:”test”,”partition”:1,”replicas”:[2,1]},{“topic”:”test”,”partition”:2,”replicas”:[1,2]},{“topic”:”test”,”partition”:0,”replicas”:[1,2]}]}

3.5.2. 执行迁移计划

根据上一步生成的分配规则expand-cluster-reassignment.json启动迁移,执行如下命令:

bin/kafka-reassign-partitions.sh --zookeeper master:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
  • 1

而后describe一下,查看新的分区分配状况: 
这里写图片描述

能够看到如今全部分区的replica都已经所有迁移到了broker1,2上面。

3.5.3. 验证迁移计划

仍是根据分配规则expand-cluster-reassignment.json验证分区是否分配成功,执行如下命令:

bin/kafka-reassign-partitions.sh --zookeeper master:2181 --reassignment-json-file expand-cluster-reassignment.json --verify
  • 1

执行结果以下: 
这里写图片描述

至此,分区的迁移已经完成。其实已经对分区规则熟悉的话,能够跳过生成迁移计划这步,直接编写expand-cluster-reassignment.json,而后执行验证。

3.6. 增长副本:kafka-reassign-partitions.sh

为分区增长副本,仍是使用kafka-reassign-partitions.sh命令,而后编辑副本规则json文件便可。现有如下topic: 
这里写图片描述

分区0有两个replica,分别在broker1,2上,如今准备在broker0上添加一个replica,先建立副本分配json文件(increase-replication-factor.json),内容以下:

{“version”:1, 
“partitions”:[{“topic”:”test”,”partition”:0,”replicas”:[0,1,2]}]}

而后指定increase-replication-factor.json执行下面的命令:

bin/kafka-reassign-partitions.sh --zookeeper master:2181 --reassignment-json-file increase-replication-factor.json --execute
  • 1

接着,一样使用increase-replication-factor.json来验证是否成功:

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify
  • 1

执行结果以下: 
这里写图片描述

或者describe一下topic: 
这里写图片描述

相关文章
相关标签/搜索