kafka通讯协议(一)

简介

Kafka使用基于TCP的二进制协议,该协议定义了全部API的请求及响应消息。全部的消息都是经过长度来分隔,而且由后面描述的基本类型组成。html

基本数据类型

  • 定长基本类型
    int8/int16/int32/int64这些都是不一样精度的带符号整数,以大端(big endian)方式存储。
  • 变长基本类型
    这些类型由一个表示长度的带符号整数N以及后续N字节的内容组成,长度若是为-1则表示空。
    string使用int16表示长度,bytes使用int32表示长度。
  • 数组
    这个类型用来处理重复的结构体数据,它们老是由一个表明元素个数int32的整数N,以及后续N个重复结构体组成。
    这些结构体自身是由其余的基本数据类型组成。

通用的请求和响应格式

  • 全部请求和响应都遵循如下语法基础:
RequestOrResponse --> Size( RequestMessage | ResponseMessage )
    Size --> int32

Size给出了后续请求或响应消息的字节长度。
解析时应先读取4字节的长度N,而后读取并解析后续的N字节请求/响应内容。node

  • 全部请求都具备如下格式:
RequestMessage --> request_header request
    request_header --> api_key api_version correlation_id client_id
        api_key --> int16
        api_version --> int16
        correlation_id -->int32
        client_id --> string
    request --> MetadataRequest | ProduceRequest | FetchRequest | ...

api_key: 这是一个表示所调用的API的数字ID(即不一样类型的请求有不一样的ID)
api_version: 这是该API的一个数字版本号,Kafka为每一个API定义一个版本号,该版本号容许服务器根据版本号正确地解释请求内容。
correlation_id: 这是一个用户提供的整数。它将被服务器原封不动的传回给客户端。用于匹配客户端和服务器之间的请求和响应。
client_id: 这是客户端应用程序自定义的标识
request: 具体的请求内容,例如元数据请求、生产请求、获取消息请求、偏移量请求等。apache

  • 全部响应都具备如下格式:
ResponseMessage --> response_header response
    response_header --> correlation_id
        correlation_id --> int32
    response --> MetadataResponse | ProduceResponse | ...

correlation_id: 即请求中携带的correlation_id
response: 具体的响应内容api

  • 消息集(MessageSet)格式
MessageSet --> [Offset MessageSize Message]
    Offset --> int64
    MessageSize --> int32
    Message --> Crc MagicByte Attributes Key Value
        Crc --> int32
        MagicByte --> int8
        Attributes --> int8
        Key --> bytes
        Value --> bytes

Offset: 这是在Kafka中做为日志序列号使用的偏移量。
Crc:Crc是的剩余消息字节的CRC32值。broker和消费者可用来检查信息的完整性。
MagicByte: 这是一个用于容许消息二进制格式的向后兼容演化的版本id。当前值是0。 Attributes: 这个字节保存有关信息的元数据属性。
最低的3位包含用于消息的压缩编解码器。
第四位表示时间戳类型,0表明CreateTime,1表明LogAppendTime。生产者必须把这个位设成0。
全部其余位必须被设置为0。
Timestamp: 消息的时间戳。时间戳类型在Attributes域中体现。单位为从UTC标准准时间1970年1月1日0点到所在时间的毫秒数。
Key: Key是一个可选项,它主要用来进行指派分区。Key能够为null。
Value: Value是消息的实际内容,类型是字节数组。
注:此格式为V0版本的格式,V1详见官方文档,一样后续具体请求格式均以V0版本进行说明数组

部分请求响应的具体格式

  • SASL握手请求响应(api_key: 17)
SaslHandshake Request --> mechanism
    mechanism --> string

mechanism: 客户端选择的SASL机制服务器

SaslHandshake Response --> error_code [enabled_mechanisms]
    error_code --> int16
    enabled_mechanisms --> string

error_code: 错误码
enabled_mechanisms: 服务端使能的SASL mechanisms列表ide

  • 元数据请求响应(api_key: 3)
Metadata Request --> [topics]
    topics --> string

topics: 要获取元数据的topic数组。 若是为空,则返回全部topic的元数据。性能

Metadata Response --> [brokers] [topic_metadata]
    brokers --> node_id host port
        node_id --> int32
        host --> string
        port --> int32
    topic_metadata --> error_code topic [partition_metadata] 
        error_code --> int16
        topic --> string
        partition_metadata --> error_code partition leader [replicas] [isr]
            error_code --> int16
            partition --> int32
            leader --> int32
            replicas --> int32
            isr --> int32

node_id: broker的id, 即kafka节点的ID
host: broker的主机名
port: broker侦听端口
partition: topic对应partition的id
leader: 该topic全部partition中,扮演leader角色的partition所在的broker的id
replicas: 全部partition中,做为slave的节点集合
isr: 副本集合中,与leader处于跟随状态的节点集合fetch

  • 生产请求响应(api_key: 0)
ProduceRequest --> asks timeout [topic [partition messageSetSize MessageSet]]
    acks --> int16
    timeout --> int32
    topic --> string
    partition --> int32
    messageSetSize --> int32

acks: 这个值表示服务端收到多少确认后才发送反馈消息给客户端,若是设置为0,即服务端不发送response;若是设置为1,即服务端将等到数据写入本地日志后发送response;若是设置为-1,服务端将阻塞,直到这个消息被全部的同步副本写入后再发送response
timeout: 这个值提供了以毫秒为单位的超时时间
topic:该数据将会发布到的topic名称
partition:该数据将会发布到的分区
messageSetSize: 后续消息集的长度,单位是字节
messageSet:前面描述的标准格式的消息集合ui

ProduceResponse --> [responses]
    responses --> topic [partition_responses]
        topic --> string
        partition_responses --> partition error_code base_offset
            partition --> int32
            error_code --> int16
            base_offset --> int64

topic: 此响应对应的topic名称
partition: topic对应的partition的id
base_offset: produce的消息在partition中的偏移

  • 获取消息请求响应(api_key: 1)
Fetch Request --> replica_id max_wait_time min_bytes [topics]
    replica_id --> int32
    max_wait_time --> int32
    min_bytes --> int32
    topics --> topic [partitions]
        partitions --> partition fetch_offset max_bytes
            partition --> int32
            fetch_offset --> int64
            max_bytes --> int32

replica_id: 发起这个请求的副本节点ID
max_wait_time: 若是没有足够的数据可发送时,最大阻塞等待时间,以毫秒为单位
min_bytes: 返回响应消息的最小字节数目,必须设置。若是客户端将此值设为0,服务器将会当即返回,但若是没有新的数据,服务端会返回一个空消息集。若是它被设置为1,则服务器将在至少一个分区收到一个字节的数据的状况下当即返回,或者等到超时时间达到。经过设置较高的值,结合超时设置,消费者能够在牺牲一点实时性能的状况下经过一次读取较大的字节的数据块从而提升的吞吐量
topic: topic的名称
partition: topic对应partition的id
fetch_offset: 获取数据的起始偏移量
max_bytes: 此分区返回消息集所能包含的最大字节数。

Fetch Response --> [responses]
    responses --> topic [partition_responses]
        topic --> string
        partition_responses --> partition_header messageSet
            partition_header --> partition error_code high_watermark
                parttion --> int32
                error_code --> int16
                high_watermark --> int64

high_watermark: 该partition分区中最后提交的消息的偏移量。此信息可被客户端用来肯定后面还有多少条消息。

  • 偏移量请求响应(api_key: 2)
ListOffsets Request --> replica_id [topics]
    replica_id --> int32
    topics --> topic [partitions]
        topic --> string
        partitions --> partition timestamp max_num_offsets
            partition --> int32
            timestamp --> int64
            max_num_offsets --> int32

timestamp: 用来请求必定时间(单位:毫秒)前的全部消息
-1表示获取最新的offset,即下一个produce的消息的offset
-2表示获取最先的有效偏移量

ListOffsets Response --> [responses]
    responses --> topic [partition_responses]
        topic --> string
        partition_responses -->partition error_code [offsets]
            partition --> int32
            error_code --> int16
            offsets --> int64

参考

官网
A Guide To The Kafka Protocol
Kafka通信协议指南

相关文章
相关标签/搜索