kafka自身没有监控管理页面,不管是进行一些管理操做仍是状态的监控都要命令加一大堆记不住的参数,实在是很不方便,不过好在在github上开源了一些工具,在kafka的生态系统中也有说起到:java
除了前面两个,其它几个都没试用过,就算在网上查也是推荐前两个而已,kafka manager基于jmx功能比较强大,利用它作管理方面;而KafkaOffsetMonitor从它的启动参数来看应该是定时从zookeeper上获取消费者的offset,以图的形式展现,比较直观(对于一些实现Exactly once的系统,offset并不保存在zookeeper上面,它将不能使用),二者结合使用,相得益彰。linux
kafka manager的源码:https://github.com/yahoo/kafka-manager
官方要求的kafka版本:Kafka 0.8.1.1 或者 0.8.2.x 或者 0.9.0.x,不过使用kafka_0.10.1.0时也能正常。
java版本要求:Java 8+git
为了获得部署包kafka-manager-xxxx.zip,先根据上面的地址下载源码再编译(不想这么麻烦的话,能够去一些kafka的qq群,通常群共享里都会有这个包)。kafka-manager工程是利用SBT进行构建的,因此编译以前还须要安装SBT,安装Java 8。最后执行命令编译:github
sbt clean dist
网络很差的话可能须要重复编译,成功后在target/universal目录下能够看到kafka-manager-1.3.2.1.zipweb
把编译获得的zip包解压,在conf目录中有一个application.conf文件,最小化的配置只须要在该文件中修改kafka-manager.zkhosts
参数便可:apache
kafka-manager.zkhosts="master:2181,slave1:2181,slave2:2181"
kafka服务必须要开启JMX,不然在下一步启动kafka-manager时会出现:
java.lang.IllegalArgumentException: requirement failed: No jmx port but jmx polling enabled!
启动kafka服务时指定JMX_PORT值:json
JMX_PORT=9999 bin/kafka-server-start.sh -daemon config/server.properties
或者修改kafka-server-start.sh,在前面加上:bootstrap
export JMX_PORT=9999
以运行在linux上为例,启动脚本为bin/kafkak-manager,该脚本会默认占用9000端口,也能够经过参数修改端口以及指定java版本运行:api
nohup bin/kafka-manager -java-home /usr/java/jdk1.8.0_101/ -Dhttp.port=8081 &
启动成功后,便可以经过http://ip:8081来访问网络
源码地址:https://github.com/quantifind/KafkaOffsetMonitor
jar包下载:https://github.com/quantifind/KafkaOffsetMonitor/releases/tag/v0.2.1
KafkaOffsetMonitor使用比较方便,将会被打成一个jar包,直接运行便可,做者已经把打好的包上传到github上面,执行如下命令启动:
java -cp KafkaOffsetMonitor-assembly-0.2.1.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk master,slave1,slave2:2181 --port 8082 --refresh 10.seconds --retain 2.days
启动成功后,便可以经过http://ip:8082来访问
基本全部脚本都是调用kafka-run-class.sh脚本去执行一个命令模式的class.建议使用脚本时参考脚本的使用说明。
建立、删除、查看或者改变一个topic.
bin/kafka-topics.sh --zookeeper master:2181 --create --topic test --partitions 3 --replication-factor 2 --config flush.ms=1000 --config flush.messages=1
建立一个名称为test的topic,它有3个分区,每一个分区两个replica,经过–config给topic设置属性,格式为key=value,若是有多个配置属性则如上命令。这种建立方式kafka会自动把各个分区的replica分配到相应的broker,也能够在建立topic时手动指定哪一个分区的哪一个replica落在指定的broker,示例命令以下:
bin/kafka-topics.sh --zookeeper master:2181 --create --topic test --config flush.ms=1000 --config flush.messages=1 --replica-assignment 0:1,1:2
关键的配置参数为–replica-assignment,该参数不能与–partitions和–replication-factor同时出现,参数的使用格式以下:
broker_id_for_part0_replica1: broker_id_for_part0_replica2,
broker_id_for_part1_replica1: broker_id_for_part1_replica2,
broker_id_for_part2_replica1: broker_id_for_part2_replica2
–replica-assignment 0:1,1:2表示有两个分区,分区0的replica1在broker.id=0的kafka服务上,分区0的replica2在broker.id=1的kafka服务上;分区1的replica1在broker.id=1的kafka服务上,分区1的replica2在broker.id=2的kafka服务上。
当使用delete命令删除topic,默认只是进行标记,并无真正的删除
Note: This will have no impact if delete.topic.enable is not set to true.
须要在config/server.properties配置文件中开启delete.topic.enable=true
bin/kafka-topics.sh --zookeeper master:2181 --describe --topic test
describe名称为test的topic,将会显示topic的分区数、replica因子、配置参数以及各分区的分布状况(leader,replica,isr),以下图:
使用–describe时还能够结合其它一些参数对结果进行过滤:
根据–alter参数的说明,能够修改topic的分区数(目前只能是增长),修改配置config,以及修改replica(这里貌似不许确,根据官网的文档说明,若是想要增长replication factor,应该使用kafka-reassign-partitions.sh脚本)。
bin/kafka-topics.sh --zookeeper master:2181 --alter --topic test --partitions 3
成功后describe一下topic:
添加分区不能改变现有的数据
bin/kafka-topics.sh --zookeeper master:2181 --alter --topic test --config flush.ms=2000
成功后:
bin/kafka-topics.sh --zookeeper master:2181 --alter --topic test --delete-config flush.ms
成功后:
PS:对于使用–alter增长、修改和删除config,从0.9.0.0版本后建议使用kafka-configs.sh脚本。
这个脚本专门是用来添加,修改和删除实体的config的,其中操做的实体对象有:topic, client, user 和 broker。
添加和修改均可以统一说成更新,没有则添加,存在即修改。结合alter,add-config以及其它一些配置,例如修改broker的某个config:
bin/kafka-configs.sh --zookeeper master:2181 --entity-type brokers --entity-name 0 --alter --add-config 'leader.replication.throttled.rate=700000000'
执行上面命令给id=0的broker添加配置leader.replication.throttled.rate后,接着查看一下该broker的config:
bin/kafka-configs.sh --zookeeper master:2181 --entity-type brokers --entity-name 0 --describe
结果:
接上,删除id=0的broker的配置leader.replication.throttled.rate
bin/kafka-configs.sh --zookeeper master:2181 --entity-type brokers --entity-name 0 --alter --delete-config 'leader.replication.throttled.rate'
结果:
能够列出全部消费者组,查看某个消费者组的详细状况以及删除消费者组的信息(删除只适用于旧版本基于zookeeper的消费都组)。
Kafka默认一直会有一个“KafkaManagerOffsetCache”的消费者组,为了更加直观,先用kafka-console-consumer.sh启动一个消费都,并加入一个叫作“test_group”的组:
bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic test --consumer-property group.id=test_group
接着使用如下命令列出全部的消费都组:
bin/kafka-consumer-groups.sh --bootstrap-server master:9092 --list
已经能够看到“test_group”的消费都组了:
查看消费者组的具体消费情况,结合来分析目前集群的稳健程度,执行如下命令:
bin/kafka-consumer-groups.sh --bootstrap-server master:9092 --describe --group test_group
结果:
每一个分区的全部replicas叫作”assigned replicas”,”assigned replicas”中的第一个replicas叫”preferred replica”,刚建立的topic通常”preferred replica”(优先副本)是leader。
各分区的读取写请求都是由leader来接收处理的,那么固然但愿各分区的leader能够均衡地分布在各个broker之上,作到均衡负载,提升集群稳定性以及充分利用资源。通常在建立topic时,kafka都会默认把leader平均分配,但当某个broker宕掉后,会致使该broker上的leader转移到其它的broker上去,致使机群的负载不均衡,就算宕掉的broker恢复正常,它上面已经没有leader。可使用kafka-preferred-replica-election.sh工具令到恢复后的broker上的优先副本从新选举成为leader,这样又恢复到了宕掉以前的状态。
下面来模拟一下整个过程,首先建立一个topic,3个分区,每一个分区的leader分别在3个broker上面:
分区0的leader已经从broker0转移到了broker1了,在Isr中也看不到本来broker0的两个replica。最后从新启动broker0并执行如下命令:
bin/kafka-preferred-replica-election.sh --zookeeper master:2181
再观察test的分区状况:
能够看到test已经恢复到最初的leader分布状况了。默认是对全部分区进行优先副本选举(preferred replica election),若是想指定操做某些分区,则须要配合–path-to-json-file参数,例如test有0,1,2三个分区,只想操做1,2分区,首先编译test_election.json文件,内容以下:
{“partitions”:[{“topic”: “test”, “partition”: 1}, {“topic”: “test”,
“partition”: 2}]}
而后执行如下命令:
bin/kafka-preferred-replica-election.sh --zookeeper master:2181 --path-to-json-file test_election.json
PS:其实能够配置kafka自动平衡leader的,在server.properties文件中设置:auto.leader.rebalance.enable=true便可,而该配置默认是已经打开的,想验证的话能够重启一个broker,隔一段时间后会发现leader会自动恢复。
当有新的broker节点加入到已经在使用的集群,kafka是不会自动均衡本来的数据到新节点的,须要手动对分区进行迁移,使得新节点能够对外提供服务。(对于后来建立和topic则不须要)。
首先确定要知道须要对哪些topic进行迁移,且明确哪一个分区须要迁移到哪一个broker节点。现有一个分区test,具体以下图:
手动编辑一个json文件(例如命名为topics-to-move.json),表示哪些topic是须要迁移的,内容以下(能够指定多个topics):
{“topics”: [{“topic”: “test”}], “version”:1 }
想要把test的全部分区迁移到broker1,2,执行如下命令生成迁移计划:
bin/kafka-reassign-partitions.sh --zookeeper master:2181 --topics-to-move-json-file topics-to-move.json --broker-list "1,2" --generate
显示了当前的分配规则(能够用做回滚)以及新生成的分配规则,把内容保存到文件(expand-cluster-reassignment.json),固然,也能够手动修改里面的内容,只要符合格式便可:
{“version”:1,”partitions”:[{“topic”:”test”,”partition”:1,”replicas”:[2,1]},{“topic”:”test”,”partition”:2,”replicas”:[1,2]},{“topic”:”test”,”partition”:0,”replicas”:[1,2]}]}
根据上一步生成的分配规则expand-cluster-reassignment.json启动迁移,执行如下命令:
bin/kafka-reassign-partitions.sh --zookeeper master:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
而后describe一下,查看新的分区分配状况:
能够看到如今全部分区的replica都已经所有迁移到了broker1,2上面。
仍是根据分配规则expand-cluster-reassignment.json验证分区是否分配成功,执行如下命令:
bin/kafka-reassign-partitions.sh --zookeeper master:2181 --reassignment-json-file expand-cluster-reassignment.json --verify
执行结果以下:
至此,分区的迁移已经完成。其实已经对分区规则熟悉的话,能够跳过生成迁移计划这步,直接编写expand-cluster-reassignment.json,而后执行验证。
为分区增长副本,仍是使用kafka-reassign-partitions.sh命令,而后编辑副本规则json文件便可。现有如下topic:
分区0有两个replica,分别在broker1,2上,如今准备在broker0上添加一个replica,先建立副本分配json文件(increase-replication-factor.json),内容以下:
{“version”:1,
“partitions”:[{“topic”:”test”,”partition”:0,”replicas”:[0,1,2]}]}
而后指定increase-replication-factor.json执行下面的命令:
bin/kafka-reassign-partitions.sh --zookeeper master:2181 --reassignment-json-file increase-replication-factor.json --execute
接着,一样使用increase-replication-factor.json来验证是否成功:
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify
执行结果以下:
或者describe一下topic: