Kafka是一个分布式的流数据平台,可发布、订阅消息流,使用zookeeper进行集群管理。也可做为一个消息队列中间件,相似于RabbitMQ,ActiveMQ,ZeroMQ等。由LinkdIn开源,用Scala语言实现。
Kafka有以下特色:java
// 从官网下载最新版本,这里为:kafka_2.11-1.0.0.tgznode
// 解压git
$ tar -xzf kafka_2.11-1.0.0.tgz $ cd kafka_2.11-1.0.0
// Kafka用到了zookeeper,因此须要启动zookeeper(新版本内置了zookeeper,若是读者已有其余zookeeper启动了,这步能够略过)github
$ bin/zookeeper-server-start.sh config/zookeeper.properties
// 修改配置文件,并启动kafka server:
config/server.properties中的zookeeper.connect默认为localhost:2181,能够修改成其余的zookeeper地址。多个地址间,经过逗号分隔,如:"127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"。
默认为9092端口,经过修改“listeners=PLAINTEXT://:9092” 来指定其余端口或IP。
配置好后,启动kafka server:apache
$ bin/kafka-server-start.sh config/server.properties
配置文件目录下还有consumer.properties和producer.properties,按默认便可。bootstrap
// 建立一个topic,topic名称为test浏览器
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
能够经过命令:bin/kafka-topics.sh --list --zookeeper localhost:2181查看当前全部的topic.缓存
// 经过producer发送消息网络
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test This is a message This is another message
往test中发送数据。session
// 启动一个consumer,拉取消息
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning This is a message This is another message
--from-beginning参数表示从头开始读取数据,若是不设置,则只读取最新的数据。
// 能够再启动一个Server
$ bin/kafka-server-start.sh config/server-1.properties &
这里的server-1.properties是拷贝的server.properties,主要修改以下几个参数:
# broker id,整数,和其余broker不能重复 broker.id=2 # 指定端口为9094,由于在同一台机器上,须要避免端口冲突。这里没有配置IP,默认为本机 listeners=PLAINTEXT://:9094 # 日志文件路径,即topic数据的存储位置。不一样的broker,指定不一样的路径。 log.dir=/tmp/kafka-logs-2
Producer1和Producer2往Topic A中发送消息,Consumer1/2/3/4/5 从Topic中接收消息。
Kafka Cluster包含两个Server,分别为Server1,Server2。
Topic A包含4个Partition,为:P0, P1, P3, P4,平均分配到Server1和Server2上。
一个Broker就是一个server。多个Broker构成一个kafka集群,同时对外提供服务,若是某个节点down掉,则从新分配。
注意:集群和主从热备不一样,对于主从热备,同时只有一个节点提供服务,其余节点待命状态。
消息发布者,Push方式,负责发布消息到Kafka broker。
消费者,Pull方式,消费消息。每一个consumer属于一个特定的consuer group。
经过对消息指定主题能够将消息分类,Consumer能够只关注特定Topic中的消息。
查看总共有多少个Topic:
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
查看某个topic的状况(分区、副本数等),这里查看topic为test的信息:
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
每一个Consumer都会归到一个Group中。某个Partition中的消息,能够被多个Group消费,但只能被Group中的一个Consumer消费。因此,若是要对多个Consumer进行消息广播,则这些Consumer须要放在不一样的Group中。
当一个Consumer进程或线程挂掉,它所订阅的Partition会被从新分配到该Group内的其余Consumer上。若是Consumer A订阅了多个Partition,那么当该Group内新增Consumer B时,会从Consumer A中分配出一个Partition给Consumer B。
为了维持Consumer 与 Consumer Group的关系,须要Consumer周期性的发送heartbeat到coordinator(协调者,在早期版本,以zookeeper做为协调者。后期版本则以某个broker做为协调者)。当Consumer因为某种缘由不能发Heartbeat到coordinator时,而且时间超过session.timeout.ms时,就会认为该consumer已退出,它所订阅的partition会分配到同一group 内的其它的consumer上。而这个过程,被称为rebalance。
Offset是针对Partition的,它用来记录消费到Partition中的哪条消息了。
Consumer并不维护Offset,而是由Consumer所在的Group维护。所以,Group中的一个Consumer消费了某个Partition中的消息,那么该组的其余Consumer就不能重复消费该条消息了,由于Offset已经+1了。
上图中,Consumer A和Consumer B属于不一样的Group。Consumer A所在的Group,在该Partition的Offset=9,表示下次该Group获取消息时是从9开始获取;同理,Consumer B所在的Group在该Partition的Offset=11,下次该Group的Consumer获取消息时,从11开始获取。
Partition是物理上的概念,每一个Partition对应一个文件夹(默认在/tmp/kafka-logs下,经过server.properties中log.dirs配置)。一个topic能够对应多个partition,consumer订阅的其实就是partition。
上图表示一个Topic,指定了3个分区。在向该Topic写数据时,会根据均衡策略,往相应的分区中写。这3个分区中的数据是不同的,它们的数据总和,构成该Topic的数据。
每一个分区中的数据,保证严格的写入顺序。
分区会自动根据均衡策略分配到多个broker上。好比有2个broker(或者叫Server):broker1, broker2,建立一个包含4个partition且replication-factor(副本)为1的topic,那么对于该topic,每一个broker会被分配2个partition。以下图:
有两个Group:Group A和Group B,其中Group A包含C一、C2两个Consumer;Group B包含C3,C4,C5,C6四个Consumer。
若是向该Topic写入4条信息:M1, M2, M3, M4。那么各个Consumer收到的消息是(一种状况):
C1:M1, M3 C2:M2, M4 C3:M1 C4:M3 C5:M2 C6:M4
C1,C2各接收到2条消息,它们的和为:M1,M2,M3,M4。
C3,C4,C5,C6各接收到1条消息,它们的和为:M1,M2,M3,M4。
代表Topic消息,被同一Group内的Consumer均分了。由于每次向Topic中写入消息时,会被均分至各个Partition,而后各Consumer收到本身所订阅Partition的消息。同时,这里也说明了同一个partition内的消息只能被同一个组中的一个consumer消费。
注:若是replication-factor为3,那么每一个broker会有6(即2x3)个partition。
另外,建立topic时,在当前的全部broker间进行均分,分好后就不会变了。假设把上述broker1停掉,它的分区不会转到broker2上。producer在写消息时,不会再写入broker2中的分区。
那么,原先订阅broker2分区的consumer,不能接收消息了。提示:
WARN [Consumer clientId=consumer-1, groupId=g4] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
对于一个Topic的partition数,增长Broker(即服务节点)并不会增长partition的数量。
验证:
查看topic信息
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
再启动一个新的Broker:
$ bin/kafka-server-start.sh ../config/server-1.properties
启动后,再用上一步的命令看topic信息,partition数量并未改变。
而且,若是group g1上有两个consumer,始终只会有一个consumer能收到该topic的消息,另外一个一直处于空闲状态(光占着资源不作事)。因此,Topic的Partition数,要大于等于Consumer数量。
默认组的疑问
可能读者会有疑问,经过命令:
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
执行屡次,建立了多个consumer,这些consumer属于默认的一个组,可是却能同时收到一个topic的信息。和上述所说的“一个Topic中的消息,只能被group中的一个consumer消费”有冲突。
其实,不指定group名称,的确会分配默认的group,但每次分配的名称是不同的,即这里建立的consumer是属于不一样的group的。能够经过命令查看全部group:
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list Note: This will not show information about old Zookeeper-based consumers. console-consumer-94070 console-consumer-27823 console-consumer-4826 console-consumer-47050
能够看出,这里的group名称是不同的。
consumer数量和group数量
对于一个topic,若是group中consumer数量比partition数量多,那么多余的consumer会空闲。这是由于,group中的某个consumer一旦订阅了某个partition,则会一直占用并消费该partition中的信息。除非该consumer退出,不然该partition不会被该组的其余consumer占用。因此会致使多余的consumer空闲,一直收不到消息。
能够经过命令,查看consumer和partition的对应关系:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group g4
一个topic能够对应多个partition,但一个partition只能对应一个topic。
Kafka解决查询效率的手段之一是将数据文件分段,好比有100条Message,它们的offset是从0到99。假设将数据文件分红5段,第一段为0-19,第二段为20-39,以此类推,每段放在一个单独的数据文件里面,数据文件以该段中最小的offset命名。这样在查找指定offset的Message的时候,用二分查找就能够定位到该Message在哪一个段中。
上图展现文件传输到Socket的常规方式,步骤:
上图展现零拷贝方式传输文件到Socket,少了文件缓存到用户空间,再由用户空间到内核空间的操做。
Kafka采用零拷贝的方式。
经过Replication Factor指定副本的数量,这样,若是一个Partition出现了问题,那么能够从副本中恢复了。
若是不喜欢经过命令行操做,也能够经过图形化管理界面,好比yahoo开源的Kafka Manager。
地址:https://github.com/yahoo/kafk...
这里以CentOS7为例,进行编译、运行说明。
注:Kafka Manager的编译须要javac,须要安装jdk环境。最新版的须要jdk8版本。
CentOS7默认安装了OpenJDK,将其卸载,从Oracle官网下载jdk8文件,而后安装。
// github上下载kafka manager源码
$ git clone https://github.com/yahoo/kafk...
$ cd kafka-manager
// 修改配置文件中zookeeper地址
配置文件:conf/application.conf
kafka-manager.zkhosts="127.0.0.1:2181"
若是有多个zookeeper,经过逗号分隔,如:
kafka-manager.zkhosts="my.zookeeper.host.com:2181,other.zookeeper.host.com:2181"
// 将源码编译打包成zip包
$ ./sbt clean dist
这一步用到了javac,运行完后,会在当前目录下生成target文件夹。生成zip包地址:
target/universal/kafka-manager-1.3.3.16.zip
// 进入zip所在目录,解压zip包,启动服务(默认9000端口)
$ cd target/universal
$ unzip kafka-manager-1.3.3.16
$ ./kafka-manager-1.3.3.16/bin/kafka-manager
// 打开Kafka Manager页面
浏览器输入地址:http://192.168.0.12:9000/ (这里的IP须要替换成读者本身的IP)
很简洁的一个页面,第一次打开,什么都没有。
// 添加一个Cluster
Cluster Name: 名称随意,好比MyFirstCluster
Cluster Zookeeper Hosts: zookeeper的地址,好比:192.168.0.12:2181
Kafka Version: 笔者选的0.11
勾选“Enable JMX Polling”。注意:勾选了该项,启动kafka server前,须要设置JMX_PORT变量,如:
$ JMX_PORT=9999 $ bin/zookeeper-server-start.sh config/zookeeper.properties
保存后,就能够经过MyFirstCluster,查看Broker, Topic, Partition, Consumer等信息了。
注:若是查看不了Consumer信息,提示“Please enable consumer polling here.”,须要勾选一个配置。如:
提示信息:
修改Cluster:
勾选中“Poll consumer information”
保存。具体的管理功能,能够经过操做页面进一步挖掘。