原文:http://blog.csdn.net/changong28/article/details/39325079java
使用Kafka的同窗都知道,咱们每次建立Kafka主题(Topic)的时候能够指定分区数和副本数等信息,若是将这些属性配置到server.properties文件中,之后调用Java API生成的主题将使用默认值,先改变须要使用命令bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --config max.message.bytes=128000显示的修改,咱们也但愿将此过程在Producer调用以前经过API的方式进行设定,无需在以前或以后使用脚本进行操做,因此才了这篇文章。查看源码发现,其实内部全部的实现都是经过TopicCommand的main方法,在此记录两种方式:spa
一、建立主题(Topic).net
【命令方式】:bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name --partitions 20 --replication-factor 3 --config x=ycode
【JAVA API方式】:server
二、查看全部主题blog
【命令方式】:bin/kafka-topics.sh --list --zookeeper localhost:2181ci
【JAVA API方式】:文档
三、查看指定主题:get
【命令方式】:bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topickafka
【JAVA API方式】:
四、修改主题:
【命令方式】:bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --deleteConfig x
【JAVA API方式】:
五、删除出题:
【命令方式】:无
【JAVA API方式】:
另:下文kafka删除topic的方法(出自 “菜光光的博客” 博客,出处http://caiguangguang.blog.51cto.com/1652935/1548069)
0.8的官方文档提供了一个删除topic的命令:
kafka-topics.sh --delete 可是在运行时会报错找不到这个方法。
kafka-topics.sh最终是运行了kafka.admin.TopicCommand这个类,在0.8的源码中这个类中没有找到有delete topic相关的代码。
在kafka的admin包下,提供了一个DeleteTopicCommand的类,能够实现删除topic的功能。
kafka.admin.DeleteTopicCommand
其中删除topic的具体实现代码以下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
import
org.I0Itec.zkclient.ZkClient
import
kafka.utils.{Utils, ZKStringSerializer, ZkUtils}
.......
val topic = options.valueOf(topicOpt)
val zkConnect = options.valueOf(zkConnectOpt)
var zkClient: ZkClient =
null
try
{
zkClient =
new
ZkClient(zkConnect,
30000
,
30000
, ZKStringSerializer)
zkClient.deleteRecursive(ZkUtils.getTopicPath(topic))
//其实最终仍是经过删除zk里面对应的路径来实现删除topic的功能
println(
"deletion succeeded!"
)
}
catch
{
case
e: Throwable =>
println(
"delection failed because of "
+ e.getMessage)
println(Utils.stackTrace(e))
}
finally
{
if
(zkClient !=
null
)
zkClient.close()
}
|
由于这个命令只会删除zk里面的信息,真实的数据仍是没有删除,因此须要登陆各个broker,把对应的topic的分区数据目录删除,也可能正由于这一点,delete命令才没有集成到kafka.admin.TopicCommand这个类。