【Spark深刻学习 -15】Spark Streaming前奏-Kafka初体验

----本节内容-------html

1.Kafka基础概念java

 1.1 出世背景node

 1.2 基本原理python

      1.2.1.前置知识linux

      1.2.2.架构和原理算法

      1.2.3.基本概念数据库

      1.2.4.kafka特色apache

2.Kafka初体验缓存

  2.1 环境准备安全

  2.2 Kafka小试牛刀

      2.2.1单个broker初体验

      2.2.2 多个broker初体验

  2.3 Kafka分布式集群构建

     2.3.1 Kafka分布式集群构建

     2.3.2 Kafka主题建立

     2.3.3 生产者生产数据

     2.3.4消费者消费数据

     2.3.5消息的压缩

2.4 Kafka在ZK目录节点

    2.4.1 kafka镜像原理

    2.4.2 Kafka副本模型

    2.4.3 在ZK目录节点内容

 2.5 实体间交互流程

   2.5.1主题与zk  

   2.5.2 消费者与zk

   2.5.3 broker与生产者

   2.5.4 消费者与消费者组

3.参考资料

---------------------

1.Kafka基础概念

1.1 出世背景

Kafka是一个消息系统,是LinkedIn公司开发并开源出来的组件。Kafka本来用做LinkedIn的活动流(Activity Stream)和运营数据处理管道(Pipeline)的基础。如今它已被多家公司做为多种类型的数据管道和消息系统使用。歪果仁就喜欢整一些看不懂的词汇,好比活动流,好比运营数据,就不能好好说话么。

活动流是啥?简单理解,就是用户使用网站或者系统时产生的数据流,好比点击一个页面,查看一个图片,翻看一网页,搜索一个关键字,网站运营者须要对用户的这些行为进行统计,造成报表。

运营数据是啥?就是计算机产生的监控日志信息,如CPU数据,IO数据等, 这些数据都是动态生成的,Linkein这样的大公司,运营数据量很是大,一般的方式是生成这些数据,写入到log文件中,而后进行统计。活动流数据和运营数据对网站和软件产品很是重要,举几个栗子

  1)动态汇总,将用户的信息动态汇总,或者本身监控,或者发给用户的朋友圈

  2) 安全,实时监控用户访问信息,防止网络爬虫或者用户扩散垃圾信息,对API使用速率进行实时监控和控制,切断网站某些不正常活动。

 3)机器硬件实时监控:对机器运行效率实时监控,对异常状况自动触发告警。

 4)报表和批处理:将数据导入Hadoop平台,进行离线报表分析。

LinkedIn处理的时候就碰到几个问题:

  1) 日志量大,天天要处理10亿多条数据。

   2)高吞吐量。

  3)实时性能差。

现有的消息队列系统(messaging and queuing system)却很适合于在实时或近实时(near-real-time)的状况下使用,但它们对很长的未被处理的消息队列的处理很不给力,每每并不将数 据持久化做为首要的事情考虑。这样就会形成一种状况,就是当把大量数据传送给Hadoop这样的离线系统后, 这些离线系统每一个小时或天天仅能处理掉部分源数据。Kafka的目的就是要成为一个队列平台,仅仅使用它就可以既支持离线又支持在线使用这两种状况。

1.2 基本架构和原理

1.2.1.前置知识

消息队列

   为何要引入消息队列?举个例子,假如A发送消息给B,若是B在线,那么能够很顺利的通信发消息,那若是B不在线,那就比较麻烦了,消息队列技术能够很好的解决这个问题。

   消息队列技术是分布式应用间交换信息的一种技术,是两个系统通信的桥梁和媒介,将两个系统解耦,不须要知道对方的位置和信息。 经过消息队列技术2个异构的系统能够进行通信,尤为是大型系统。消息队列能够保存在磁盘或者内存中。

   消息队列技术底层都是socket通信,socket在不少地方有用到,好比数据库,进程间通信,jdbc等等底层都是socket通信。

   JMS是消息服务的规范,不少消息中间件技术都遵循JMS规范。

消息队列通信模式

1)点对点通信:点对点方式是最为传统和常见的通信方式,它支持一对1、一对多、多对多、多对一等多种配置方式,支持树状、网状等多种拓扑结构。用人话描述一遍:就是一我的放东西,一我的去东西,这就是点对点。

2)发布/订阅 (Publish/Subscribe) 模式:发布/订阅功能使消息的分发能够突破目的队列地理指向的限制,使消息按照特定的主题甚至内容进行分发,用户或应用程序能够根据主题或内容接收到所须要的消息。发布/订阅功能使得发送者和接收者之间的耦合关系变得更为松散,发送者没必要关心接收者的目的地址,而接收者也没必要关心消息的发送地址,而只是根据消息的主题进行消息的收发。发布/订阅 模式:就跟贴寻人启事,公告同样的道理,消息往公告上一贴,关心的人就去看看发生什么事,不关心的就看成一堆废纸,不用搭理。

Kafka很厉害,kafka将这两个概念整合到一块儿,他只有主题模式,可是能实现队列模式的效果,实现方式就是消费者组的引入:

消费者组:将一个或者多个消费者划到一块儿,取一个名标记,这就是消费者组,对于一个消费者组,

,处在同一个消费者组的消费者,只能有一个消费者消费,

发布订阅模式:每一个消费者组,只有一个消费者,那就是发布订阅,每一个消费者都有本身的组。

每一个组都能消费,那就是发布订阅模式。

队列模式: 全部的消费者都在一个组里面。

消息队列特色

 ·数据缓冲做用

· 下降耦合

·异构系统高效交互

Kafka是一个消息队列组件,它遵循JMS规范,基本工做流程,生产者生产数据-> kafka集群中转数据->消费者消费数据

 

1.2.2.架构和原理

 

 

 生产者生产消息,将消息发送给Kafka集群,Kafka内在是分布式的,一个Kafka集群一般包括多个代理。为了均衡负载,将话题分红多个分区,每一个代理存储一或多个分区。消费者从kafka主题中获取消息。多个生产者和消费者可以同时生产和获取消息。

1.Producer根据指定partition方法(round-robin、hash等),将消息发布到指定topic的partition里面

2.kafka集群接收到Producer发过来的消息后,将其持久化到硬盘,并保留消息指定时长(可配置),而不关注消息是否被消费。

3.Consumer从kafka集群pull数据,并控制获取消息的offset

1.2.3.基本概念

Broker:Kafka 集群包含一个或多个服务器,这种服务器被称为 broker。

Topic:每条发布到 Kafka 集群的消息都有一个类别,这个类别被称为 Topic。(物理上不一样 Topic 的消息分开存储,逻辑上一个 Topic 的消息虽然保存于一个或多个 broker 上,但用户只需指定消息的 Topic 便可生产或消费数据而没必要关心数据存于何处)。

Partition:Partition 是物理上的概念,每一个 Topic 包含一个或多个 Partition。

Producer:负责发布消息到 Kafka broker。

Consumer:消息消费者,向 Kafka broker 读取消息的客户端。

Consumer Group:每一个 Consumer 属于一个特定的 Consumer Group(可为每一个 Consumer 指定 group name,若不指定 group name 则属于默认的 group)。

1.2.4.Kafka特色

1)分布式流平台,支持消息的分区(mr的分区相似),支持多个服务器之间消息分区

2)支持发布和订阅数据流,相似于消息系统

3)支持分布式和副本集群方式,来存储数据流

4)实时处理数据流

5) 支持多种源数据,数据库交互、app双向交互

6)水平可伸缩

7) 容错好

8)速度快

9)多种方式存储,持久化存储内存,磁盘秒级

10)海量数据,TB级高吞吐量:支持每秒百万消息,·廉价硬件

11)多客户端支持,很容集成不一样平台,java,python,和多源进行协同,它是一个

12)中间件的基因,跨平台和跨语言,开源

2.Kafka初体验

2.1环境准备

1)kafka下载

kafka2.1.2官网下载:https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.2.1/kafka_2.12-0.10.2.1.tgz

2) zookeeper下载

zookeeper3.3.6官网地址:http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.3.6/zookeeper-3.3.6.tar.gz

3) jdk下载

kafka和zookeeper前提是安装好了jdk,注意你电脑是32位仍是64为

jdk官网下载地址:http://download.oracle.com/otn-pub/java/jdk/8u131-b11/d54c1d3a095b4ff2b6607d096fa80163/jdk-8u131-linux-x64.tar.gz

4)关闭防火墙

不然zk启动会报错no route tohost

· 查看防火墙状态,使用root帐号执行

service iptables status

·关闭防火墙

service iptables stop

·查看防火墙开机启动状态

chkconfig iptables --list

·关闭防火墙开机启动

chkconfig iptables off

5)配置好host

2.2 Kafka小试牛刀

2.2.1 单个broker初体验

1.安装单节点的kafka

下载下来了以后直接解压就能够运行单节点的Kafka,由于Kafka须要用zookeeper作高可用,若是没有安装zk,也没有关系,使用它自带的配置启动就能够。

1)启动自带配置的zk

启动zk命令:  bin/zookeeper-server-start.sh config/zookeeper.properties

zookeeper启动成功

2)启动kafka

kafka启动:bin/kafka-server.start.sh config/server.properites

kafka启动成功

3)建立一个topic

建立topic:bin/kafka-topics.sh --create --zookeeper kafka01:2181 --replication-factor 1 --partitions 1 --topic test

查看topic:bin/kafka-topicts.sh --list --zookeeper kafka01:2181

4)启动生产者

命令:bin/kafka-console-producer.sh --broker-list kafka:9092 --topic test

启动以后,输入2行

5)启动消费者

命令:bin/kafka-console-consumer.sh --zookeeper kafka01:9202 --topic test --from-beginning

消费者接收到2行消息

总结

1.启动zk

命令: bin/zookeeper-server-start.sh config/zookeeper.properties

2.启动kafka

命令:bin/kafka-server.start.sh config/server.properites

参数:指定kafka的配置文件

3.建立主题

命令:bin/kafka-topics.sh --create --zookeeper kafka01:2181 --replication-factor 1 --partitions 1 --topic test

参数:1)create:指定建立动做,2)zookeeper:指定zookeeper客户端;3)replication-factor:指定主题副本个数;4)partitions:指定分区个数;5)topic:指定主题名称

查看topic:bin/kafka-topicts.sh --list --zookeeper kafka01:2181

4.启动生产者

命令:bin/kafka-console-producer.sh --broker-list kafka01:9092 --topic test

参数:1) broker-list:指定broker;2)topic:指定主题名,从参数能够看出来,生产者是不直接和zk交互的

5.启动消费者

命令:bin/kafka-console-consumer.sh --zookeeper kafka01:2181 --topic test --from-beginning

参数:1)zookeeper:指定zookeeper客户端;2)topic:指定主题名,3)from-beginning:从topic头开始读取消息

2.2.2 多个broker初体验

前面已经配置了一个单节点kafka服务,再次扩展演示kafka集群多节点可用性、容错性,也为kafka分布式集群作好铺垫。

1)配置kafka多节点服务

配置多节点服务,拷贝出2分配置server1.properties,server2.properties,修改2处参数

------------------------------server1.properties

broker.id=1

listeners=PLAINTEXT://:9092

log.dirs=/tmp/kafka1-logs

------------------------------server2.properties

broker.id=2

listeners=PLAINTEXT://:9093

log.dirs=/tmp/kafka2-logs

-----------------------------

2)启动zookeeper服务

bin/zookeeper-server-start.sh config/zookeeper.properties

3)启动2个kafka服务

bin/kafka-server-start.sh config/server1.properties

bin/kafka-server-start.sh config/server2.properties

4)建立topic主题

bin/kafka-topics.sh --create --zookeeper kafka01:2181 -replication-factor 1 --partitions 1 --test2

5)启动生产者

bin/kafka-console-producer.sh --broker-list kafka02:9093 --topic test2

输入内容

6)启动消费者

bin/kafka-console-consumer.sh --zookeeper kafka01:2181 --topitc test2 --from-beginning

2) 测试多节点可用性

在生产者输入内容,消费者端能够获取到消息

生产者端:

消费者端:

 

3)测试多节点容错性

杀掉一个kafka服务,而后发送消息测试,消费者是否能政策收到消息

 

2.3 Kafka分布式集群构建

2.3.1 Kafka分布式集群构建

1.配置zookeeper集群

1).解压后,配置zoo.cfg,若是没有从模板配置文件中拷贝出来

官网建议使用zookeeper3.4.x,3.4.9

http://kafka.apache.org/documentation.html#zk

这里有各类版本的下载地址

https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/

咱们须要的版本3.4.9

https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.4.9/zookeeper-3.4.9.tar.gz

配置以前建立目录

命令:mkdir -p /usr/local/hadoop/zookeeper/data /usr/local/hadoop/zookeeper/log

修改zoo.cfg配置

命令:vi /usr/local/hadoop/zookeeper/zookeeper-3.4.5-cdh5.4.5/conf/zoo.cfg

内容:

----------------------------

tickTime=2000

initLimit=10

syncLimit=5

dataDir=/usr/local/hadoop/zookeeper/data

dataLogDir=/usr/local/hadoop/zookeeper/log

clientPort=2181

server.1=kafka01:2287:3387

server.2=kafka02:2288:3387

server.3=kafka03:2289:3387

----------------------------

b.配置myid(在每一个节点上都这样配置)

还有一个关键的设置,在每一个zk server配置文件的dataDir所对应的目录下,必须建立一个名为myid的文件,其中的内容必须与zoo.cfg中server.x 中的x相同,即:

-----------------------

/usr/local/hadoop/zookeeper/data/myid 中的内容为1,对应server.1中的1

/usr/local/hadoop/zookeeper/data/myid 中的内容为2,对应server.2中的2

/usr/local/hadoop/zookeeper/data/myid 中的内容为3,对应server.3中的3

 -----------------------

c.关闭防火墙,而且各节点要配置好jdk

-------------

service iptables stop

chkconfig iptables off

-------------

d.启动zookeeper

/usr/local/hadoop/zookeeper/zookeeper-3.4.5-cdh5.4.5/bin/zkServer.sh start

e.验证服务

命令1:bin/zkServer.sh status

命令2:bin/zkServer.sh start

f.配置环境变量到/etc/profile

ZOOKEEPER_HOME=/usr/local/hadoop/zookeeper/zookeeper-3.3.6

PATH=$ZOOKEEPER_HOME/bin:$PATH

export PATH

KAFKA_HOME=/usr/local/hadoop/kafka/kafka_2.12-0.10.2.1

PATH=$ZOOKEEPER_HOME/bin:$PATH

export PATH

2.配置kafka集群

1)准备目录

命令:mkdir -p /usr/local/hadoop/kafka/log

2)修改server.properties 3个参数,

每一个节点的broker.id不同,本次实验kafka01,kafka02,kafka03对应1,2,3

------------

broker.id=03

log.dirs=/usr/local/hadoop/kafka/log

zookeeper.connect=kafka01:2181,kafka02:2181,kafka03:2181

------------

3)启动kafka集群

每个节点执行,

命令:bin/kafka-server-start.sh config/server.properties

4)验证kafka集群是否正常启动

a.jps查看进程是否启动,正常启动,会有Kafka进程服务

b.查看/usr/local/hadoop/kafka/log下面是否有数据

c.查看zk中是否有kafka目录

 

2.3.2 Kafka主题建立

1)相关命令

建立命令

bin/kafka-topics.sh --create --zookeeper kafka01:2181 -replication-factor 3 --partitions 2 --topic test3

查看命令

bin/kafka-topics.sh --list  --zookeeper kafka01:2181

查看分区个数、副本个数

bin/kafka-topics.sh --describe --zookeeper kafka01:2181 --topic test

2)相关说明

从这里能够看到不少关于主题的信息,总要包含:

· 主题的leader:读写都从这里进行随机选择

· 主题的副本数:副本数,节点列表

· isr:同步复制

·主题的分区数:2个分区

zk保留了副本之间的leader和随从信息,每一个副本周期性同步到磁盘

1.每一个分区有N个副本,能够承受N-1个节点故障,ZK承受N-1/2个故障,若是3个节点,挂了2个,那就不行了。每一个副本都有本身的leader,其他的都是follower,zk存放分区的leader和all replica的信息

2.每一个副本存储消息的部分数据在本地的log和offset中,周期性同步到磁盘中去确保消息写入所有

副本或者其中一个

3.leader故障时,消息或者写入本地log,或则在producer在收到ack消息前,重新发送消息给新的leader

这些信息都是保留在zookeeper中的。进一步去zookeeper观察

看到有2个topic

进入到0的目录下查看

再看看kafka的log目录,实际的数据是保存在kafka的log目录下,虽然尚未写数据,可是相关目录已经准备好了相关存放文件和目录了。

 

2.3.3 生产者生产数据

1)相关命令

bin/kafka-console-producer.sh --broker-list kafka02:9092 --topic test2

这里要注意端口要和配置文件的保持一致,笔者由于前面演示了单机版的多broker(9093端口),端口没有改正,致使消费者无法消费数据,白白浪费了不少时间排查问题。

2)相关说明

从命令能够看出,生产者是和broker直接交互,broker使用zk协同工具来管理多个broker

broker:broker不知道谁消费了消息,并不维护哪一个消费者消费了消息

消费者组:每一个组中只有一个消费者能够消费消息(全部的消费者都在一个组》队列模式,都有本身的组》订阅模式),经过消费者组赞成了

消费者:维护了消费消息的状态,broker不知道谁消费了消息,并不维护哪一个消费者消费了消息,消费者本身知道的。

2.2.4 消费者消费数据

1)相关命令

bin/kafka-console-consumer.sh --zookeeper kafka02:2181 --topic test2 --from-beginning

2)相关说明

1.消息缓存与filesystem的存储,数据是当即被即刻写入OS的内核页而且缓存以及清理磁盘(能够配置)

2.消息被消费后,kafka能长时间驻留消息在服务器,容许重复消费

3.对分组消息使用了消息set,防止网络过载

4.在服务器存放消费的信息,kafka是在消费者端存放,消费者保持消息的状态

5.消费者状态默认是在zk中,也容许存到到其余OLTP,好比数据库

6.Kafka中生产和消费是点心的pull-push

生产者pull(write,输入流),消费者push(read,输出流,拉)

7.没有主从模式,全部的broker的地位相同,broker数据均在zk中维护

并在producer之间共享

8.负载均衡策略,loadbalance,容许producer动态发现broker

9.producer生产者维护了一个broker链接池,并能经过zk的callback进行实时更新

10.producer能够选择同步或者异步的方式发送消息给broker

打电话:同步,阻塞的都是同步的,NIO的特色就是非阻塞,IO就是阻塞的

发短息:异步,你收不收,知不知道,我无论,我先去干其余的事情

2.2.5  消息的压缩

Kafka设计的初衷是迅速处理短小的消息,通常10K大小的消息吞吐性能最好(可参见LinkedIn的kafka性能测试)。但有时候,咱们须要处理更大的消息,好比XML文档或JSON内容,一个消息差很少有10-100M,这种状况下,Kakfa应该如何处理?

kafka producer设置compression.type=snappy

2.4 Kafka在ZK目录节点

2.4.1  kafka镜像原理

将元集群的数据副本化给target kafka集群,目标kafka集群就看成一个消费者消费数据实现数据的备份。

2.4.2  Kafka副本模型

同步模型(同步复制):生产者从zk找leader,并发送message,消息当即写入本地log,follow开始拉取消息,每一个follow将消息写入各自本地的log,向leader发送确认回执。leader在收到全部的follow的确认回执和本地副本写入工做均完成后,再向producer发送确认回执。生产者客户端是阻塞的,消费者的数据pull从leader中完成。

 

异步模型:leader的本地log写入完成立刻向生产者发送回执,leader不等待follow的回执,follow行不行,成不成功,无论。

2.4.3 在ZK目录节点内容

/brokers/topics/topic:存储某个topic的partitions全部分配信息

/brokers/topics/[topic]/partitions/[0...N]:partitions状态信息

/brokers/ids/[0...N]:每一个broker的配置文件中都须要指定一个数字类型的id(全局不可重复),此节点为临时znode(EPHEMERAL).

/controller_epoch -> int (epoch) :此值为一个数字,kafka集群中第一个broker第一次启动时为1,之后只要集群中center controller中央控制器所在broker变动或挂掉,就会从新选举新的center controller,每次center controller变动controller_epoch值就会 + 1; 

/controller -> int (broker id of the controller) :存储center controller中央控制器所在kafka broker的信息

 

2.5 实体间交互流程

zookeeper在kafka扮演重要角色,Kafka使用zookeeper做为其分布式协调框架,很好的将消息生产、消息存储、消息消费的过程结合在一块儿。同时借助zookeeper,kafka可以生产者、消费者和broker在内的因此组件在无状态的状况下,创建起生产者和消费者的订阅关系,并实现生产者与消费者的负载均衡。

2.5.1主题与zk  

在kafka中,用户能够自定义多个topic,每一个topic又能够划分为多个分区,一半状况下,每一个分区存储在一个独立的broker上。全部这些topic与broker的对应关系都有zookeeper进行维护。

在zookeeper中,创建专门的节点来记录这些信息,其节点路径为/brokers/topics/{topic_name},如:

[zk: localhost:2181(CONNECTED) 6] ls /brokers/topics

[toptic_t, test, my-replicated-topic, mykafka, mykafka6, mykafka5, mykafka4, test6, mykafka3, test7, mykafka2]

[zk: localhost:2181(CONNECTED) 17] get /brokers/topics/mykafka4  

{"version":1,"partitions":{"1":[102,103,104],"2":[103,104,102],"0":[104,102,103]}}

针对topic 的每个分区与broker的对应关系,zookeeper经过节点 /brokers/topics/topic.name来记录,如:

当broker启动时,会到对应topic节点下注册本身的broker.id到对应分区的isr列表中,如:

[zk: localhost:2181(CONNECTED) 23] get /brokers/topics/mykafka4/partitions/1/state

{"controller_epoch":15,"leader":102,"version":1,"leader_epoch":2,"isr":[102,103,104]}

一样的,当broker退出数,也会触发zookeeper更新其对应topic分区的isr列表,并决定是否须要作消费者的负载均衡。

2.5.2 消费者与zk

l 注册新的消费者分组

当新的消费者组注册到zookeeper中时,zookeeper会建立专用的节点来保存相关信息,其节点路径为ls /consumers/{group_id},其节点下有三个子节点,分别为[ids, owners, offsets]。

Ø ids节点:记录该消费组中当前正在消费的消费者;

Ø owners节点:记录该消费组消费的topic信息;

Ø offsets节点:记录每一个topic的每一个分区的offset,如:

[zk: localhost:2181(CONNECTED) 54] get /consumers/test-consumer2-group/offsets/mykafka4/0

142

l 注册新的消费者

当新的消费者注册到kafka中时,会在/consumers/{group_id}/ids节点下建立临时子节点,并记录相关信息,如:

[zk: localhost:2181(CONNECTED) 57] ls /consumers/test-consumer2-group/ids/test-consumer2-group_dev103-1433562901087-7b517b97

[]

[zk: localhost:2181(CONNECTED) 58] get /consumers/test-consumer2-group/ids/test-consumer2-group_dev103-1433562901087-7b517b97

{"version":1,"subscription":{"mykafka5":1},"pattern":"white_list","timestamp":"1433562901290"}

 

l 监听消费者分组中消费者的变化

每一个消费者都要关注其所属消费者组中消费者数目的变化,即监听/consumers/{group_id}/ids下子节点的变化。一单发现消费者新增或减小,就会触发消费者的负载均衡。

2.5.3 broker与与zk

为了记录broker的注册信息,在zookeeper上,专门建立了属于kafka的一个节点,其路径为/brokers,如:

 [zk: localhost:2181(CONNECTED) 1] ls /brokers

[ids, topics]

Kafka的每一个broker启动时,都会到zookeeper中进行注册,告诉zookeeper其broker.id, 在整个集群中,broker.id应该全局惟一,并在zookeeper上建立其属于本身的节点,其节点路径为/brokers/ids/{broker.id}. 如:

 [zk: localhost:2181(CONNECTED) 2] ls /brokers/ids

[102, 103]

建立完节点后,kafka会将该broker的broker.name及端口号记录到改节点,如

[zk: localhost:2181(CONNECTED) 4] get /brokers/ids/102

{"jmx_port":-1,"timestamp":"1433209686575","host":"host102","version":1,"port":9092}

另外,改broker节点属性为临时节点,当broker会话失效时,zookeeper会删除该节点,这样,咱们就能够很方便的监控到broker节点的变化,及时调整负载均衡等。

 

2.5.4 消费者与消费者组

a.每一个consumer客户端被建立时,会向zookeeper注册本身的信息;

b.此做用主要是为了"负载均衡".

c.同一个Consumer Group中的Consumers,Kafka将相应Topic中的每一个消息只发送给其中一个Consumer。

d.Consumer Group中的每一个Consumer读取Topic的一个或多个Partitions,而且是惟一的Consumer;

e.一个Consumer group的多个consumer的全部线程依次有序地消费一个topic的全部partitions,若是Consumer group中全部consumer总线程大于partitions数量,则会出现空闲状况;

举例说明:

kafka集群中建立一个topic为report-log   4 partitions 索引编号为0,1,2,3

假若有目前有三个消费者node:注意-->一个consumer中一个消费线程能够消费一个或多个partition.

若是每一个consumer建立一个consumer thread线程,各个node消费状况以下,node1消费索引编号为0,1分区,node2费索引编号为2,node3费索引编号为3

若是每一个consumer建立2个consumer thread线程,各个node消费状况以下(是从consumer node前后启动状态来肯定的),node1消费索引编号为0,1分区;node2费索引编号为2,3;node3为空闲状态

总结:

从以上可知,Consumer Group中各个consumer是根据前后启动的顺序有序消费一个topic的全部partitions的。

若是Consumer Group中全部consumer的总线程数大于partitions数量,则可能consumer thread或consumer会出现空闲状态。

Consumer均衡算法

当一个group中,有consumer加入或者离开时,会触发partitions均衡.均衡的最终目的,是提高topic的并发消费能力.

1) 假如topic1,具备以下partitions: P0,P1,P2,P3

2) 加入group中,有以下consumer: C0,C1

3) 首先根据partition索引号对partitions排序: P0,P1,P2,P3

4) 根据(consumer.id + '-'+ thread序号)排序: C0,C1

5) 计算倍数: M = [P0,P1,P2,P3].size / [C0,C1].size,本例值M=2(向上取整)

6) 而后依次分配partitions: C0 = [P0,P1],C1=[P2,P3],即Ci = [P(i * M),P((i + 1) * M -1)]

3.参考资料

1.http://blog.jobbole.com/75328/ 分布式消息系统:Kafka

2.http://blog.csdn.net/lizhitao/article/details/23744675

3.http://blog.csdn.net/opensure/article/details/46048589Kafka文件存储机制那些事

4.http://blog.csdn.net/liuao107329/article/details/70175691 Zookeeper在kafka中的应用

5.https://wenku.baidu.com/view/20781790b52acfc788ebc955.html

相关文章
相关标签/搜索