Kafka系列之1—Kafka的整体认识

Kafka的整体认识

1.非中心的架构模型安全

2.基于TCP的一套Kafka通讯协议session

3.消息中间件&存储系统架构

4.存储逻辑层的高并发保证并发

5.isr机制下降了保证分布式一致性的代价socket

1. 非中心的架构模型

咱们知道,在分布式系统的架构类型里,既有主从式的架构,也有非中心式的架构,像hadoophbase都采用了主从式的架构模型,主从式的架构优势有不少,可是主从式下为了不单点故障而采起的各类策略使得主从式架构的优势并不那么理想,kafka做为一个分布式的消息系统,它采用了非中心式的架构模型,每一个节点都做为独立的ServerClient提供服务,在集群环境下,多个节点依赖zookeeper维护client在读写访问中的分布式一致性。tcp

在早期0.8.2以前的kafka版本中,kafkazookeeper的依赖很是大,producerserverconsumer都很是依赖zookeeper,虽然zookeeper做为一个轻量级的文件系统(已经成为分布式服务的基础服务,用以提供分布式环境下的一致性),可是大量的与其交互仍然会致使一些性能问题和不稳定的方面,在0.8.2以后的改进中,经过将一些状态保持在kafka自身中,减小与zookeeper的大量交互,为读写提供了更稳定的实现。分布式

2. 基于TCP的一套Kafka通讯协议

2.1 概述

kafka的通讯协议至关的简单,只有六类核心的客户端请求APIside

  • Metadata:描述当前可用的brokershostport信息,并给出每一个broker上分配了哪些partitions高并发

  • Send:发送messagesbroker工具

  • Fetch:从broker中获取messages,包括获取data、获取集群的元数据信息以及获取某个topicoffset信息;

  • Offsets:获取某个给定topic partition的可用offsets信息;

  • Offset Commit:提交consumer groupoffsets信息;

  • Offset Fetch:获取某个consumer groupoffsets信息集合。

 

这些都会在下面详细描述。另外,0.9版本的kafkaconsumerskafka connect支持通常的group management。这部分的Client Api包括五种requests

  • GroupCoordinator:定位当前consumer groupcoordinator

  • JoinGroup:加入一个consumer group,若是没有就建立一个;

  • SyncGroup:同步同一个group下的全部consumer状态(partition分配到consumer的分布状况);

  • Heartbeat:用来检测group中的成员的存活状态;

  • LeaveGroup:直接离开一个group

 

还有一些用于监控/管理 kafka集群的administrativeAPIs

  • DescribeGroups:用来检测当前的groups

  • ListGroups:列出broker中管理的groups

Request格式以下:

RequestMessage => ApiKey ApiVersionCorrelationId ClientId RequestMessage

 ApiKey => int16

 ApiVersion => int16

 CorrelationId => int32

 ClientId => string

 RequestMessage => MetadataRequest | ProduceRequest | FetchRequest |OffsetRequest | OffsetCommitRequest | OffsetFetchRequest

Response格式以下:

Response => CorrelationIdResponseMessage

CorrelationId => int32

ResponseMessage => MetadataResponse |ProduceResponse | FetchResponse | OffsetResponse | OffsetCommitResponse |OffsetFetchResponse

 

ApiKey

这个int数值是用来代表是哪种请求,KafkaApis根据这个值来调用相应的处理逻辑

ApiVersion

因为不一样的Kafka版本支持的ApiVersion不一样,所以要根据KafkaServer支持的ApiVersion来发送对应格式的Request

CorrelationId

客户端提供的一个整型值,在response中会原封不动的返回,它的做用主要是用来匹配clientserver之间的requestresponse

2.3 ApiKey

下面的列表是ApiKey的整型值对应的Request类:

API name

ApiKey Value

ProduceRequest

0

FetchRequest

1

OffsetRequest

2

MetadataRequest

3

Non-user facing control APIs

4-7

OffsetCommitRequest

8

OffsetFetchRequest

9

GroupCoordinatorRequest

10

JoinGroupRequest

11

HeartbeatRequest

12

LeaveGroupRequest

13

SyncGroupRequest

14

DescribeGroupsRequest

15

ListGroupsRequest

16

2.4Response中的Error Codes

Error

Code

重试

描述

NoError

0


没有错误,执行成功!

Unknown

-1


未知的server error

OffsetOutRange

1


请求的offset值超出了server端维护的对应topic/partitionoffset值(能够大于也能够小于)

InvalidMessage/CorruptMessage

2

YES

消息内容不能经过CRC校验

UnknownTopicOrPartition

3

YES

请求的topicpartition再也不发往的broker

InvalidMessageSize

4


消息的大小为负值

LeaderNotAvailable

5

YES

请求发生在leader选举过程当中时抛出这个异常,此时请求的partition没有leader没法读写

NotLeaderForPartition

6

YES

在客户端尝试向不是leaderreplica写入信息时抛出,意味着客户端的元数据信息过时了

RequestTimedOut

7

YES

request超过了用户指定的时间,通常是值socket超时

BrokerNotAvailable

8


这个错误不是client遇到的,每每发生在工具类的请求中

ReplicaNotAvailable

9


broker上没有指望的replica(能够被安全的忽视)

MessageSizeTooLarge

10


server有一个最大消息的配置,当clientserver端写入超过配置大小的message时抛出

StaleControllerEpochCode

11


brokerbroker通讯时发生的内部错误

OffsetMetadataTooLargeCode

12


若是指定了一个大于配置的offset metadata大小的string

GroupLoadInProgressCode

14

YES

topic partitionleader发生变化后,新的leaderload offsets时,offset fetch request请求时抛出,或者在group  membership(例如heartbeat)的response中返回当coordinatorload group  metadata

GroupCoordinatorNotAvailableCode

15

YES

offsets topic还没建立或者group coordinator没有active

NotCoordinatorForGroupCode

16

YES

offset fetchcommit request的请求发往一个不是coordinator的节点

InvalidTopicCode

17


访问一个不可用的topic或者尝试对内部topic__consumer_offset)进行写入操做时

RecordListTooLargeCode

18


若是producemessage batch超过了配置的segment size

NotEnoughReplicasCode

19

YES

处于in-syncreplicas数量小于配置的produce要求的最小replicasrequiredAcks=-1

NotEnoughReplicasAfterAppendCode

20

YES

message被写入到log后,可是in-syncreplicas数小于须要的

InvalidRequiredAcksCode

21


请求的requiredAcks是不可得到的

IllegalGenerationCode

22


server端的generation idrequest中的generation id不一致

InconsistentGroupProtocolCode

23


当前group可以接受的protocol type中不包含join group时给出的protocol type

InvalidGroupIdCode

24


join groupgroupId为空或者null

UnknownMemberIdCode

25


当前generationgroup中不包含请求的memberId

InvalidSessionTimeoutCode

26


join group时超出了配置的request  session timeout

RebalanceInProgressCode

27


当请求发起时coodinator正在对group进行rebalance,此时client要从新join group

InvalidCommitOffsetSizeCode

28


offset commit超过metadata的最大限制被拒绝时

TopicAuthorizationFailedCode

29


client没有访问请求的topic的权限时

GroupAuthorizationFailedCode

30



ClusterAuthorizationFailedCode

31



 

kafka实现了基于tcp的一种通讯协议,只要符合通讯协议的规范,便可与kafka server进行通讯,于是kafka client是跨语言的

3. 消息中间件&存储系统

kafka既能够被认为是消息中间件,也能够做为存储系统使用

因为kafka能够将producer发送的消息保存起来供consumer消费,所以既能够做为消息中间件使用,也能够做为存储系统来保存数据。

4. 存储逻辑层的高并发保证

kafka在存储逻辑层的设计为高吞吐量提供了可能

kafka存储数据的逻辑单元是partitionproducerconsumer的处理单元也是基于partition的,针对某个topic,能够有多个partition,而多个partition又能够分布在不一样的节点上,从而在存储层保证了I/O的并发,为高吞吐量提供了可能。

5. isr机制下降了保证分布式一致性的代价

kafkaisr同步机制使得保证分布式一致性的代价大大下降

kafkaisr机制,容许isr中的replica和主副本以前有必定的差距,这样作保证了响应的及时性,另外一方面,因为在isr层面没有考虑严格的分布式一致性,没有使用paxosleader选举策略,使得kafkaleader选举更加容易,没有严格的节点数要求的限制,只要有一个节点是isr中的,就不会丢数据。