kafka分布式消息平台的初探

介绍

Kafka是一个分布式的流数据平台,可发布、订阅消息流,使用zookeeper进行集群管理。也可做为一个消息队列中间件,相似于RabbitMQ,ActiveMQ,ZeroMQ等。由LinkdIn开源,用Scala语言实现。
Kafka有以下特色:java

  • kafka利用线性存储来进行硬盘读写,速度快;
  • 以时间复杂度为O(1)的方式提供消息持久化能力,即便对TB级以上数据也能保证常数时间的访问性能。所以不清除存储的数据并不会影响性能;
  • zero-copy Gzip和Snappy压缩
  • 已消费的消息不会自动删除
  • 考虑到高效性,对事务的支持较弱。

应用场景

图片描述

安装使用

// 从官网下载最新版本,这里为:kafka_2.11-1.0.0.tgznode

// 解压git

$ tar -xzf kafka_2.11-1.0.0.tgz
$ cd kafka_2.11-1.0.0

// Kafka用到了zookeeper,因此须要启动zookeeper(新版本内置了zookeeper,若是读者已有其余zookeeper启动了,这步能够略过)github

$ bin/zookeeper-server-start.sh config/zookeeper.properties

// 修改配置文件,并启动kafka server:
config/server.properties中的zookeeper.connect默认为localhost:2181,能够修改成其余的zookeeper地址。多个地址间,经过逗号分隔,如:"127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"。
默认为9092端口,经过修改“listeners=PLAINTEXT://:9092” 来指定其余端口或IP。
配置好后,启动kafka server:apache

$ bin/kafka-server-start.sh config/server.properties

配置文件目录下还有consumer.properties和producer.properties,按默认便可。bootstrap

// 建立一个topic,topic名称为test浏览器

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

能够经过命令:bin/kafka-topics.sh --list --zookeeper localhost:2181查看当前全部的topic.缓存

// 经过producer发送消息网络

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
This is a message
This is another message

往test中发送数据。session

// 启动一个consumer,拉取消息

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message

--from-beginning参数表示从头开始读取数据,若是不设置,则只读取最新的数据。

// 能够再启动一个Server

$ bin/kafka-server-start.sh config/server-1.properties &

这里的server-1.properties是拷贝的server.properties,主要修改以下几个参数:

# broker id,整数,和其余broker不能重复
broker.id=2
# 指定端口为9094,由于在同一台机器上,须要避免端口冲突。这里没有配置IP,默认为本机
listeners=PLAINTEXT://:9094
# 日志文件路径,即topic数据的存储位置。不一样的broker,指定不一样的路径。
log.dir=/tmp/kafka-logs-2

示意图

图片描述

Producer1和Producer2往Topic A中发送消息,Consumer1/2/3/4/5 从Topic中接收消息。
Kafka Cluster包含两个Server,分别为Server1,Server2。
Topic A包含4个Partition,为:P0, P1, P3, P4,平均分配到Server1和Server2上。

Broker

一个Broker就是一个server。多个Broker构成一个kafka集群,同时对外提供服务,若是某个节点down掉,则从新分配。
注意:集群和主从热备不一样,对于主从热备,同时只有一个节点提供服务,其余节点待命状态。

Producer

消息发布者,Push方式,负责发布消息到Kafka broker。

Consumer

消费者,Pull方式,消费消息。每一个consumer属于一个特定的consuer group。

主题(Topic)

经过对消息指定主题能够将消息分类,Consumer能够只关注特定Topic中的消息。
查看总共有多少个Topic:

$ bin/kafka-topics.sh --list --zookeeper localhost:2181

查看某个topic的状况(分区、副本数等),这里查看topic为test的信息:

$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

消费者组(Consumer Group)

每一个Consumer都会归到一个Group中。某个Partition中的消息,能够被多个Group消费,但只能被Group中的一个Consumer消费。因此,若是要对多个Consumer进行消息广播,则这些Consumer须要放在不一样的Group中。
当一个Consumer进程或线程挂掉,它所订阅的Partition会被从新分配到该Group内的其余Consumer上。若是Consumer A订阅了多个Partition,那么当该Group内新增Consumer B时,会从Consumer A中分配出一个Partition给Consumer B。

为了维持Consumer 与 Consumer Group的关系,须要Consumer周期性的发送heartbeat到coordinator(协调者,在早期版本,以zookeeper做为协调者。后期版本则以某个broker做为协调者)。当Consumer因为某种缘由不能发Heartbeat到coordinator时,而且时间超过session.timeout.ms时,就会认为该consumer已退出,它所订阅的partition会分配到同一group 内的其它的consumer上。而这个过程,被称为rebalance。

位移(Offset)

Offset是针对Partition的,它用来记录消费到Partition中的哪条消息了。
Consumer并不维护Offset,而是由Consumer所在的Group维护。所以,Group中的一个Consumer消费了某个Partition中的消息,那么该组的其余Consumer就不能重复消费该条消息了,由于Offset已经+1了。
图片描述

上图中,Consumer A和Consumer B属于不一样的Group。Consumer A所在的Group,在该Partition的Offset=9,表示下次该Group获取消息时是从9开始获取;同理,Consumer B所在的Group在该Partition的Offset=11,下次该Group的Consumer获取消息时,从11开始获取。

分区(Partition)

Partition是物理上的概念,每一个Partition对应一个文件夹(默认在/tmp/kafka-logs下,经过server.properties中log.dirs配置)。一个topic能够对应多个partition,consumer订阅的其实就是partition。
图片描述

上图表示一个Topic,指定了3个分区。在向该Topic写数据时,会根据均衡策略,往相应的分区中写。这3个分区中的数据是不同的,它们的数据总和,构成该Topic的数据。
每一个分区中的数据,保证严格的写入顺序。

分区会自动根据均衡策略分配到多个broker上。好比有2个broker(或者叫Server):broker1, broker2,建立一个包含4个partition且replication-factor(副本)为1的topic,那么对于该topic,每一个broker会被分配2个partition。以下图:
图片描述

有两个Group:Group A和Group B,其中Group A包含C一、C2两个Consumer;Group B包含C3,C4,C5,C6四个Consumer。
若是向该Topic写入4条信息:M1, M2, M3, M4。那么各个Consumer收到的消息是(一种状况):

C1:M1, M3
C2:M2, M4

C3:M1
C4:M3
C5:M2
C6:M4

C1,C2各接收到2条消息,它们的和为:M1,M2,M3,M4。
C3,C4,C5,C6各接收到1条消息,它们的和为:M1,M2,M3,M4。
代表Topic消息,被同一Group内的Consumer均分了。由于每次向Topic中写入消息时,会被均分至各个Partition,而后各Consumer收到本身所订阅Partition的消息。同时,这里也说明了同一个partition内的消息只能被同一个组中的一个consumer消费
注:若是replication-factor为3,那么每一个broker会有6(即2x3)个partition。

另外,建立topic时,在当前的全部broker间进行均分,分好后就不会变了。假设把上述broker1停掉,它的分区不会转到broker2上。producer在写消息时,不会再写入broker2中的分区。
那么,原先订阅broker2分区的consumer,不能接收消息了。提示:

WARN [Consumer clientId=consumer-1, groupId=g4] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

对于一个Topic的partition数,增长Broker(即服务节点)并不会增长partition的数量。
验证:
查看topic信息

$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

再启动一个新的Broker:

$ bin/kafka-server-start.sh ../config/server-1.properties

启动后,再用上一步的命令看topic信息,partition数量并未改变。
而且,若是group g1上有两个consumer,始终只会有一个consumer能收到该topic的消息,另外一个一直处于空闲状态(光占着资源不作事)。因此,Topic的Partition数,要大于等于Consumer数量。

默认组的疑问
可能读者会有疑问,经过命令:

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

执行屡次,建立了多个consumer,这些consumer属于默认的一个组,可是却能同时收到一个topic的信息。和上述所说的“一个Topic中的消息,只能被group中的一个consumer消费”有冲突。
其实,不指定group名称,的确会分配默认的group,但每次分配的名称是不同的,即这里建立的consumer是属于不一样的group的。能够经过命令查看全部group:

$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
Note: This will not show information about old Zookeeper-based consumers.

console-consumer-94070
console-consumer-27823
console-consumer-4826
console-consumer-47050

能够看出,这里的group名称是不同的。

consumer数量和group数量
对于一个topic,若是group中consumer数量比partition数量多,那么多余的consumer会空闲。这是由于,group中的某个consumer一旦订阅了某个partition,则会一直占用并消费该partition中的信息。除非该consumer退出,不然该partition不会被该组的其余consumer占用。因此会致使多余的consumer空闲,一直收不到消息。
能够经过命令,查看consumer和partition的对应关系:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group g4

一个topic能够对应多个partition,但一个partition只能对应一个topic。

数据文件分段

Kafka解决查询效率的手段之一是将数据文件分段,好比有100条Message,它们的offset是从0到99。假设将数据文件分红5段,第一段为0-19,第二段为20-39,以此类推,每段放在一个单独的数据文件里面,数据文件以该段中最小的offset命名。这样在查找指定offset的Message的时候,用二分查找就能够定位到该Message在哪一个段中。
图片描述

零拷贝(图来自网络)

图片描述

上图展现文件传输到Socket的常规方式,步骤:

  1. 操做系统将文件数据从磁盘读入到内核空间的页缓存;
  2. 应用程序将数据从内核空间读入到用户空间缓存中;
  3. 应用程序将数据写回到内核空间到socket缓存中;
  4. 操做系统将数据从socket缓冲区复制到网卡缓冲区,以便将数据经网络发出。

图片描述

上图展现零拷贝方式传输文件到Socket,少了文件缓存到用户空间,再由用户空间到内核空间的操做。
Kafka采用零拷贝的方式。

Partition备份

经过Replication Factor指定副本的数量,这样,若是一个Partition出现了问题,那么能够从副本中恢复了。

Kafka Manager安装和使用

若是不喜欢经过命令行操做,也能够经过图形化管理界面,好比yahoo开源的Kafka Manager。
地址:https://github.com/yahoo/kafk...
这里以CentOS7为例,进行编译、运行说明。
注:Kafka Manager的编译须要javac,须要安装jdk环境。最新版的须要jdk8版本。
CentOS7默认安装了OpenJDK,将其卸载,从Oracle官网下载jdk8文件,而后安装。

// github上下载kafka manager源码
$ git clone https://github.com/yahoo/kafk...
$ cd kafka-manager

// 修改配置文件中zookeeper地址
配置文件:conf/application.conf

kafka-manager.zkhosts="127.0.0.1:2181"

若是有多个zookeeper,经过逗号分隔,如:

kafka-manager.zkhosts="my.zookeeper.host.com:2181,other.zookeeper.host.com:2181"

// 将源码编译打包成zip包
$ ./sbt clean dist
这一步用到了javac,运行完后,会在当前目录下生成target文件夹。生成zip包地址:
target/universal/kafka-manager-1.3.3.16.zip

// 进入zip所在目录,解压zip包,启动服务(默认9000端口)
$ cd target/universal
$ unzip kafka-manager-1.3.3.16
$ ./kafka-manager-1.3.3.16/bin/kafka-manager

// 打开Kafka Manager页面
浏览器输入地址:http://192.168.0.12:9000/ (这里的IP须要替换成读者本身的IP)
图片描述

很简洁的一个页面,第一次打开,什么都没有。

// 添加一个Cluster
图片描述

Cluster Name: 名称随意,好比MyFirstCluster
Cluster Zookeeper Hosts: zookeeper的地址,好比:192.168.0.12:2181
Kafka Version: 笔者选的0.11
勾选“Enable JMX Polling”。注意:勾选了该项,启动kafka server前,须要设置JMX_PORT变量,如:
$ JMX_PORT=9999
$ bin/zookeeper-server-start.sh config/zookeeper.properties

保存后,就能够经过MyFirstCluster,查看Broker, Topic, Partition, Consumer等信息了。
注:若是查看不了Consumer信息,提示“Please enable consumer polling here.”,须要勾选一个配置。如:
提示信息:
图片描述

修改Cluster:
图片描述

勾选中“Poll consumer information”
图片描述

保存。具体的管理功能,能够经过操做页面进一步挖掘。

相关文章
相关标签/搜索