1.非中心的架构模型安全
2.基于TCP的一套Kafka通讯协议session
3.消息中间件&存储系统架构
4.存储逻辑层的高并发保证并发
5.isr机制下降了保证分布式一致性的代价socket
咱们知道,在分布式系统的架构类型里,既有主从式的架构,也有非中心式的架构,像hadoop和hbase都采用了主从式的架构模型,主从式的架构优势有不少,可是主从式下为了不单点故障而采起的各类策略使得主从式架构的优势并不那么理想,kafka做为一个分布式的消息系统,它采用了非中心式的架构模型,每一个节点都做为独立的Server向Client提供服务,在集群环境下,多个节点依赖zookeeper维护client在读写访问中的分布式一致性。tcp
在早期0.8.2以前的kafka版本中,kafka对zookeeper的依赖很是大,producer、server、consumer都很是依赖zookeeper,虽然zookeeper做为一个轻量级的文件系统(已经成为分布式服务的基础服务,用以提供分布式环境下的一致性),可是大量的与其交互仍然会致使一些性能问题和不稳定的方面,在0.8.2以后的改进中,经过将一些状态保持在kafka自身中,减小与zookeeper的大量交互,为读写提供了更稳定的实现。分布式
kafka的通讯协议至关的简单,只有六类核心的客户端请求APIs。ide
Metadata:描述当前可用的brokers的host和port信息,并给出每一个broker上分配了哪些partitions;高并发
Send:发送messages到broker;工具
Fetch:从broker中获取messages,包括获取data、获取集群的元数据信息以及获取某个topic的offset信息;
Offsets:获取某个给定topic partition的可用offsets信息;
Offset Commit:提交consumer group的offsets信息;
Offset Fetch:获取某个consumer group的offsets信息集合。
这些都会在下面详细描述。另外,0.9版本的kafka对consumers和kafka connect支持通常的group management。这部分的Client Api包括五种requests:
GroupCoordinator:定位当前consumer group的coordinator;
JoinGroup:加入一个consumer group,若是没有就建立一个;
SyncGroup:同步同一个group下的全部consumer状态(partition分配到consumer的分布状况);
Heartbeat:用来检测group中的成员的存活状态;
LeaveGroup:直接离开一个group。
还有一些用于监控/管理 kafka集群的administrativeAPIs
DescribeGroups:用来检测当前的groups;
ListGroups:列出broker中管理的groups。
RequestMessage => ApiKey ApiVersionCorrelationId ClientId RequestMessage
ApiKey => int16
ApiVersion => int16
CorrelationId => int32
ClientId => string
RequestMessage => MetadataRequest | ProduceRequest | FetchRequest |OffsetRequest | OffsetCommitRequest | OffsetFetchRequest
Response => CorrelationIdResponseMessage
CorrelationId => int32
ResponseMessage => MetadataResponse |ProduceResponse | FetchResponse | OffsetResponse | OffsetCommitResponse |OffsetFetchResponse
ApiKey |
这个int数值是用来代表是哪种请求,KafkaApis根据这个值来调用相应的处理逻辑 |
ApiVersion |
因为不一样的Kafka版本支持的ApiVersion不一样,所以要根据KafkaServer支持的ApiVersion来发送对应格式的Request |
CorrelationId |
客户端提供的一个整型值,在response中会原封不动的返回,它的做用主要是用来匹配client和server之间的request和response。 |
下面的列表是ApiKey的整型值对应的Request类:
API name |
ApiKey Value |
ProduceRequest |
0 |
FetchRequest |
1 |
OffsetRequest |
2 |
MetadataRequest |
3 |
Non-user facing control APIs |
4-7 |
OffsetCommitRequest |
8 |
OffsetFetchRequest |
9 |
GroupCoordinatorRequest |
10 |
JoinGroupRequest |
11 |
HeartbeatRequest |
12 |
LeaveGroupRequest |
13 |
SyncGroupRequest |
14 |
DescribeGroupsRequest |
15 |
ListGroupsRequest |
16 |
Error |
Code |
重试 |
描述 |
NoError |
0 |
没有错误,执行成功! |
|
Unknown |
-1 |
未知的server error |
|
OffsetOutRange |
1 |
请求的offset值超出了server端维护的对应topic/partition的offset值(能够大于也能够小于) |
|
InvalidMessage/CorruptMessage |
2 |
YES |
消息内容不能经过CRC校验 |
UnknownTopicOrPartition |
3 |
YES |
请求的topic或partition再也不发往的broker上 |
InvalidMessageSize |
4 |
消息的大小为负值 |
|
LeaderNotAvailable |
5 |
YES |
请求发生在leader选举过程当中时抛出这个异常,此时请求的partition没有leader没法读写 |
NotLeaderForPartition |
6 |
YES |
在客户端尝试向不是leader的replica写入信息时抛出,意味着客户端的元数据信息过时了 |
RequestTimedOut |
7 |
YES |
request超过了用户指定的时间,通常是值socket超时 |
BrokerNotAvailable |
8 |
这个错误不是client遇到的,每每发生在工具类的请求中 |
|
ReplicaNotAvailable |
9 |
broker上没有指望的replica(能够被安全的忽视) |
|
MessageSizeTooLarge |
10 |
server有一个最大消息的配置,当client向server端写入超过配置大小的message时抛出 |
|
StaleControllerEpochCode |
11 |
在broker和broker通讯时发生的内部错误 |
|
OffsetMetadataTooLargeCode |
12 |
若是指定了一个大于配置的offset metadata大小的string |
|
GroupLoadInProgressCode |
14 |
YES |
当topic partition的leader发生变化后,新的leader在load offsets时,offset fetch request请求时抛出,或者在group membership(例如heartbeat)的response中返回当coordinator在load group metadata时 |
GroupCoordinatorNotAvailableCode |
15 |
YES |
offsets topic还没建立或者group coordinator没有active |
NotCoordinatorForGroupCode |
16 |
YES |
offset fetch或commit request的请求发往一个不是coordinator的节点 |
InvalidTopicCode |
17 |
访问一个不可用的topic或者尝试对内部topic(__consumer_offset)进行写入操做时 |
|
RecordListTooLargeCode |
18 |
若是produce的message batch超过了配置的segment size |
|
NotEnoughReplicasCode |
19 |
YES |
处于in-sync的replicas数量小于配置的produce要求的最小replicas和requiredAcks=-1 |
NotEnoughReplicasAfterAppendCode |
20 |
YES |
当message被写入到log后,可是in-sync的replicas数小于须要的 |
InvalidRequiredAcksCode |
21 |
请求的requiredAcks是不可得到的 |
|
IllegalGenerationCode |
22 |
server端的generation id和request中的generation id不一致 |
|
InconsistentGroupProtocolCode |
23 |
当前group可以接受的protocol type中不包含join group时给出的protocol type |
|
InvalidGroupIdCode |
24 |
当join group时groupId为空或者null |
|
UnknownMemberIdCode |
25 |
当前generation里group中不包含请求的memberId |
|
InvalidSessionTimeoutCode |
26 |
join group时超出了配置的request session timeout |
|
RebalanceInProgressCode |
27 |
当请求发起时coodinator正在对group进行rebalance,此时client要从新join group |
|
InvalidCommitOffsetSizeCode |
28 |
当offset commit超过metadata的最大限制被拒绝时 |
|
TopicAuthorizationFailedCode |
29 |
client没有访问请求的topic的权限时 |
|
GroupAuthorizationFailedCode |
30 |
||
ClusterAuthorizationFailedCode |
31 |
kafka实现了基于tcp的一种通讯协议,只要符合通讯协议的规范,便可与kafka server进行通讯,于是kafka client是跨语言的
kafka既能够被认为是消息中间件,也能够做为存储系统使用
因为kafka能够将producer发送的消息保存起来供consumer消费,所以既能够做为消息中间件使用,也能够做为存储系统来保存数据。
kafka在存储逻辑层的设计为高吞吐量提供了可能
kafka存储数据的逻辑单元是partition,producer和consumer的处理单元也是基于partition的,针对某个topic,能够有多个partition,而多个partition又能够分布在不一样的节点上,从而在存储层保证了I/O的并发,为高吞吐量提供了可能。
kafka的isr同步机制使得保证分布式一致性的代价大大下降
kafka的isr机制,容许isr中的replica和主副本以前有必定的差距,这样作保证了响应的及时性,另外一方面,因为在isr层面没有考虑严格的分布式一致性,没有使用paxos的leader选举策略,使得kafka的leader选举更加容易,没有严格的节点数要求的限制,只要有一个节点是isr中的,就不会丢数据。