1,添加和删除topicbootstrap
在topic配置中若是设置了auto_created topic 为true,则当生产者第一次将数据发布到一个不存在的topic,topic会自动建立。固然,topic也能够手动建立:ubuntu
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name --partitions 20 --replication-factor 3 --config x=y
参数 replication-factor控制每条写入的消息会复制到多少个服务器(broker)。服务器的个数要>=replication-factor的个数.不然要挂。官方是建议replication-factor至少要>=2这样数据消费过程不会被打断。服务器
参数partition控制topic将被写入多少log(所谓的partition就是文件夹个数),分区数=最大并行消费者数量。ui
2,修改topiccode
添加分区数orm
t@ubuntu:~/source/kafka_2.10-0.10.0.0/bin$ ./kafka-topics.sh --zookeeper localhost:2181 --alter --topic myTest4 --partitions 8 WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected Adding partitions succeeded! t@ubuntu:~/source/kafka_2.10-0.10.0.0/bin$
三个broker的数据目录自动生成三个分区目录,其中数据文件大小均为0server
添加配置ip
bin/kafka-configs.sh --zookeeper zk_host:port/chroot --entity-type topics --entity-name my_topic_name --alter --add-config x=y
删除配置资源
bin/kafka-configs.sh --zookeeper zk_host:port/chroot --entity-type topics --entity-name my_topic_name --alter --delete-config x
删除topic文档
首先要把server配置文件添加以下项:
delete.topic.enable=true
若是没有设置为true,则有以下结果:
t@ubuntu:~/source/kafka_2.10-0.10.0.0/bin$ ./kafka-topics.sh --zookeeper localhost:2181 --delete --topic myTest4 Topic myTest4 is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true.
只是标记而已,myTest4数据依旧能够读
3,关闭kafka服务
kafka集群能自动检测关闭或者失败的服务器,并在其余服务器中选出新的leader。若是关闭服务是为了维护或者更新配置,能够选择一种相对优雅的方式关闭kafka服务。
首先要将配置文件添加以下:
controlled.shutdown.enable=true
经过bin目录下zookeeper-server-stop.sh关闭,这样关闭有两个好处
1,关闭前会吧数据同步到磁盘,避免重启服务作额外的数据恢复于是启动耗时;
2,It will migrate any partitions the server is the leader for to other replicas prior to shutting down. This will make the leadership transfer faster and minimize the time each partition is unavailable to a few milliseconds.大意是下降分区不可用时间,具体我也不了解。
4,平衡leadership
官网写的不太好理解。若是本地分别启动三个broker,第一个broker会成为全部分区的leader,这样读写所有到broker1,对资源利用很不利,因此要平衡leader到各个服务器上。有两种方法,
1,利用命令手动平衡一下:
bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot
配置文件里面添加以下项目:
auto.leader.rebalance.enable=true
5,机架间phenomenon副本
让副本相同的分区在不一样机架上进行复制,避免某个机架挂了致使数据丢失。
须要在配置文件中添加broker归属的机架id
broker.rack=my-rack-id
目前在连机房都没有的小公司,这个估计是用不到了。呵呵。。。。
6,集群间镜像数据
呵呵。。。公司过小,暂时是用不到了。
7,检查消费者位置
有时候查看一下消费者位置是有用的。
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test
t@ubuntu:~/source/kafka_2.10-0.10.0.0$ bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --describe --group pm_ws_6d0a289d7d7b11e7a9f20242c0a80010 GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER pm_ws_6d0a289d7d7b11e7a9f20242c0a80010 test 0 98 1113 1015 none
8,管理消费组
官方文档的命令是
bin/kafka-consumer-groups.sh --bootstrap-server broker1:9092 --list
本地运行以下,应该是版本有点落后不一致,
t@ubuntu:~/source/kafka_2.10-0.10.0.0$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list Missing required argument "[zookeeper]" Option Description ------ ----------- --bootstrap-server <server to connect REQUIRED (only when using new- to> consumer): The server to connect to. --command-config <command config Property file containing configs to be property file> passed to Admin Client and Consumer. --delete Pass in groups to delete topic partition offsets and ownership information over the entire consumer group. For instance --group g1 -- group g2
对2.10-0.10.0.0命令以下
t@ubuntu:~/source/kafka_2.10-0.10.0.0$ bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list console-consumer-64888 aaa pm_ws_6d0a289d7d7b11e7a9f20242c0a80010 console-consumer-7335 wuwuwu console-consumer-17559 awaken console-consumer-47823 pm_ws_6d0a289d7d7b11e7a9f20242c0a80009 group1 0 test-consumer-group108 console-consumer-95228 console-consumer-51385 console-consumer-32687 group-1 t@ubuntu:~/source/kafka_2.10-0.10.0.0$