目录apache
@(博客文章)[kafka|大数据]编程
本系统文章共三篇,分别为
一、kafka集群原理介绍了如下几个方面的内容:
(1)kafka基础理论
(2)参数配置
(3)错误处理
(4)kafka集群在zookeeper集群中的内容json
二、kafka集群操做介绍了kafka集群的安装与操做
(1)单机版安装
(2)集群安装
(3)集群启停操做
(4)topic相关操做
(5)某个broker挂掉,重启本机器
(6)某个broker挂掉且没法重启,使用其它机器代替
(7)扩容
(8)数据迁移
(9)机器下线
(10)增长副本数量
(11)平衡leader服务器
三、kafka集群编程介绍了...并发
此部分不可用于生产,但新接触kafka时,能够先有个感性的认识分布式
Step 1: 下载Kafkaoop
下载最新的版本并解压.大数据
$ wget http://mirrors.cnnic.cn/apache/kafka/0.8.2.1/kafka_2.10-0.8.2.1.tgz $ tar -zxvf kafka_2.10-0.8.2.1.tgz
Step 2: 启动服务
Kafka用到了Zookeeper,全部首先启动Zookper,下面简单的启用一个单实例的Zookkeeper服务。能够在命令的结尾加个&符号,这样就能够启动后离开控制台。this
> bin/zookeeper-server-start.sh config/zookeeper.properties &
如今启动Kafka:编码
bin/kafka-server-start.sh config/server.properties
Step 3: 建立 topic
建立一个叫作“test”的topic,它只有一个分区,一个副本。
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test [2015-06-04 13:17:13,943] WARN Connected to an old server; r-o mode will be unavailable (org.apache.zookeeper.ClientCnxnSocket) Created topic "test".
能够经过list命令查看建立的topic:
> bin/kafka-topics.sh --list --zookeeper localhost:2181 > test
除了手动建立topic,还能够配置broker让它自动建立topic.
Step 4:发送消息.
Kafka 使用一个简单的命令行producer,从文件中或者从标准输入中读取消息并发送到服务端。默认的每条命令将发送一条消息。
运行producer并在控制台中输一些消息,这些消息将被发送到服务端:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test This is a messageThis is another message
ctrl+c能够退出发送。
默认状况下,日志数据会被放置到/tmp/kafka-logs中,每一个分区一个目录
Step 5: 启动consumer
Kafka also has a command line consumer that will dump out messages to standard output.
Kafka也有一个命令行consumer能够读取消息并输出到标准输出:
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning This is a message This is another message
你在一个终端中运行consumer命令行,另外一个终端中运行producer命令行,就能够在一个终端输入消息,另外一个终端读取消息。
这两个命令都有本身的可选参数,能够在运行的时候不加任何参数能够看到帮助信息。
注意,必须先搭建zookeeper集群
一、使用3台机器搭建Kafka集群:
192.168.169.92 gdc-dn01-test
192.168.169.93 gdc-dn02-test
192.168.169.94 gdc-dn03-test
二、在安装Kafka集群以前,这里没有使用Kafka自带的Zookeeper,而是独立安装了一个Zookeeper集群,也是使用这3台机器,保证Zookeeper集群正常运行。
三、首先,在gdc-dn01-test上准备Kafka安装文件,执行以下命令:
wget http://mirrors.cnnic.cn/apache/kafka/0.8.2.1/kafka_2.10-0.8.2.1.tgz tar xvzf kafka_2.10-0.8.2.1.tgz mv kafka_2.10-0.8.2.1 kafka
四、修改配置文件kafka/config/server.properties,修改以下内容:
broker.id=0 zookeeper.connect=192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka
这里须要说明的是,默认Kafka会使用ZooKeeper默认的/路径,这样有关Kafka的ZooKeeper配置就会散落在根路径下面,若是 你有其余的应用也在使用ZooKeeper集群,查看ZooKeeper中数据可能会不直观,因此强烈建议指定一个chroot路径,直接在 zookeeper.connect配置项中指定:
zookeeper.connect=192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka
并且,须要手动在ZooKeeper中建立路径/kafka,使用以下命令链接到任意一台ZooKeeper服务器:
cd ~/zookeeper
bin/zkCli.sh
在ZooKeeper执行以下命令建立chroot路径:
create /kafka ''
这样,每次链接Kafka集群的时候(使用--zookeeper选项),也必须使用带chroot路径的链接字符串,后面会看到。
五、而后,将配置好的安装文件同步到其余的dn0二、dn03节点上:
scp -r /usr/local/kafka_2.10-0.8.2.1/ 192.168.169.92:/home/hadoop scp -r /usr/local/kafka_2.10-0.8.2.1/ 192.168.169.93:/home/hadoop
六、最后,在dn0二、dn03节点上配置修改配置文件kafka/config/server.properties内容以下所示:
broker.id=1 # 在dn02修改 broker.id=2 # 在dn03修改
由于Kafka集群须要保证各个Broker的id在整个集群中必须惟一,须要调整这个配置项的值(若是在单机上,能够经过创建多个Broker进程来模拟分布式的Kafka集群,也须要Broker的id惟一,还须要修改一些配置目录的信息)。
七、在集群中的dn0一、dn0二、dn03这三个节点上分别启动Kafka,分别执行以下命令:
bin/kafka-server-start.sh config/server.properties &
能够经过查看日志,或者检查进程状态,保证Kafka集群启动成功。
八、建立一个名称为my-replicated-topic5的Topic,5个分区,而且复制因子为3,执行以下命令:
bin/kafka-topics.sh --create --zookeeper 192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka --replication-factor 3 --partitions 5 --topic my-replicated-topic5
九、查看建立的Topic,执行以下命令:
bin/kafka-topics.sh --describe --zookeeper 192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka --topic my-replicated-topic5
结果信息以下所示:
Topic:my-replicated-topic5 PartitionCount:5 ReplicationFactor:3 Configs: Topic: my-replicated-topic5 Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Topic: my-replicated-topic5 Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2 Topic: my-replicated-topic5 Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Topic: my-replicated-topic5 Partition: 3 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0 Topic: my-replicated-topic5 Partition: 4 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1 上面Leader、Replicas、Isr的含义以下:
1 Partition: 分区
2 Leader : 负责读写指定分区的节点
3 Replicas : 复制该分区log的节点列表
4 Isr : "in-sync" replicas,当前活跃的副本列表(是一个子集),而且可能成为Leader
咱们能够经过Kafka自带的bin/kafka-console-producer.sh和bin/kafka-console-consumer.sh脚本,来验证演示若是发布消息、消费消息。
十一、在一个终端,启动Producer,并向咱们上面建立的名称为my-replicated-topic5的Topic中生产消息,执行以下脚本:
bin/kafka-console-producer.sh --broker-list 192.168.169.92:9092, 192.168.169.93:9092, 192.168.169.94:9092 --topic my-replicated-topic5
十二、在另外一个终端,启动Consumer,并订阅咱们上面建立的名称为my-replicated-topic5的Topic中生产的消息,执行以下脚本:
bin/kafka-console-consumer.sh --zookeeper 192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka --from-beginning --topic my-replicated-topic5
能够在Producer终端上输入字符串消息行,就能够在Consumer终端上看到消费者消费的消息内容。
也能够参考Kafka的Producer和Consumer的Java API,经过API编码的方式来实现消息生产和消费的处理逻辑。
一、启动集群
bin/kafka-server-start.sh config/server.properties &
二、中止集群
bin/kafka-server-stop.sh
三、重启
没有专用脚本,先停后启便可
注:固然也可使用kill命令来关闭,但使用脚本有如下好处:
(1)It will sync all its logs to disk to avoid needing to do any log recovery when it restarts (i.e. validating the checksum for all messages in the tail of the log). Log recovery takes time so this speeds up intentional restarts.
(2)It will migrate any partitions the server is the leader for to other replicas prior to shutting down. This will make the leadership transfer faster and minimize the time each partition is unavailable to a few milliseconds.
一、建立topic
bin/kafka-topics.sh --create --zookeeper 192.168.172.98:2181/kafka --replication-factor 2 --partitions 3 --topic test_topic
(1)zookeeper指定其中一个节点便可,集群之间会自动同步。
(2)--replication-factor 2 --partitions 3理论上应该是可选参数,但此脚本必须写这2个参数。
(3)还可使用--config <name=value>来指定topic的某个具体参数,以代替配置文件中的参数。如:
bin/kafka-topics.sh --create --zookeeper 192.168.172.98:2181/kafka --replication-factor 2 --partitions 3 --topic test_topic retention.bytes=3298534883328
指定了某个topic最大的保留日志量,单位是字节。
二、查看所有topic
bin/kafka-topics.sh --list --zookeeper 192.168.172.98:2181/kafka
三、查看某个topic的详细信息
bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic Topic:test_topic PartitionCount:3 ReplicationFactor:2 Configs: Topic: test_topic Partition: 0 Leader: 3 Replicas: 3,4 Isr: 3,4 Topic: test_topic Partition: 1 Leader: 4 Replicas: 4,5 Isr: 4,5 Topic: test_topic Partition: 2 Leader: 5 Replicas: 5,2 Isr: 5,2
(1)第一行列出了这个topic的整体状况,如topic名称,分区数量,副本数量等。
(2)第二行开始,每一行列出了一个分区的信息,如它是第几个分区,这个分区的leader是哪一个broker,副本位于哪些broker,有哪些副本处理同步状态。
四、启动一个console producer,用于在console中模拟输入消息
bin/kafka-console-producer.sh --broker-list 192.168.172.111:9092 --topic test_topic
五、启动一个console consumer,用于模拟接收消息,并在console中输出
bin/kafka-console-consumer.sh --zookeeper 192.168.172.111:2181/kafka --topic test_topic
此脚本能够用于验证一个topic的数据状况,看消息是否正常流入等。
六、删除一个topic
bin/kafka-topics.sh --delete --zookeeper 192.168.172.98:2181/kafka --topic test_topic
(1)配置文件中必须delete.topic.enable=true,不然只会标记为删除,而不是真正删除。
(2)执行此脚本的时候,topic的数据会同时被删除。若是因为某些缘由致使topic的数据不能彻底删除(如其中一个broker down了),此时topic只会被marked for deletion,而不会真正删除。此时建立同名的topic会有冲突。
七、修改topic
使用—-alert原则上能够修改任何配置,如下列出了一些经常使用的修改选项:
(1)改变分区数量
bin/kafka-topics.sh --alter --zookeeper 192.168.172.98:2181/kafka --topic test_topic --partitions 4
(2)增长、修改或者删除一个配置参数
bin/kafka-topics.sh —alter --zookeeper 192.168.172.98:2181/kafka --topic my_topic_name --config key=value bin/kafka-topics.sh —alter --zookeeper 192.168.172.98:2181/kafka --topic my_topic_name --deleteConfig key
【结论】若是一个broker挂掉,且能够重启则处理步骤以下:
(1)重启kafka进程
(2)执行rebalance(因为已经设置配置项自动执行balance,所以此步骤通常可忽略)
详细分析见下面操做过程。
一、topic的状况
bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic Topic:test_topic PartitionCount:3 ReplicationFactor:2 Configs: Topic: test_topic Partition: 0 Leader: 5 Replicas: 5,2 Isr: 5,2 Topic: test_topic Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3 Topic: test_topic Partition: 2 Leader: 3 Replicas: 3,4 Isr: 3,4
集群中有4台机器,id为【2-5】,topic 有3个分区,每一个分区2个副本,leader分别位于2,3,5中。
二、模拟机器down,kill掉进程
分区0的leader位于id=5的broker中,kill掉这台机器的kafka进程
kill -9 ****
三、再次查看topic的状况
bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic Topic:test_topic PartitionCount:3 ReplicationFactor:2 Configs: Topic: test_topic Partition: 0 Leader: 2 Replicas: 5,2 Isr: 2 Topic: test_topic Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3 Topic: test_topic Partition: 2 Leader: 3 Replicas: 3,4 Isr: 3,4
能够看到,分区0的leader已经移到id=2的机器上了,它的副本位于2,5这2台机器上,但处于同步状态的只有id=2这台机器。
四、重启kafka进程
bin/kafka-server-start.sh config/server.properties &
五、再次查看状态
bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic Topic:test_topic PartitionCount:3 ReplicationFactor:2 Configs: Topic: test_topic Partition: 0 Leader: 2 Replicas: 5,2 Isr: 2,5 Topic: test_topic Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3 Topic: test_topic Partition: 2 Leader: 3 Replicas: 3,4 Isr: 3,4
发现分区0的2个副本都已经处于同步状态,但leader依然为id=2的broker。
六、执行leader平衡
详见leader的平衡部分。
bin/kafka-preferred-replica-election.sh --zookeeper 192.168.172.98:2181/kafka
若是配置文件中
auto.leader.rebalance.enable=true
则此步骤不须要执行。
七、从新查看topic
bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic Topic:test_topic PartitionCount:3 ReplicationFactor:2 Configs: Topic: test_topic Partition: 0 Leader: 5 Replicas: 5,2 Isr: 2,5 Topic: test_topic Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3 Topic: test_topic Partition: 2 Leader: 3 Replicas: 3,4 Isr: 3,4
此时leader已经回到了id=5的broker,一切恢复正常。
【结论】当一个broker挂掉,须要换机器时,采用如下步骤:
一、将新机器kafka配置文件中的broker.id设置为与原机器同样
二、启动kafka,注意kafka保存数据的目录不会自动建立,须要手工建立
详细分析过程以下:
一、初始化机器,主要包括用户建立,kafka文件的复制等。
二、修改config/server.properties文件
注意,只须要修改一个配置broker.id,且此配置必须与挂掉的那台机器相同,由于kafka是经过broker.id来区分集群中的机器的。此处设为
broker.id=5
三、查看topic的当前状态
bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic Topic:test_topic PartitionCount:3 ReplicationFactor:2 Configs: Topic: test_topic Partition: 0 Leader: 5 Replicas: 5,2 Isr: 2,5 Topic: test_topic Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3 Topic: test_topic Partition: 2 Leader: 3 Replicas: 3,4 Isr: 3,4
当前topic有3个分区,其中分区1的leader位于id=5的机器上。
四、关掉id=5的机器
kill -9 ** 用于模拟机器忽然down
或者:
bin/kafka-server-stop.sh
用于正常关闭
五、查看topic的状态
bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic Topic:test_topic PartitionCount:3 ReplicationFactor:2 Configs: Topic: test_topic Partition: 0 Leader: 2 Replicas: 5,2 Isr: 2 Topic: test_topic Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3 Topic: test_topic Partition: 2 Leader: 3 Replicas: 3,4 Isr: 3,4
可见,topic的分区0的leader已经迁移到了id=2的机器上,且处于同步的机器只有一个了。
六、启动新机器
nohup bin/kafka-server-start.sh config/server.properties &
七、再看topic的状态
bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic Topic:test_topic PartitionCount:3 ReplicationFactor:2 Configs: Topic: test_topic Partition: 0 Leader: 2 Replicas: 5,2 Isr: 2,5 Topic: test_topic Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3 Topic: test_topic Partition: 2 Leader: 3 Replicas: 3,4 Isr: 3,4
id=5的机器也处于同步状态了,但还须要将leader恢复到这台机器上。
八、执行leader平衡
详见leader的平衡部分。
bin/kafka-preferred-replica-election.sh --zookeeper 192.168.172.98:2181/kafka
若是配置文件中
auto.leader.rebalance.enable=true
则此步骤不须要执行。
九、done
bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic Topic:test_topic PartitionCount:3 ReplicationFactor:2 Configs: Topic: test_topic Partition: 0 Leader: 5 Replicas: 5,2 Isr: 2,5 Topic: test_topic Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3 Topic: test_topic Partition: 2 Leader: 3 Replicas: 3,4 Isr: 3,4
全部内容都恢复了
将一台机器加入kafka集群很容易,只须要为它分配一个独立的broker id,而后启动它便可。可是这些新加入的机器上面并无任何的分区数据,因此除非将现有数据移动这些机器上,不然它不会作任何工做,直到建立新topic。所以,当你往集群加入机器时,你应该将其它机器上的一部分数据往这台机器迁移。
数据迁移的工做须要手工初始化,而后自动完成。它的原理以下:当新机器起来后,kafka将其它机器的一些分区复制到这个机器上,并做为follower,当这个新机器完成复制并成为in-sync状态后,那些被复制的分区的一个副本会被删除。(都不会成为leader?)
一、将新机器kafka配置文件中的broker.id设置为与原机器同样
二、启动kafka,注意kafka保存数据的目录不会自动建立,须要手工建立
此时新建的topic都会优先分配leader到新增的机器上,但原有的topic不会将分区迁移过来。
三、数据迁移,请见数据迁移部分。
如下步骤用于将现有数据迁移到新的broker中,假设须要将test_topic与streaming_ma30_sdc的所有分区迁移到新的broker中(id 为6和7)
一、建立一个json文件,用于指定哪些topic将被迁移过去
cat topics-to-move.json
{"topics": [ {"topic": "test_topic"}, {"topic": "streaming_ma30_sdc"} ], "version":1 }
注意全角,半角符号,或者中文引号之类的问题。
二、先generate迁移后的结果,检查一下是否是你要想的效果
bin/kafka-reassign-partitions.sh --zookeeper 192.168.172.98:2181/kafka --topics-to-move-json-file topics-to-move.json --broker-list "6,7" —generate Current partition replica assignment {"version":1,"partitions":[{"topic":"streaming_ma30_sdc","partition":2,"replicas":[2]},{"topic":"test_topic","partition":0,"replicas":[5,2]},{"topic":"test_topic","partition":2,"replicas":[3,4]},{"topic":"streaming_ma30_sdc","partition":1,"replicas":[5]},{"topic":"streaming_ma30_sdc","partition":0,"replicas":[4]},{"topic":"test_topic","partition":1,"replicas":[2,3]},{"topic":"streaming_ma30_sdc","partition":3,"replicas":[3]},{"topic":"streaming_ma30_sdc","partition":4,"replicas":[4]}]} Proposed partition reassignment configuration {"version":1,"partitions":[{"topic":"test_topic","partition":0,"replicas":[7,6]},{"topic":"streaming_ma30_sdc","partition":2,"replicas":[7]},{"topic":"test_topic","partition":2,"replicas":[7,6]},{"topic":"streaming_ma30_sdc","partition":1,"replicas":[6]},{"topic":"test_topic","partition":1,"replicas":[6,7]},{"topic":"streaming_ma30_sdc","partition":0,"replicas":[7]},{"topic":"streaming_ma30_sdc","partition":4,"replicas":[7]},{"topic":"streaming_ma30_sdc","partition":3,"replicas":[6]}]}
分别列出了当前的状态以及迁移后的状态。
把这2个json分别保存下来,第一个用来万一须要roll back的时候使用,第二个用来执行迁移。
三、执行迁移
bin/kafka-reassign-partitions.sh --zookeeper 192.168.172.98:2181/kafka --reassignment-json-file expand-cluster-reassignment.json --execute 其中expand-cluster-reassignment.json为保存上面第二段json的文件。
四、查看迁移过程
bin/kafka-reassign-partitions.sh --zookeeper 192.168.172.98:2181/kafka --reassignment-json-file expand-cluster-reassignment.json --verify Status of partition reassignment: Reassignment of partition [streaming_ma30_sdc,0] is still in progress Reassignment of partition [streaming_ma30_sdc,4] is still in progress Reassignment of partition [test_topic,2] completed successfully Reassignment of partition [test_topic,0] completed successfully Reassignment of partition [streaming_ma30_sdc,3] is still in progress Reassignment of partition [streaming_ma30_sdc,1] is still in progress Reassignment of partition [test_topic,1] completed successfully Reassignment of partition [streaming_ma30_sdc,2] is still in progress
五、当全部迁移的完成后,查看一下结果是否是你想要的
bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic Topic:test_topic PartitionCount:3 ReplicationFactor:2 Configs: Topic: test_topic Partition: 0 Leader: 7 Replicas: 7,6 Isr: 6,7 Topic: test_topic Partition: 1 Leader: 6 Replicas: 6,7 Isr: 6,7 Topic: test_topic Partition: 2 Leader: 7 Replicas: 7,6 Isr: 6,7
完成
以上步骤将整个topic迁移,也能够只迁移其中一个或者多个分区。
如下将test_topic的分区1移到broker id为2,3的机器,分区2移到broker id为4,5的机器.
【其实仍是整个topic迁移好一点,否则准备迁移文件会很麻烦】
一、准备迁移配置文件
cat custom-reassignment.json
{"version":1,"partitions":[{"topic":"test_topic","partition":1,"replicas":[2,3]},{"topic":"test_topic","partition":2,"replicas":[4,5]}]}
三、执行迁移
bin/kafka-reassign-partitions.sh --zookeeper 192.168.172.98:2181/kafka --reassignment-json-file custom-reassignment.json --execute
四、查看迁移过程
bin/kafka-reassign-partitions.sh --zookeeper 192.168.172.98:2181/kafka --reassignment-json-file custom-reassignment.json --verify
五、查看迁移结果
bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic
当一个机器下线时,kafka并不会自动将这台机器上的副本迁移到其它机器上,所以,咱们须要手工进行迁移。这个过程会至关的无聊,kafka打算在0.8.2版本中添加此特性。
有了吗?再找找。若是只是替换机器则不会有这个问题。
Increasing replication factor
Increasing the replication factor of an existing partition is easy. Just specify the extra replicas in the custom reassignment json file and use it with the --execute option to increase the replication factor of the specified partitions.
For instance, the following example increases the replication factor of partition 0 of topic foo from 1 to 3. Before increasing the replication factor, the partition's only replica existed on broker 5. As part of increasing the replication factor, we will add more replicas on brokers 6 and 7.
The first step is to hand craft the custom reassignment plan in a json file-
cat increase-replication-factor.json
{"version":1,
"partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}
Then, use the json file with the --execute option to start the reassignment process-
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute
Current partition replica assignment
{"version":1,
"partitions":[{"topic":"foo","partition":0,"replicas":[5]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
"partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}
The --verify option can be used with the tool to check the status of the partition reassignment. Note that the same increase-replication-factor.json (used with the --execute option) should be used with the --verify option
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify
Status of partition reassignment:
Reassignment of partition [foo,0] completed successfully
You can also verify the increase in replication factor with the kafka-topics tool-
bin/kafka-topics.sh --zookeeper localhost:2181 --topic foo --describe
Topic:foo PartitionCount:1 ReplicationFactor:3 Configs:
Topic: foo Partition: 0 Leader: 5 Replicas: 5,6,7 Isr: 5,6,7
当一个broker down掉时,全部原本将它做为leader的分区会被将leader转移到其它broker。这意味着当这个broker重启时,它将再也不担任何分区的leader,kafka的client也不会从这个broker来读取消息,致使资源的浪费。
为了不这种状况的发生,kafka增长了一个标记:优先副本(preferred replicas)。若是一个分区有3个副本,且这3个副本的优先级别分别为1,5,9,则1会做为leader。为了使kafka集群恢复默认的leader,须要运行如下命令:
bin/kafka-preferred-replica-election.sh --zookeeper 192.168.172.98:2181/kafka
或者能够设置如下配置项,leader 会自动执行balance:
auto.leader.rebalance.enable=true
这配置默认即为空,但须要通过一段时间后才会触发,约半小时。