使用kafka-topic.sh工具能够执行大部分操做 建立/修改/删除/查看集群里的主题。要使用所有功能,须要经过--zookeeper参数提供zookeerper链接字符串json
建立主题:bootstrap
建立主题须要3个参数: 主题名字 复制系数 分区数量ide
格式: kafka-topic.sh --zookeeper <zookeeper connect> --create --topic <string> --replication-factor <integer> --partitions <integer>工具
若是不须要基于机架信息的分配策略,使用参数--disable-rack-aware3d
增长主题分区的数量至16:日志
kafka-topic.sh --zookeeper <zookeeper connect> --alter --topic my-topic --partition 16server
减小主题分区数量: 会致使消息乱序,只能删除分区数量,从新建立blog
删除主题:索引
配置参数 delete.topic.enable=true 字符串
kafka-topic.sh --zookeeper <zookeeper connect> --delete --topic my-topic
列出集群全部主题
kafka-topic.sh --zookeeper <zookeeper connect> --list
列出主题详细信息
列出集群全部主题详细信息
kafka-topic.sh --zookeeper <zookeeper connect> -describe
找出全部包含覆盖配置的主题 --topic-with-overrides
列出全部包含不一样步副本的分区 --under-replicated-partitions
kafka-topic.sh --zookeeper <zookeeper connect> --describe --under-replicated-partitions
列出全部没有首领的分区 --unavailable-partitions
列出新版本的消费者群组
Kafka-consumer-groups.sh --new-consumer --bootstrap-server <kafka集群主机:port/kafka-cluster> --list
获取旧版本消费者群组testgroup详细信息
kafka-consumer-group.sh --zookeeper <zookeeper connect> --describe --group testgroup
删除消费者群组
kafka-consumer-groups.sh --zookeeper <zookeeper connect> --delete --group testgroup
删除消费者群组testgroup中my-topic 主题的偏移量
kafka-consumer-groups.sh --zookeeper <zookeeper connect> --delete --group testgroup --topic my-topic
导出群组testgroup的偏移量到offsets文件
kafka-run-class.sh kafka.tools.ExportZkOffsets --zkconnect <zookeeper connect> --group testgroup --output-file offsets
导入偏移量:
先关闭消费者
kafka-run-class.sh kafka.tools.ImportZkOffsets --zkconnect <zookeeper connect> --inpiut-file offsets
更改主题配置的命令格式:
kafka-configs.sh --zookeeper <zookeeper connect> --alter --entity-type topics --entity-name <topic name > -add-config <key>=<value>[,<key>=<value>...]
将主题my-topic 消息保留时间设置为1小时
kafka-confihs.sh --zookeeper <zookeeper connect> --alter --entity-type topic --entity-name my-topic -add-config retention.ms=3600000
更改客户端配置命令格式:
kafka-configs.sh --zookeeper <zookeeper connect> --alter --entity-type clients --entity-name <client ID> -add-config <key>=<value>....
列出主题my-topic 全部被覆盖的配置:
kafka-configs.sh --zookeeper <zookeeper connect> --describe --entity-type topics --entity-name my-topic
删除主题my-topic的retention.ms覆盖配置
kafka-config.sh --zookeeper <zookeeper connect> --alter --entity-type topics --entity-name my-topic --delete-config retention.ms
在一个包含1主题和8个分区集群里启动首选的副本选举
kafka-preferred-replica-election.sh --zookeeper <zookeeper connect>
经过partitions.json 文件里指定分区清单来启动副本的选举
kafka-prefered-replica-election.sh --zookeeper <zookeeper connect> --path-to-json-file partitions.json
修改分区副本:
为topic.json文件里的主题生成迁移步骤,以便将这些主题迁移至broker0 和 broker1上
kafka-reassign-partitions.sh --zookeeper <zookeeper connect> --generate --topics-to-move-json-file topics.json --broker-list 0,1
使用reassign.json 来执行建议的分区分配方案:
kafka-reassign-partitions.sh --zookeeper <zookeeper connect> --execute --reassignment-json-file reassign.json
验证reassign.json文件里指定的分区重分配状况:
kafka-reassign-partitions.sh --zookeeper <zookeeper connect> --verify --reassignment-json-file reassign.json
解码日志片断000052368601.log ,显示消息的概要信息
kafka-run-class.sh kafka.tools.DumpLogSegments --files 000052368601.log
解码日志片断000001.log,显示消息内容
kafka-run-class.sh kafka.tools.DumpLogSegments --files 000001.log --print-data-log
验证日志片断00001.log索引文件的正确性
kafka-run-class.sh kafka.tools.DumpLogSegments --files 00001.index,000001.log --index-sanity-check
// --verify-index-only 将会检查索引的匹配度
对broker1和broker2上以my-开头的主题副本进行验证
kafka-replica-verification.sh --broker-list kafka1.com:9092,kafka2.com:9092 --topic-white-list 'my-*'
使用旧版消费者读取单个主题
kafka-console-consumer.sh --zookeeper <zookeeper connect> --topic my-topic
向主题my-topic 生成2个消息
kafka-console-producer.sh --broker-list kafka1.com:9092,kafka2.com:9092 --topic my-topic