对于 kafka 主题(topic)的管理(增删改查),使用最多的即是kafka自带的脚本。node
建立主题
kafka提供了自带的 kafka-topics
脚本,用来帮助用户建立主题(topic)。shell
bin/kafka-topics.sh --bootstrap-server broker_host:port --create --topic my_topic_name --partitions 1 --replication-factor 1
create 代表咱们要建立主题,而 partitions 和 replication factor 分别设置了主题的分区数以及每一个分区下的副本数。json
这里为何用的 --bootstrap-server
参数,而不是 --zookeeper
?
--zookeeper
参数是以前版本的用法,从kafka 2.2 版本开始,社区推荐使用 --bootstrap-server
参数替换 --zoookeeper
,而且显式地将后者标记为 “已过时”,所以,若是你已经在使用 2.2 版本了,那么建立主题请指定 --bootstrap-server
参数。bootstrap
推荐使用 --bootstrap-server
而非 --zookeeper
的缘由主要有两个。缓存
- 使用 --zookeeper 会绕过 Kafka 的安全体系。这就是说,即便你为 Kafka 集群设置了安全认证,限制了主题的建立,若是你使用 --zookeeper 的命令,依然能成功建立任意主题,不受认证体系的约束。这显然是 Kafka 集群的运维人员不但愿看到的。
- 使用 --bootstrap-server 与集群进行交互,愈来愈成为使用 Kafka 的标准姿式。换句话说,之后会有愈来愈少的命令和 API 须要与 ZooKeeper 进行链接。这样,咱们只须要一套链接信息,就能与 Kafka 进行全方位的交互,不用像之前同样,必须同时维护 ZooKeeper 和 Broker 的链接信息。
查询主题
建立好主题以后,Kafka 容许咱们使用相同的脚本查询主题。你可使用下面的命令,查询全部主题的列表。安全
bin/kafka-topics.sh --bootstrap-server broker_host:port --list
若是要查询单个主题的详细数据,你可使用下面的命令。运维
bin/kafka-topics.sh --bootstrap-server broker_host:port --describe --topic <topic_name>
若是 describe 命令不指定具体的主题名称,那么 Kafka 默认会返回全部 “可见” 主题的详细数据给你。异步
这里的 “可见”,是指发起这个命令的用户可以看到的 Kafka 主题。这和前面说到主题建立时,使用 --zookeeper 和 --bootstrap-server 的区别是同样的。若是指定了 --bootstrap-server,那么这条命令就会受到安全认证体系的约束,即对命令发起者进行权限验证,而后返回它能看到的主题。不然,若是指定 --zookeeper 参数,那么默认会返回集群中全部的主题详细数据。基于这些缘由,我建议你最好统一使用 --bootstrap-server 链接参数。spa
修改主题
修改主题分区
其实就是增长分区,目前 Kafka 不容许减小某个主题的分区数。你可使用 kafka-topics 脚本,结合 --alter 参数来增长某个主题的分区数,命令以下:线程
bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic <topic_name> --partitions < 新分区数 >
这里要注意的是,你指定的分区数必定要比原有分区数大,不然 Kafka 会抛出 InvalidPartitionsException 异常。
修改主题级别参数
在主题建立以后,咱们可使用 kafka-configs 脚本修改对应的参数。
假设咱们要设置主题级别参数 max.message.bytes,那么命令以下:
bin/kafka-configs.sh --zookeeper zookeeper_host:port --entity-type topics --entity-name <topic_name> --alter --add-config max.message.bytes=10485760
也许你会以为奇怪,为何这个脚本就要指定 --zookeeper,而不是 --bootstrap-server 呢?其实,这个脚本也能指定 --bootstrap-server 参数,只是它是用来设置动态参数的。在专栏后面,我会详细介绍什么是动态参数,以及动态参数都有哪些。如今,你只须要了解设置常规的主题级别参数,仍是使用 --zookeeper。
变动副本数
使用自带的 kafka-reassign-partitions 脚本,帮助咱们增长主题的副本数。
假设kafka的内部主题 __consumer_offsets
只有 1 个副本,如今咱们想要增长至 3 个副本。下面是操做:
- 建立一个 json 文件,显式提供 50 个分区对应的副本数。注意,replicas 中的 3 台 Broker 排列顺序不一样,目的是将 Leader 副本均匀地分散在 Broker 上。该文件具体格式以下
{"version":1, "partitions":[ {"topic":"__consumer_offsets","partition":0,"replicas":[0,1,2]}, {"topic":"__consumer_offsets","partition":1,"replicas":[0,2,1]}, {"topic":"__consumer_offsets","partition":2,"replicas":[1,0,2]}, {"topic":"__consumer_offsets","partition":3,"replicas":[1,2,0]}, ... {"topic":"__consumer_offsets","partition":49,"replicas":[0,1,2]} ]}
- 执行
kafka-reassign-patitions
脚本,命令以下:
bin/kafka-reassign-partitions.sh --zookeeper zookeeper_host:port --reassignment-json-file reassign.json --execute
除了修改内部主题,咱们可能还想查看这些内部主题的消息内容。特别是对于 __consumer_offsets 而言,因为它保存了消费者组的位移数据,有时候直接查看该主题消息是很方便的事情。下面的命令能够帮助咱们直接查看消费者组提交的位移数据。
bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
除了查看位移提交数据,咱们还能够直接读取该主题消息,查看消费者组的状态信息。
bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$GroupMetadataMessageFormatter" --from-beginning
对于内部主题 __transaction_state 而言,方法是相同的。你只须要指定 kafka.coordinator.transaction.TransactionLog$TransactionLogMessageFormatter 便可。
修改主题限速
这里主要是指设置 Leader 副本和 Follower 副本使用的带宽。有时候,咱们想要让某个主题的副本在执行副本同步机制时,不要消耗过多的带宽。Kafka 提供了这样的功能。我来举个例子。假设我有个主题,名为 test,我想让该主题各个分区的 Leader 副本和 Follower 副本在处理副本同步时,不得占用超过 100MBps 的带宽。注意是大写 B,即每秒不超过 100MB。那么,咱们应该怎么设置呢?
要达到这个目的,咱们必须先设置 Broker 端参数 leader.replication.throttled.rate 和 follower.replication.throttled.rate,命令以下:
bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.rate=104857600,follower.replication.throttled.rate=104857600' --entity-type brokers --entity-name 0
这条命令结尾处的 --entity-name 就是 Broker ID。假若该主题的副本分别在 0、一、二、3 多个 Broker 上,那么你还要依次为 Broker 一、二、3 执行这条命令。
设置好这个参数以后,咱们还须要为该主题设置要限速的副本。在这个例子中,咱们想要为全部副本都设置限速,所以统一使用通配符 * 来表示,命令以下:
bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.replicas=*,follower.replication.throttled.replicas=*' --entity-type topics --entity-name test
主题分区迁移
一样是使用 kafka-reassign-partitions 脚本,对主题各个分区的副本进行 “手术” 般的调整,好比把某些分区批量迁移到其余 Broker 上。
删除主题
bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic <topic_name>
删除主题的命令并不复杂,关键是删除操做是异步的,执行完这条命令不表明主题当即就被删除了。它仅仅是被标记成 “已删除” 状态而已。Kafka 会在后台默默地开启主题删除操做。所以,一般状况下,你都须要耐心地等待一段时间。
主题删除失败
当运行完上面的删除命令后,不少人发现已删除主题的分区数据依然 “躺在” 硬盘上,没有被清除。这时该怎么办呢?
实际上,形成主题删除失败的缘由有不少,最多见的缘由有两个:
- 副本所在的 Broker 宕机了
- 待删除主题的部分分区依然在执行迁移过程。
若是是由于前者,一般你重启对应的 Broker 以后,删除操做就能自动恢复;若是是由于后者,那就麻烦了,极可能两个操做会相互干扰。
无论什么缘由,一旦你碰到主题没法删除的问题,能够采用这样的方法:
-
手动删除 ZooKeeper 节点 /admin/delete_topics 下以待删除主题为名的 znode。
-
手动删除该主题在磁盘上的分区目录。
-
在 ZooKeeper 中执行 rmr /controller,触发 Controller 重选举,刷新 Controller 缓存。
在执行最后一步时,你必定要谨慎,由于它可能形成大面积的分区 Leader 重选举。事实上,仅仅执行前两步也是能够的,只是 Controller 缓存中没有清空待删除主题罢了,也不影响使用。
常见问题
__consumer_offsets 占用太多的磁盘
一旦你发现这个主题消耗了过多的磁盘空间,那么,你必定要显式地用 jstack 命令查看一下 kafka-log-cleaner-thread 前缀的线程状态。一般状况下,这都是由于该线程挂掉了,没法及时清理此内部主题。假若真是这个缘由致使的,那咱们就只能重启相应的 Broker 了。另外,请你注意保留出错日志,由于这一般都是 Bug 致使的,最好提交到社区看一下。