Kafka学习笔记

Apache Kafka

1、消息队列分类

1.1 点对点

消息生产者生产消息发送到queue中,而后消息消费者从queue中取出并消费消息算法

 

注意:数据库

  1.消息被消费之后,queue中再也不有存储,因此消息消费者不可能消费到已经被消费的消息数组

  2.Queue支持存在多个消费者,可是对一个消息而言,只会有一个消费者能够消费缓存



1.2 发布/订阅

消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不一样,发布到topic的消息会被全部订阅者消费服务器




2、消息队列对比

2.1 RabbitMQ

支持的协议多,很是重量级消息队列,对路由(Routing),负载均衡(Load balance)或者数据持久化都有很好的支持网络




2.2 ZeroMQ

号称最快的消息队列系统,尤为针对大吞吐量的需求场景,擅长高级/复杂的队列,可是技术也复杂,而且只提供非持久性队列数据结构

 

2.3 ActiveMQ

Apache下的一个子项,相似ZeroMQ,可以以代理人和点对点的技术实现队列架构

 

 

2.4 Redis

一个key-value的NoSQL数据库,但也支持MQ功能,数据量较小,性能优于RabbitMQ,数据超过10K就慢的没法忍受并发

 

 


 

 

3、Kafka简介

3.1 Kafka简介

Kafka是分布式发布-订阅消息系统。它最初由Linkedln公司开发,使用Scala语言编写,以后成为Apache项目的一部分。Kafka是一个分布式的,可划分的(分区处理),多订阅者,冗余备份的持久性的日志服务。它主要用于处理活跃的流式数据app

 

 

3.2 Kafka特色

1.同时为发布和订阅提供高吞吐量。据了解,Kafka每秒能够生产约25万消息(50M),每秒处理55万消息(110M)

 

2.可进行持久化操做。将消息持久化到磁盘,所以可用于批量消费,例如ETL,以及实时应用程序。经过将数据持久化到硬盘以及replication防止数据丢失

 

3.分布式系统,易于向外扩展。全部的producer、broker和consumer都会有多个,均为分布式。无需停机便可扩展机器

 

4.消息被处理的状态是在consumer端维护,而不是由server端维护

 

5.支持online(上线)和offline(下线)的场景

 

 

4、Kafka架构

wKioL1lA6BKB6X54AADLaYZVXhY844.png

 

 

重要说明:

1.在Kafka的体系中不存在单读的Conmuser,它会存在一个Conmuser Group,Conmuser Group里面会有多个Conmuser

 

2.能够把Consumer Group当作一个虚拟的Consumer,它消费的是一个具体的Topic的数据,但具体执行是由Consumer Group中的Consumer去执行的,Consumer是一个逻辑上的概念,是不存在的,而存在的是Consumer Group当中的Consumer, 一个Consumer Group对应的是Topic,Consumer Group中的Consumer对应的是Topic中的partition

 

3.一个消费者组里面的多个消费者对应的是什么呢?

Topic组里面不一样Partition的数据,一个Partition里面的数据交给一个Consumer来处理,另外一个Partition里面的数据交给另外一个Consumer来处理,固然它们必须是同一个Consumer Group里面的Consumer,这就达到了并行的消费(每个Consumer对应的是一个Partition里面的数据)

 

4.Kafka为何会有Partition的概念?

带来的好处就是处理的速度更快,不一样的Conmuser去消费不一样Partition的消息,数据的消费就变成了并行的

 

 

 

 

5、Kafka的核心概念

5.1 Producer

特指消息的生产者

 

5.2 Consumer

特指消息的消费者

 

 

5.3 Consumer Group

消费者组,能够并行消费Topic中Partition的消息

 

 

5.4 Broker

缓存代理,Kafka集群中的一台或多台服务器统称为broker

 

1.message在broker中经过log追加的方式进行持久化存储。并进行分区(patitions)

 

2.为了减小磁盘写入的次数,broker会将消息展现buffer起来,当消息的个数(或尺寸)达到必定阀值时,再flush到磁盘,这样减小了磁盘IO调用的次数

 

3.Broker没有副本机制,一旦broker宕机,该broker的消息将不可用(可是消息是有副本的,能够把消息的副本同步到其它的broker中)

 

4.Broker不保存订阅者的状态,由订阅者本身保存

 

5.无状态致使消息的删除成为难题(可能删除的消息正在被订阅)Kafka采用基于时间的SLA(服务水平保证),消息保存必定的时间(一般为7天)后会被删除

 

6.消息订阅者能够rewind back到任意位置从新进行消费,当订阅者故障时,能够选择最小的offset(id)进行从新读取消费消息

 

 

5.5 Topic

特指Kafka处理的消息源(feeds of messages)的不一样分类

 

 

5.6 Partition

1.Topic物理上的分组,一个topic能够分为多个partition,每一个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)

 

2.Kafka的Partitions分区的目的

2.1 kafka基于文件存储,经过分区,能够将日志内容分线到多个server上,来避免文件尺寸达到单击磁盘的上线,每一个partition都会被当前server(kafka实例)保存

 

2.2 能够将一个topic切分任意多个partitions来提升消息保存/消费的效率

 

2.3 越多的partitions意味着能够容纳更多的consumer,有效提高并发消费的能力

 

 

5.7 Message

1.消息,是通讯的基本单位,每一个producer能够向一个topic(主题)发布一些消息

 

2.Kafka中的Message是以topic为基本单位组织的,不一样的topic之间是相互独立的。每一个topic有能够分红几个不一样的partition(每一个topic有几个partition是在建立topic时指定的)每一个partition存储一部分Message

 

3.partition中的每条Message包含了一下三个属性

        属性名称           数据类型

        offset                long

        MessageSize          int32

        data                  mssage的具体内容

 

5.8 Producers

消息和数据生产者,向kafka的一个topic发布消息的过程叫作producers

 

1.producer将消息发布到指定的topic中,同时producer也能决定将此消息归属于哪一个partition。好比基于“round-robin”方式或者经过其余的一些算法等

 

2.异步发送:批量发送能够颇有效的提升发送效率。kafka producer的异步发送模式容许进行批量发送,先将消息缓存在内存中,而后一次请求批量发送出去

 

 

5.9 Consumers

1.消息和数据消费者,订阅topics并处理其发布的消息的过程叫作consumers

 

2.在Kafka中,咱们能够认为一个group是一个“订阅者”,一个Topic中的每一个partition只会被一个“订阅者”中的一个consumer消费,不过一个consumer能够消费多个partition中的消息(消费者数据小于Partition的数量时)

 

3.注意:kafka的设计原理决定,对于一个topic,同一个group中不能有多于partitions个数的consumer同时消费,不然将意味着某些consumer将没法获得消息

 

 

 

6、Kafka的持久化

6.1 数据持久化

1.发现线性的访问磁盘,不少时候比随机的内存访问快的多

 

2.传统的使用内存做为磁盘的缓存

 

3.Kafka直接将数据写入到日志文件

 

 

6.2 日志数据持久化特性

1.写操做:经过将数据追加到文件中实现

 

2.读操做:读的时候从文件读就行了

 

 

6.3 优点

1.读操做不会阻塞写稻做和其它操做,数据大小不对性能产生影响

 

2.没有容量限制(相对于内存来讲)的硬盘空间创建消息系统

 

3.线性访问磁盘,速度快,能够保存任意一段时间



6.4 持久化具体实现

1.一个Tipic能够认为是一个类消息,每一个topic将被分红多个partition,每一个partition在存储层面是append log文件。任何发布到此partition的消息都会被直接追加到log文件的尾部,每条消息在文件中的位置称为offset(偏移量),partition是以文件的形式存储在文件系统中

 

2.Logs文件根据broker中的配置要求,保留必定时间后删除来释放磁盘空间(默认是7天)

wKiom1lA6XCzznN5AABK8vKtt4A602.png

 

 

说明:Partition是Topic物理上的分组,一个topic能够分为多个partition,每一个partition是一个有序的队列。Partition中每条消息都会被分配一个有序的id(offset)

 

 

 

6.5 索引

为数据文件创建索引:稀疏存储,每隔必定字节的数据创建一条索引。下图是一个partition的索引示意图

wKiom1lA6a-AugZnAALw2snSFxY009.png

 

 

注意:

1. 如今对一、三、六、8 创建了索引,若是要查找7,则会先查找到8而后,再找到8后的一个索引6,而后两个索引之间作二分法,找到7的位置

 

2. 日志文件也会进行segement(分割),分而治之

 

 

 

 

7、Kafka的分布式实现

7.1 Kafka分布式架构图

wKiom1lA6j_Bz_BcAAGYJMl0ur0373.png

 

注意:

1.当生产者将消息发送到Kafka后,就会去马上通知ZooKeeper,zookeeper中会watch到相关的动做,当watch到相关的数据变化后,会通知消费者去消费消息

 

2.消费者是主动去Pull(拉)kafka中的消息,这样能够下降Broker的压力,由于Broker中的消息是无状态的,Broker也不知道哪一个消息是能够消费的

 

3.当消费者消费了一条消息后,也必需要去通知ZooKeeper。zookeeper会记录下消费的数据,这样但系统出现问题后就能够还原,能够知道哪些消息已经被消费了

 

 

7.2生产环境部署架构图

wKioL1lA6nbTXE8pAAEgBBaexcU809.png

 

 

说明:

1.Name Server集群指的是Zookeeper集群

 

 

 

 

7、Kafka的通信协议

8.1 通信协议简介

1.Kafka的通信协议主要说的是,consumer去拉数据使用的通信协议

 

2.Kafka的Producer、Broker和Consumer采用的是一套自行设计基于TCP层的协议,根据业务需求定制,而非实现一套相似于Protocol Buffer的通信协议

 

3.基本数据类型

3.1定长数据类型:int8,int16,int32和int64,对应到Java中就是byte,short,int和long

3.2变长数据类型:bytes和string。变长的数据类型由两部分组成,分别是一个有符号整数N(标识内容的长度)和N个字节的内容。其中N为-1标识内容为null。Bytes的长度由int32标识,string的长度由int16表示

3.3数组:数组由两个部分组成,分别是一个有int32类型的数字标识的数组长度N和N个元素

 

 

 

8.2通信协议详细说明

1.Kafka通信的基本单位是Request/Response

 

2.基本结构:

  RequestOrResponse ---> MessageSize(RequestMessage | ResponseMessage)

名称

类型

描述

ApiKey

Int16

标识此次请求的API编号

ApiVersion

Int16

标识请求的API版本,有了版本后就能够作到向后兼容

CorrelationId

Int32

由客户端指定的一个数字惟一标识此次请求的id,服务器端在处理请求后也会把一样的CorrelationId写到Response中,这样客户端就能把某个请求和响应对应起来了

ClientId

string

客户端指定的用来描述客户端的字符串,会被用来记录日志和监控,它惟一标识一个客户端

Request

-

Request的具体内容

 

3.通信过程:

3.1客户端打开与服务端的Socket

3.2往Socket写入一个int32的数字(数字标识此次发送的Request有多少字节)

3.3服务器端先读出一个int32的整数从而获取此次Request的大小

3.4而后读取对应字节数的数据从而获得Request的具体内容

3.5服务器端处理了请求以后也用一样的发送发誓来发送响应

 

4.RequestMessage结构

4.1RequestMessage ---> ApiKey ApiVersion CorrelationId ClientId Request

名称

类型

描述

MessageSize

int32

标识RequestMessage或者ResponseMessage的长度

RequestMessage

ResponseMessage

--

标识Request或者Response的内容

 

5.ResponseMessage

5.1ResponseMessage --->  CorrelationId Response

名称

类型

描述

CorrelationId

int32

对应Request的CorrelationId

Response

--

对应Request的Response,不一样的Request的Response的字段是不同的

 

Kafka采用是经典的Reactor(同步IO)模式,也就是1个Acceptor响应客户端的链接请求,N个Processor来读取数据,这种模式能够构建出高性能的服务器

 

6.Message:Producer生产的消息,键-值对

6.1Message --- > Crc MagicByte Attributes Key Value

名称

类型

描述

CRC

Int32

标识这条消息(不包括CRC字段自己)的校验码

MagicByte

Int8

标识消息格式的版本,用来作向后兼容,目前值为0

Attributes

Int8

标识这条消息的元数据,目前最低两位用来标识压缩格式

Key

bytes

标识这条消息的Key,能够为null

Value

bytes

标识这条消息的Value。Kafka支持消息嵌套,也就是把一条消息做为Value放到另一个消息里面

 

说明:

CRC是一种消息检验方式,在Consumer拿到数据之后,CRC会获取MessageSize和MessageData的大小作比较,若是不一致则,那么这个操做的数据Consumer就不接收了,若是一直则才作处理。防止消息在传输过程当中损坏,丢失的一种校验方式

 

7.MessageSet:用来组合多条Message,它在每条Message的基础上加上offset和MessageSize

7.1MessageSet --> [offset MessageSize Message]

名称

类型

描述

Offset

Int64

它用来做为log中的序列号,Producer在生产消息的时候还不知道具体的值是什么,能够随便填个数字进去

MessageSize

Int32

标识这条Message的大小

Message

-

标识这条Message的具体内容,其格式见上一小结

 

8.Request/Response和Message/messageSet的关系

8.1 Request/Response是通信层的结构,和网络的7层模型对比的话,它相似于TCP

8.2 Message/MessageSet定义的是业务层的结构,相似于网络7层模型中的HTTP层。Message/MessageSet只是Request/Response的payload中的一种数据结构

备注:Kafka的通信协议中不包含Schema,格式也比较简单,这样设计的好处是协议自身的Overhead小,再加上把多条Message放到一期作压缩,提升压缩比率,从而在网络上传输的数据量会少一些

 

 

 

9、数据传输的事务定义


1.at most once:最多一次,这个和JMS中“非持久化”消息相似,发送一次,不管成败,将不会重发

消费者fetch(获得)消息,而后保存offset,而后处理消息;  当client保存offset以后,可是在消息处理过程当中出现了异常,致使部分消息未能继续处理,那么伺候“未处理”的消息将不能被fetch到,这就是“at most once”

 

2.at least once:消息至少发送一次,若是消息未能接收成功,可能会重发,知道接收成功

消费者fetch消息,而后处理消息,而后保存offset,若是消息处理成功以后,可是在保存offset阶段zookeeper异常致使保存操做未能执行成功,这就致使接下来再次fetch时可能得到上次已经处理过的消息,这就是“at least once”,缘由offset没有即便的提交给zookeeper,zookeeper恢复正常仍是以前offset状态。

注:一般状况下“at least once”是咱们的首选(相比at most once而言,重复接收数据总比丢失数据要好)

 

3.exactly once:消息只会发送一次

Kafka中并无严格的去实现(基于2阶段提交,事务),咱们认为这种策略在kafka中是没有必要的




10、Kafka安装

1.下载并上传kafka到服务器

 

2.解压缩并移动到/usr/local目录下

 

3.启动服务

3.1启动zookeeper服务 # ./zookeeper-server-start.sh ../config/zookeeper.properties > /dev/null 2>&1 &

 

3.2启动kafka服务 # ./kafka-server-start.sh ../config/server.properties > /dev/null 2>&1 &

 

3.3建立topic:

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

 

3.4查看主题

./kafka-topics.sh --list --zookeeper localhost:2181

 

3.5查看主题详情

./kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

 

3.6删除主题

./kafka-run-class.sh kafka.admin.TopicCommand --delete --topic test --zookeeper 192.168.31.220:2181

 

 

11、Kafka客户端操做

11.1建立Producer

./kafka-console-producer.sh --broker-list localhost:9092 --topic test1

 

11.2建立Consumer

./kafka-console-consumer.sh --zookeeper localhost:2181 --topic test1 --from-beginning

 

 

11.3参数使用帮组信息查看

生产者参数查看:./kafka-console-producer.sh

消费者参数查看:./kafka-console-consumer.sh

相关文章
相关标签/搜索