Kafka的集群算法作的很先进,大大强于ActiveMQ。ActiveMQ只有主从互备的HA,负载均衡作的很差,没有消息分片。而Kafka在HA,负载均衡和消息分片上作的很完美。算法
Kafka的消息存储在Broker上,每种类型的消息,存在一个Topic里。每一个Topic分红多个分区。这些分区能够分布在同一个物理机,也能够分布在不一样的物理机。shell
高可用json
Kafka提供一个复制因子的概念,能够把分区复制n份,放在不一样的节点上。这就实现了数据的高可用。而服务的高可用,和ActiveMQ相似。是在客户端注册多个Broker地址,其中一个失效,客户端会自动进行失效转移。负载均衡
消息分片spa
消息经过轮询,或者自定义的算法写入不一样的分区,则实现了消息分片。消息生产者不须要关注消息最终存入Topic的哪一个分区,也不须要关注这个分区在哪台物理机上。分片算法会自动把消息路由到正确的分区。这个算法,很是像一致性哈希。其中每一个物理机有n个分区,这n个分区是这个物理机的虚拟节点,将这些虚拟节点放入Hash环。code
负载均衡server
每一个Topic分区有n分拷贝,可是只有一个节点负责处理消息的发送和接收,称为主节点。其余节点称为从节点,只是被动的同步消息。一旦主节点不能提供服务,从节点会变为主节点。而主从节点的单位是分区。也就是一台物理机,能够是Topic A的主节点,Topic B的从节点。经过此方式实现负载均衡和最大限度的利用系统资源,不会有专门负责备份的从节点物理机。进程
Kafka集群的结构图以下:资源
配置路由
Kafka集群的配置很简单。配置文件是Kafka路径的config的server.properties。而Kafka的启动脚本须要制定配置文件。
bin/kafka-server-start.sh server.properties &
为了能够在单机模拟集群的状况,能够将server.properties文件复制多份,如:
cp config/server.properties config/server-1.properties cp config/server.properties config/server-2.properties
而后使用一样的命令启动Kafka就能够了。可是须要修改一下配置,不能让Broker ID,端口和存储路径冲突。打开server.properties修改如下几行,改为不一样的值。
broker.id=1 port=9093 log.dir=/tmp/kafka-logs-1
另外,zookeeper.connect这个配置也颇有用。只有指向同一个Zookeeper的Kafka才是在同一个集群中。
zookeeper.connect=localhost:2181
最后启动Kafka。
bin/kafka-server-start.sh server.properties & bin/kafka-server-start.sh server-1.properties & bin/kafka-server-start.sh server-2.properties &
平衡主节点
若是某个节点崩溃或死机,那么Kafka会将死机而影响的分区中选举新的主节点。当这个节点从新启动以后,它的全部的分区都是做为从节点,形成资源的浪费。这样能够在启动Kafka以后执行
bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot
这样来触发主节点从新选举,让Kafka更加负载均衡。若是每次手动执行命令以为很麻烦,能够在配置文件中配置,让Kafka自动平衡主节点。
auto.leader.rebalance.enable=true
在集群中增长新物理机
只要配置zookeeper.connect为要加入的集群,再启动Kafka进程,就可让新的机器加入到Kafka集群。可是新的机器只针对新的Topic才会起做用,在以前就已经存在的Topic的分区,不会自动的分配到新增长的物理机中。为了使新增长的机器能够分担系统压力,必须进行消息数据迁移。Kafka提供了kafka-reassign-partitions.sh进行数据迁移。这个脚本提供3个命令:
--generate 根据给予的Topic列表和Broker列表生成迁移计划。generate并不会真正进行消息迁移,而是将消息迁移计划计算出来,供execute命令使用。
--execute 根据给予的消息迁移计划进行迁移。
--verify 检查消息是否已经迁移完成。
例子:
1. 建立须要进行迁移的Topic列表,迁移Topic foo1和foo2。
cat topics-to-move.json {"topics": [{"topic": "foo1"}, {"topic": "foo2"}], "version":1 }
2. 调用--generate生成迁移计划,将foo1和foo2迁移到Broker ID是5和6的机器上。
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate
生成结果以下:
Current partition replica assignment {"version":1, "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]}, {"topic":"foo1","partition":0,"replicas":[3,4]}, {"topic":"foo2","partition":2,"replicas":[1,2]}, {"topic":"foo2","partition":0,"replicas":[3,4]}, {"topic":"foo1","partition":1,"replicas":[2,3]}, {"topic":"foo2","partition":1,"replicas":[2,3]}] } Proposed partition reassignment configuration {"version":1, "partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]}, {"topic":"foo1","partition":0,"replicas":[5,6]}, {"topic":"foo2","partition":2,"replicas":[5,6]}, {"topic":"foo2","partition":0,"replicas":[5,6]}, {"topic":"foo1","partition":1,"replicas":[5,6]}, {"topic":"foo2","partition":1,"replicas":[5,6]}] }
Current partition replica assignment表示当前的消息存储情况。Proposed partition reassignment configuration表示迁移后的消息存储情况。将迁移后的json存入一个文件(如expand-cluster-reassignment.json),供--execute命令使用。
3. 执行--execute迁移。
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
4. 能够执行--verify查看迁移进度。
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify
结果以下:
Status of partition reassignment: Reassignment of partition [foo1,0] completed successfully Reassignment of partition [foo1,1] is in progress Reassignment of partition [foo1,2] is in progress Reassignment of partition [foo2,0] completed successfully Reassignment of partition [foo2,1] completed successfully Reassignment of partition [foo2,2] completed successfully
在集群中删除物理机
在Kafka 0.8.2中会提供此功能。目前可使用增长机器的方法,先将数据都迁移到其余的机器,再删除该节点。